You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:50 UTC

[incubator-servicecomb-saga] 06/07: SCB-224 retry synchronously

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 7e01a4f3ca696716ca7beaedc9032e7ca6383fff
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Feb 23 15:44:26 2018 +0800

    SCB-224 retry synchronously
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../org/apache/servicecomb/saga/PackStepdefs.java  |  16 +--
 .../servicecomb/saga/alpha/core/Command.java       |   2 +-
 .../saga/alpha/core/CommandRepository.java         |   2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  |  22 ++--
 .../saga/alpha/core/PushBackOmegaCallback.java     |   9 +-
 .../saga/alpha/core/TxConsistentService.java       |   9 +-
 .../servicecomb/saga/alpha/core/TxEvent.java       |  53 +++++----
 .../saga/alpha/core/TxEventRepository.java         |   2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   |   4 +-
 .../saga/alpha/server/CommandEntityRepository.java |  15 ++-
 .../saga/alpha/server/GrpcOmegaCallback.java       |   2 +-
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |   2 +-
 .../saga/alpha/server/SpringCommandRepository.java |  23 +---
 .../saga/alpha/server/SpringTxEventRepository.java |  15 +--
 .../alpha/server/SpringTxTimeoutRepository.java    |   5 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  49 +++++---
 .../src/main/resources/schema-mysql.sql            |   5 +-
 .../src/main/resources/schema-postgresql.sql       |   4 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 129 ++++++++++-----------
 alpha/alpha-server/src/test/resources/schema.sql   |   4 +-
 .../pack/tests/CommandEnvelopeRepository.java      |  13 +--
 .../integration/pack/tests/GreetingController.java |   8 ++
 .../integration/pack/tests/GreetingService.java    |  22 ++++
 .../saga/integration/pack/tests/PackIT.java        |  92 +++++++++++++--
 .../connector/grpc/GrpcClientMessageSender.java    |   4 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   6 +-
 .../grpc/LoadBalancedClusterMessageSender.java     |   8 +-
 .../connector/grpc/PushBackReconnectRunnable.java  |   8 +-
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  15 +--
 .../connector/grpc/RetryableMessageSenderTest.java |   1 +
 .../saga/omega/context/CompensationContext.java    |   5 +-
 .../spring/CompensableAnnotationProcessor.java     |   1 +
 .../spring/CompensableMethodCheckingCallback.java  |  10 +-
 .../spring/TransactionAspectConfig.java            |   5 +-
 .../spring/TransactionInterceptionTest.java        | 103 ++++++++++++----
 .../spring/TransactionalUserService.java           |  24 +++-
 .../saga/omega/transaction/spring/User.java        |   2 +-
 .../omega/transaction/CompensableInterceptor.java  |  12 +-
 .../transaction/CompensationMessageHandler.java    |  15 +--
 ...TransactionAspect.java => DefaultRecovery.java} |  59 +++++-----
 .../omega/transaction/EventAwareInterceptor.java   |   6 +-
 .../saga/omega/transaction/ForwardRecovery.java    |  76 ++++++++++++
 ...TxCompensatedEvent.java => RecoveryPolicy.java} |  13 ++-
 ...aEndedEvent.java => RecoveryPolicyFactory.java} |  16 ++-
 .../saga/omega/transaction/SagaEndedEvent.java     |   5 +-
 .../transaction/SagaStartAnnotationProcessor.java  |   5 +-
 .../saga/omega/transaction/SagaStartAspect.java    |   2 +-
 .../saga/omega/transaction/SagaStartedEvent.java   |   2 +-
 .../omega/transaction/TimeAwareInterceptor.java    |   0
 .../saga/omega/transaction/TransactionAspect.java  |  36 +-----
 .../saga/omega/transaction/TxAbortedEvent.java     |   3 +-
 .../saga/omega/transaction/TxCompensatedEvent.java |   2 +-
 .../saga/omega/transaction/TxEndedEvent.java       |   2 +-
 .../saga/omega/transaction/TxEvent.java            |  19 +--
 .../saga/omega/transaction/TxStartedEvent.java     |   7 +-
 .../omega/transaction/annotations/Compensable.java |   4 +-
 .../transaction/CompensableInterceptorTest.java    |   8 +-
 .../CompensationMessageHandlerTest.java            |  16 ++-
 ...ionAspectTest.java => DefaultRecoveryTest.java} | 115 +++++++++++-------
 ...ionAspectTest.java => ForwardRecoveryTest.java} | 122 ++++++++++---------
 .../transaction/TimeAwareInterceptorTest.java      |   0
 .../omega/transaction/TransactionAspectTest.java   |  92 ++++++++++++---
 .../src/main/proto/GrpcTxEvent.proto               |  13 ++-
 63 files changed, 868 insertions(+), 481 deletions(-)

diff --git a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
index 4f2f729..f9af438 100644
--- a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
+++ b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
@@ -41,7 +41,7 @@ import cucumber.api.java.After;
 import cucumber.api.java8.En;
 
 public class PackStepdefs implements En {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String ALPHA_REST_ADDRESS = "alpha.rest.address";
   private static final String CAR_SERVICE_ADDRESS = "car.service.address";
@@ -73,7 +73,7 @@ public class PackStepdefs implements En {
     });
 
     Given("^Install the byteman script ([A-Za-z0-9_\\.]+) to ([A-Za-z]+) Service$", (String script, String service) -> {
-      log.info("Install the byteman script {} to {} service", script, service);
+      LOG.info("Install the byteman script {} to {} service", script, service);
       List<String> rules = new ArrayList<>();
       rules.add("target/test-classes/" + script);
       Submit bm = getBytemanSubmit(service);
@@ -81,7 +81,7 @@ public class PackStepdefs implements En {
     });
 
     When("^User ([A-Za-z]+) requests to book ([0-9]+) cars and ([0-9]+) rooms (success|fail)$", (username, cars, rooms, result) -> {
-      log.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
+      LOG.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
 
       Response resp = given()
           .pathParam("name", username)
@@ -116,7 +116,7 @@ public class PackStepdefs implements En {
 
   @After
   public void cleanUp() {
-    log.info("Cleaning up services");
+    LOG.info("Cleaning up services");
     for (String address : addresses) {
       given()
           .when()
@@ -135,7 +135,7 @@ public class PackStepdefs implements En {
       try {
         bm.deleteAllRules();
       } catch (Exception e) {
-        log.warn("Fail to delete the byteman rules " + e);
+        LOG.warn("Fail to delete the byteman rules " + e);
       }
     }
   }
@@ -156,7 +156,7 @@ public class PackStepdefs implements En {
       return;
     }
 
-    log.info("Retrieved data {} from service", actualMaps);
+    LOG.info("Retrieved data {} from service", actualMaps);
     dataTable.diff(DataTable.create(actualMaps));
   }
 
@@ -180,12 +180,12 @@ public class PackStepdefs implements En {
     if (isEmpty(infoURI)) {
       infoURI = "/info";
     }
-    log.info("The info service uri is " + infoURI);
+    LOG.info("The info service uri is " + infoURI);
     probe(address, infoURI);
   }
 
   private void probe(String address, String infoURI) {
-    log.info("Connecting to service address {}", address);
+    LOG.info("Connecting to service address {}", address);
     given()
         .when()
         .get(address + infoURI)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 4edb928..6c8f370 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -76,7 +76,7 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  private Command(long id,
+  public Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 25288ed..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(TxEvent abortEvent);
+  void saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index d3fba31..3168870 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EventScanner implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final ScheduledExecutorService scheduler;
@@ -83,7 +83,7 @@ public class EventScanner implements Runnable {
   private void findTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
-          log.info("Found timeout event {}", event);
+          LOG.info("Found timeout event {}", event);
           timeoutRepository.save(txTimeoutOf(event));
         });
   }
@@ -93,9 +93,9 @@ public class EventScanner implements Runnable {
   }
 
   private void saveUncompensatedEventsToCommands() {
-    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId)
-        .ifPresent(event -> {
-          log.info("Found uncompensated event {}", event);
+    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+        .forEach(event -> {
+          LOG.info("Found uncompensated event {}", event);
           nextEndedEventId = event.id();
           commandRepository.saveCompensationCommands(event.globalTxId());
         });
@@ -104,7 +104,7 @@ public class EventScanner implements Runnable {
   private void updateCompensatedCommands() {
     eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
         .ifPresent(event -> {
-          log.info("Found compensated event {}", event);
+          LOG.info("Found compensated event {}", event);
           nextCompensatedEventId = event.id();
           updateCompensationStatus(event);
         });
@@ -114,13 +114,13 @@ public class EventScanner implements Runnable {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
     } catch (Exception e) {
-      log.warn("Failed to delete duplicate event", e);
+      LOG.warn("Failed to delete duplicate event", e);
     }
   }
 
   private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+    LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
         event.globalTxId(),
         event.localTxId());
 
@@ -129,7 +129,7 @@ public class EventScanner implements Runnable {
 
   private void abortTimeoutEvents() {
     timeoutRepository.findFirstTimeout().forEach(timeout -> {
-      log.info("Found timeout event {} to abort", timeout);
+      LOG.info("Found timeout event {} to abort", timeout);
 
       eventRepository.save(toTxAbortedEvent(timeout));
 
@@ -152,7 +152,7 @@ public class EventScanner implements Runnable {
 
   private void markGlobalTxEnd(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
-    log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
+    LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
   private TxEvent toTxAbortedEvent(TxTimeout timeout) {
@@ -182,7 +182,7 @@ public class EventScanner implements Runnable {
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
-          log.info("Compensating transaction with globalTxId {} and localTxId {}",
+          LOG.info("Compensating transaction with globalTxId {} and localTxId {}",
               command.globalTxId(),
               command.localTxId());
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c14..9556d7c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PushBackOmegaCallback implements OmegaCallback {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final BlockingQueue<Runnable> pendingCompensations;
   private final OmegaCallback underlying;
@@ -45,11 +45,12 @@ public class PushBackOmegaCallback implements OmegaCallback {
   }
 
   private void logError(TxEvent event, Exception e) {
-    log.error(
-        "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+    LOG.error(
+        "Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+        event.retries() == 0 ? "compensate" : "retry",
         event.serviceName(),
         event.instanceId(),
-        event.compensationMethod(),
+        event.retries() == 0 ? event.compensationMethod() : event.retryMethod(),
         event.globalTxId(),
         event.localTxId(),
         e);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b7..4382fd3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventRepository eventRepository;
 
@@ -41,8 +41,9 @@ public class TxConsistentService {
   }
 
   public boolean handle(TxEvent event) {
-    if (types.contains(event.type()) && isGlobalTxAborted(event)) {
-      log.info("Transaction event {} rejected, because its parent with globalTxId {} was already aborted", event.type(), event.globalTxId());
+    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+      LOG.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted",
+          event.globalTxId());
       return false;
     }
 
@@ -51,7 +52,7 @@ public class TxConsistentService {
       if (eventRepository.findTimeoutEvents().stream()
           .filter(txEvent -> txEvent.globalTxId().equals(event.globalTxId()))
           .count() == 1) {
-        log.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
+        LOG.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
         return false;
       }
     }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1abb7fe..17b059c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,12 +17,16 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Date;
 
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Transient;
 
 @Entity
 @Table(name = "TxEvent")
@@ -43,9 +47,9 @@ public class TxEvent {
   private String type;
   private String compensationMethod;
   private Date expiryTime;
-  private byte[] payloads;
+  private String retryMethod;
   private int retries;
-  private String retriesMethod;
+  private byte[] payloads;
 
   private TxEvent() {
   }
@@ -61,6 +65,8 @@ public class TxEvent {
         event.type,
         event.compensationMethod,
         event.expiryTime,
+        event.retryMethod,
+        event.retries,
         event.payloads);
   }
 
@@ -73,7 +79,8 @@ public class TxEvent {
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, payloads);
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, "", 0,
+        payloads);
   }
 
   public TxEvent(
@@ -85,9 +92,11 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout,
-        "", 0, payloads);
+        retryMethod, retries, payloads);
   }
 
   public TxEvent(
@@ -100,9 +109,11 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
-        timeout, "", 0, payloads);
+        timeout, retryMethod, retries, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -115,10 +126,14 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
         compensationMethod,
         timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+        retryMethod,
+        retries,
         payloads);
   }
 
@@ -132,7 +147,7 @@ public class TxEvent {
       String type,
       String compensationMethod,
       Date expiryTime,
-      String retriesMethod,
+      String retryMethod,
       int retries,
       byte[] payloads) {
     this.surrogateId = surrogateId;
@@ -145,7 +160,7 @@ public class TxEvent {
     this.type = type;
     this.compensationMethod = compensationMethod;
     this.expiryTime = expiryTime;
-    this.retriesMethod = retriesMethod;
+    this.retryMethod = retryMethod;
     this.retries = retries;
     this.payloads = payloads;
   }
@@ -194,6 +209,14 @@ public class TxEvent {
     return expiryTime;
   }
 
+  public String retryMethod() {
+    return retryMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
@@ -206,19 +229,9 @@ public class TxEvent {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", expiryTime='" + expiryTime + '\'' +
+        ", expiryTime=" + expiryTime +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         '}';
   }
-
-  public int retries() {
-    return retries;
-  }
-
-  public String retriesMethod() {
-    return retriesMethod;
-  }
-
-  public boolean containChildren(TxEvent event) {
-    return this.localTxId.equals(event.parentTxId);
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 9eceadd..c481226 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -97,7 +97,7 @@ public interface TxEventRepository {
    * @param id
    * @return
    */
-  Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id);
+  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
   /**
    * Find a {@link TxEvent} which satisfies below requirements:
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index da36066..e80d5a8 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -73,8 +73,8 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
-      return Optional.empty();
+    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+      return emptyList();
     }
 
     @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 737fd11..53110bf 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -34,9 +34,22 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
   @Transactional
   @Modifying(clearAutomatically = true)
   @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+      + "SET c.status = :toStatus "
+      + "WHERE c.globalTxId = :globalTxId "
+      + "  AND c.localTxId = :localTxId "
+      + "  AND c.status = :fromStatus")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("fromStatus") String fromStatus,
+      @Param("toStatus") String toStatus,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
       + "SET c.status = :status "
       + "WHERE c.globalTxId = :globalTxId "
-      + "AND c.localTxId = :localTxId")
+      + "  AND c.localTxId = :localTxId")
   void updateStatusByGlobalTxIdAndLocalTxId(
       @Param("status") String status,
       @Param("globalTxId") String globalTxId,
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a95281..a54fa66 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -42,7 +42,7 @@ class GrpcOmegaCallback implements OmegaCallback {
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
-        .setCompensateMethod(event.compensationMethod())
+        .setCompensationMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
     observer.onNext(command);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 99457a2..a3137b4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -85,7 +85,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getType(),
         message.getCompensationMethod(),
         message.getTimeout(),
-        message.getRetriesMethod(),
+        message.getRetryMethod(),
         message.getRetries(),
         message.getPayloads().toByteArray()
     ));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index ed7b4f1..4aa30d1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SpringCommandRepository implements CommandRepository {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
@@ -57,13 +57,13 @@ public class SpringCommandRepository implements CommandRepository {
     }
 
     for (Command command : commands.values()) {
-      log.info("Saving compensation command {}", command);
+      LOG.info("Saving compensation command {}", command);
       try {
         commandRepository.save(command);
       } catch (Exception e) {
-        log.warn("Failed to save some command {}", command);
+        LOG.warn("Failed to save some command {}", command);
       }
-      log.info("Saved compensation command {}", command);
+      LOG.info("Saved compensation command {}", command);
     }
   }
 
@@ -85,24 +85,11 @@ public class SpringCommandRepository implements CommandRepository {
 
     commands.forEach(command ->
         commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+            NEW.name(),
             PENDING.name(),
             command.globalTxId(),
             command.localTxId()));
 
     return commands;
   }
-
-  private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
-    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
-        .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
-            && Objects.equals(c.localTxId(), localTxId)).count();
-  }
-
-  private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
-    return Collections.singletonList(new TxEvent(
-        abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
-        txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
-        txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
-    ));
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index cae6456..e48a780 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,17 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
+
 import java.util.List;
 import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.springframework.data.domain.PageRequest;
-import org.springframework.util.CollectionUtils;
-
-import javax.swing.text.html.Option;
-
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 
 class SpringTxEventRepository implements TxEventRepository {
   private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1);
@@ -63,12 +60,8 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
-    List<TxEvent> result = eventRepo.findFirstUncompensatedEventByIdGreaterThan(id, SINGLE_TX_EVENT_REQUEST);
-    if (CollectionUtils.isEmpty(result)) {
-      return Optional.empty();
-    }
-    return Optional.of(result.get(0));
+  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST);
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 7195139..53b4443 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.PageRequest;
 
 public class SpringTxTimeoutRepository implements TxTimeoutRepository {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final TxTimeoutEntityRepository timeoutRepo;
 
   SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
@@ -43,7 +44,7 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository {
     try {
       timeoutRepo.save(timeout);
     } catch (Exception ignored) {
-      log.warn("Failed to save some timeout {}", timeout);
+      LOG.warn("Failed to save some timeout {}", timeout);
     }
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3a7edb3..68e6233 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -35,7 +35,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
       + "  SELECT t1.globalTxId FROM TxEvent t1"
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.surrogateId != t.surrogateId "
+      + "    AND t3.creationTime > t.creationTime) AND (("
+      + "SELECT MIN(t2.retries) FROM TxEvent t2 "
+      + "WHERE t2.globalTxId = t.globalTxId "
+      + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.type = 'TxStartedEvent') = 0 "
+      + "OR t.globalTxId = t.localTxId)")
   Optional<TxEvent> findFirstAbortedGlobalTxByType();
 
   @Query("SELECT t FROM TxEvent t "
@@ -56,9 +66,13 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
-      + "t.type, t.compensationMethod, t.payloads"
+      + "t.type, t.compensationMethod, t.payloads "
       + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = ?2")
+      + "WHERE t.globalTxId = ?1 AND t.type = ?2 "
+      + "  AND ( SELECT MIN(t1.retries) FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type = 'TxStartedEvent' ) = 0 ")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
   @Query("SELECT t FROM TxEvent t "
@@ -73,25 +87,30 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.compensationMethod != t.retriesMethod "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?1 AND EXISTS ( "
-      + "  SELECT t1.globalTxId"
-      + "  FROM TxEvent t1 "
+      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "  AND t1.type = 'TxAbortedEvent'"
-      + ") AND NOT EXISTS ( "
-      + "  SELECT t2.globalTxId"
-      + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = t.globalTxId "
-      + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent') "
+      + "    AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+      + "    SELECT t2.globalTxId FROM TxEvent t2 "
+      + "    WHERE t2.globalTxId = t1.globalTxId "
+      + "      AND t2.localTxId = t1.localTxId "
+      + "      AND t2.type = 'TxStartedEvent' "
+      + "      AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.type = 'TxCompensatedEvent') AND ( "
+      + "  SELECT MIN(t4.retries) FROM TxEvent t4 "
+      + "  WHERE t4.globalTxId = t.globalTxId "
+      + "    AND t4.localTxId = t.localTxId "
+      + "    AND t4.type = 'TxStartedEvent' ) = 0 "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long surrogateId, Pageable pageable);
+  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
 
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index c21e518..3806f6d 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -26,8 +26,6 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
-  retries_method varchar(256) NOT NULL,
-  retries int NOT NULL,
   payloads varbinary(10240),
   PRIMARY KEY (surrogateId),
   INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime)
@@ -43,6 +41,8 @@ CREATE TABLE IF NOT EXISTS Command (
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
   compensationMethod varchar(256) NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads varbinary(10240),
   status varchar(12),
   lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS Command (
   INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, status)
 ) DEFAULT CHARSET=utf8;
 
+
 CREATE TABLE IF NOT EXISTS TxTimeout (
   surrogateId bigint NOT NULL AUTO_INCREMENT,
   eventId bigint NOT NULL UNIQUE,
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 674e051..4ecb1b4 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime timestamp(6) NOT NULL,
-  retriesMethod varchar(256) NOT NULL,
-  retries int NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads bytea
 );
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 26aa17a..10bb2dd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -92,6 +92,8 @@ public class AlphaIntegrationTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String compensationMethod = getClass().getCanonicalName();
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String serviceName = uniquify("serviceName");
   private final String instanceId = uniquify("instanceId");
 
@@ -128,7 +130,9 @@ public class AlphaIntegrationTest {
   private TxConsistentService consistentService;
 
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
-  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
+
+  private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver(
+      this::onCompensation);
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -205,7 +209,7 @@ public class AlphaIntegrationTest {
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
+    CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
     TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -247,7 +251,7 @@ public class AlphaIntegrationTest {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -269,9 +273,9 @@ public class AlphaIntegrationTest {
 
     assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+            .setCompensationMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
+            .setCompensationMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
     ));
   }
 
@@ -289,7 +293,7 @@ public class AlphaIntegrationTest {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -301,7 +305,7 @@ public class AlphaIntegrationTest {
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensateStreamObserver());
+    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
 
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -403,7 +407,7 @@ public class AlphaIntegrationTest {
   @Test
   public void abortTimeoutTxStartedEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
 
     await().atMost(2, SECONDS).until(() -> {
@@ -429,6 +433,26 @@ public class AlphaIntegrationTest {
     });
   }
 
+  @Test
+  public void doNotCompensateRetryingEvents() throws InterruptedException {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.size(), is(4));
+    assertThat(events.get(0).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+    assertThat(events.get(2).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(3).type(), is(TxEndedEvent.name()));
+
+    assertThat(receivedCommands.isEmpty(), is(true));
+  }
+
   private boolean waitTillTimeoutDone() {
     for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) {
       if (txTimeout.status().equals(DONE.name())) {
@@ -438,62 +462,13 @@ public class AlphaIntegrationTest {
     return false;
   }
 
-  @Test
-  public void retiesAndCompensateOnFailure() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-
-    String localTxId1 = UUID.randomUUID().toString();
-
-    blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
-        .setServiceName(serviceName)
-        .setInstanceId(instanceId)
-        .setTimestamp(System.currentTimeMillis())
-        .setGlobalTxId(globalTxId)
-        .setLocalTxId(localTxId1)
-        .setParentTxId(parentTxId)
-        .setType(TxStartedEvent.name())
-        .setCompensationMethod("Compensation Method")
-        .setPayloads(ByteString.copyFrom(payload.getBytes()))
-        .setRetries(3).setRetriesMethod("Retries Method")
-        .build());
-    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
-
-    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
-
-    for (int i = 0; i < 3; i++) {
-      blockingStub.onTxEvent(
-          eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
-      await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
-      GrpcCompensateCommand command = receivedCommands.poll();
-      assertThat(command.getGlobalTxId(), is(globalTxId));
-      assertThat(command.getLocalTxId(), is(localTxId1));
-      assertThat(command.getParentTxId(), is(parentTxId));
-      assertThat(command.getCompensateMethod(), is("Retries Method"));
-      assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
-    }
-
-    blockingStub.onTxEvent(
-        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
-    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
-    GrpcCompensateCommand command = receivedCommands.poll();
-    assertThat(command.getGlobalTxId(), is(globalTxId));
-    assertThat(command.getLocalTxId(), is(localTxId1));
-    assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is("Compensation Method"));
-    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
-  }
-
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
             command.getLocalTxId(),
             command.getParentTxId(),
-            new byte[0],
-            command.getCompensateMethod()));
+            command.getPayloads().toByteArray(),
+            command.getCompensationMethod()));
   }
 
   private GrpcServiceConfig someServiceConfig() {
@@ -516,23 +491,35 @@ public class AlphaIntegrationTest {
   }
 
   private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout);
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout,
+        "", 0);
+  }
+
+  private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0,
+        retryMethod, retries);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
-    return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+    return someGrpcEvent(type, localTxId);
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
+  private GrpcTxEvent someGrpcEvent(EventType type, String localTxId) {
     return someGrpcEvent(type, globalTxId, localTxId);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0);
+    return someGrpcEvent(type, globalTxId, localTxId, parentTxId);
+  }
+
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "",
+        0);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0);
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
+      String compensationMethod) {
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -541,7 +528,9 @@ public class AlphaIntegrationTest {
       String parentTxId,
       byte[] payloads,
       String compensationMethod,
-      int timeout) {
+      int timeout,
+      String retryMethod,
+      int retries) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -553,19 +542,21 @@ public class AlphaIntegrationTest {
         .setType(eventType.name())
         .setCompensationMethod(compensationMethod)
         .setTimeout(timeout)
+        .setRetryMethod(retryMethod)
+        .setRetries(retries)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
 
-  private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+  private static class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> {
     private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
-    private CompensateStreamObserver() {
+    private CompensationStreamObserver() {
       this(command -> {});
     }
 
-    private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+    private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
       this.consumer = consumer;
     }
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index ca46625..8d70899 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime TIMESTAMP NOT NULL,
-  retriesMethod varchar(256) NOT NULL,
-  retries int NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int DEFAULT 0 NOT NULL,
   payloads varbinary(10240)
 );
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
similarity index 68%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
index 8c70e3a..ad8ae3a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.transaction;
+package org.apache.servicecomb.saga.integration.pack.tests;
 
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.repository.CrudRepository;
 
-public class SagaEndedEvent extends TxEvent {
-
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
-  }
+interface CommandEnvelopeRepository extends CrudRepository<Command, Long> {
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 2bdd587..e497cec 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -77,4 +77,12 @@ public class GreetingController {
   ResponseEntity<String> goodNight(@RequestParam String name) {
     return ResponseEntity.ok("Good night, " + name);
   }
+
+  @SagaStart
+  @GetMapping("/open")
+  ResponseEntity<String> open(@RequestParam String name, @RequestParam int retries) {
+    String greetings = greetingService.greet(name);
+    String status = greetingService.open(name, retries);
+    return ResponseEntity.ok(greetings + "; " + status);
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index 69a86f6..554dc15 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -27,6 +27,9 @@ import org.springframework.stereotype.Service;
 class GreetingService {
   private final Queue<String> compensated;
 
+  private final int MAX_COUNT = 3;
+  private int failedCount = 1;
+
   @Autowired
   GreetingService(Queue<String> compensated) {
     this.compensated = compensated;
@@ -59,8 +62,27 @@ class GreetingService {
     return appendMessage("My bad, please take the window instead, " + name);
   }
 
+  @Compensable(retries = MAX_COUNT, compensationMethod = "close")
+  String open(String name, int retries) {
+    if (failedCount < retries) {
+      failedCount += 1;
+      throw new IllegalStateException("You know when the zoo opens, " + name);
+    }
+    resetCount();
+    return "Welcome to visit the zoo, " + name;
+  }
+
+  String close(String name, int retries) {
+    resetCount();
+    return appendMessage("Sorry, the zoo has already closed, " + name);
+  }
+
   private String appendMessage(String message) {
     compensated.add(message);
     return message;
   }
+
+  void resetCount() {
+    this.failedCount = 1;
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index b3045e3..9199ff6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -56,14 +56,23 @@ public class PackIT {
   private OmegaContext omegaContext;
 
   @Autowired
-  private TxEventEnvelopeRepository repository;
+  private TxEventEnvelopeRepository eventRepo;
+
+  @Autowired
+  private CommandEnvelopeRepository commandRepo;
 
   @Autowired
   private Queue<String> compensatedMessages;
 
+  @Autowired
+  private GreetingService greetingService;
+
   @After
   public void tearDown() throws Exception {
-    repository.deleteAll();
+    eventRepo.deleteAll();
+    commandRepo.deleteAll();
+    compensatedMessages.clear();
+    greetingService.resetCount();
   }
 
   @Test(timeout = 5000)
@@ -75,11 +84,11 @@ public class PackIT {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Greetings, mike; Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -136,13 +145,13 @@ public class PackIT {
 
     assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
 
-    await().atMost(2, SECONDS).until(() -> repository.count() == 7);
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 7);
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
     assertThat(events.size(), is(7));
 
     TxEvent sagaStartedEvent = events.get(0);
@@ -184,11 +193,11 @@ public class PackIT {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -220,4 +229,69 @@ public class PackIT {
 
     assertThat(compensatedMessages.isEmpty(), is(true));
   }
+
+  @Test(timeout = 5000)
+  public void retrySubTransactionSuccess() {
+    ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        "eric",
+        2);
+
+    assertThat(entity.getStatusCode(), is(OK));
+    assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the zoo, eric"));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 8);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(8));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxEndedEvent"));
+    assertThat(events.get(7).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
+
+  @Test(timeout = 5000)
+  public void compensateWhenRetryReachesMaximum() throws InterruptedException {
+    // retries 3 times and then compensate
+    ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        TRESPASSER,
+        5);
+
+    assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 11);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(11));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxAbortedEvent"));
+    assertThat(events.get(7).type(), is("TxStartedEvent"));
+    assertThat(events.get(8).type(), is("TxAbortedEvent"));
+    assertThat(events.get(9).type(), is("TxCompensatedEvent"));
+    assertThat(events.get(10).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages, contains("Goodbye, " + TRESPASSER));
+  }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index cf53a0c..b33eeb3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,13 +42,13 @@ import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
 
 public class GrpcClientMessageSender implements MessageSender {
-
   private final String target;
   private final TxEventServiceStub asyncEventService;
 
   private final MessageSerializer serializer;
 
   private final TxEventServiceBlockingStub blockingEventService;
+
   private final GrpcCompensateStreamObserver compensateStreamObserver;
   private final GrpcServiceConfig serviceConfig;
 
@@ -104,8 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setCompensationMethod(event.compensationMethod())
+        .setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
         .setRetries(event.retries())
-        .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 3cf46f8..9d9c312 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -46,14 +46,14 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
 
   @Override
   public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
 
     messageHandler.onReceive(
         command.getGlobalTxId(),
         command.getLocalTxId(),
         command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensateMethod(),
+        command.getCompensationMethod(),
         deserializer.deserialize(command.getPayloads().toByteArray()));
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9a78a62..afff8e7 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -48,7 +48,7 @@ import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
@@ -104,7 +104,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       try {
         sender.onConnected();
       } catch (Exception e) {
-        log.error("Failed connecting to alpha at {}", sender.target(), e);
+        LOG.error("Failed connecting to alpha at {}", sender.target(), e);
       }
     });
   }
@@ -115,7 +115,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       try {
         sender.onDisconnected();
       } catch (Exception e) {
-        log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+        LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
     });
   }
@@ -140,7 +140,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       } catch (OmegaException e) {
         throw e;
       } catch (Exception e) {
-        log.error("Retry sending event {} due to failure", event, e);
+        LOG.error("Retry sending event {} due to failure", event, e);
 
         // very large latency on exception
         senders.put(messageSender, Long.MAX_VALUE);
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index f019d10..02571fd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class PushBackReconnectRunnable implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final MessageSender messageSender;
   private final Map<MessageSender, Long> senders;
   private final BlockingQueue<Runnable> pendingTasks;
@@ -47,14 +47,14 @@ class PushBackReconnectRunnable implements Runnable {
   @Override
   public void run() {
     try {
-      log.info("Retry connecting to alpha at {}", messageSender.target());
+      LOG.info("Retry connecting to alpha at {}", messageSender.target());
       messageSender.onDisconnected();
       messageSender.onConnected();
       senders.put(messageSender, 0L);
       connectedSenders.offer(messageSender);
-      log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+      LOG.info("Retry connecting to alpha at {} is successful", messageSender.target());
     } catch (Exception e) {
-      log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+      LOG.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
       pendingTasks.offer(this);
     }
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index d66b737..e8b2f34 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -90,15 +90,15 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final List<String> compensated = new ArrayList<>();
 
-  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) ->
-      compensated.add(globalTxId);
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod,
+      payloads) -> compensated.add(globalTxId);
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
   private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
-      compensationMethod, 0, null, 0, "blah");
+      compensationMethod, 0, "", 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -335,6 +335,7 @@ public class LoadBalancedClusterMessageSenderTest {
     private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
+
     private StreamObserver<GrpcCompensateCommand> responseObserver;
 
     private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
@@ -357,9 +358,9 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getLocalTxId(),
           request.getParentTxId(),
           request.getCompensationMethod(),
-          0,
-          null,
-          0,
+          request.getTimeout(),
+          request.getRetryMethod(),
+          request.getRetries(),
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 9d0ebc4..3856415 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,6 +42,7 @@ public class RetryableMessageSenderTest {
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
+
   private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
 
   @Test
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 48a67f7..cf16888 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -36,7 +36,7 @@ public class CompensationContext {
     contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
   }
 
-  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
+  public void apply(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
     CompensationContextInternal contextInternal = contexts.get(compensationMethod);
 
     try {
@@ -44,7 +44,7 @@ public class CompensationContext {
       LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       LOG.error(
-          "Pre-checking for compensate method " + contextInternal.compensationMethod.toString()
+          "Pre-checking for compensation method " + contextInternal.compensationMethod.toString()
               + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
           e);
     }
@@ -52,6 +52,7 @@ public class CompensationContext {
 
   private static final class CompensationContextInternal {
     private final Object target;
+
     private final Method compensationMethod;
 
     private CompensationContextInternal(Object target, Method compensationMethod) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 338751c..7d7d45f 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -26,6 +26,7 @@ import org.springframework.util.ReflectionUtils;
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
   private final OmegaContext omegaContext;
+
   private final CompensationContext compensationContext;
 
   CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 268fad9..90d8b06 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -31,6 +31,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
+
   private final CompensationContext compensationContext;
 
   CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
@@ -47,10 +48,13 @@ class CompensableMethodCheckingCallback implements MethodCallback {
     String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
 
     try {
-      Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
       compensationContext.addCompensationContext(method, bean);
-      compensationContext.addCompensationContext(signature, bean);
-      LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+
+      if (!compensationMethod.isEmpty()) {
+        Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+        compensationContext.addCompensationContext(signature, bean);
+        LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+      }
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
           "No such compensation method [" + compensationMethod + "] found in " + bean.getClass().getCanonicalName(),
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 41be8f7..4fd4188 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -35,7 +35,7 @@ public class TransactionAspectConfig {
 
   @Bean
   MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
-    return new CompensationMessageHandler(sender, omegaContext, context);
+    return new CompensationMessageHandler(sender, context);
   }
 
   @Order(0)
@@ -51,7 +51,8 @@ public class TransactionAspectConfig {
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext,
+      CompensationContext compensationContext) {
     return new CompensableAnnotationProcessor(omegaContext, compensationContext);
   }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 19d0942..6505bfc 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -24,10 +24,13 @@ import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -106,15 +109,18 @@ public class TransactionInterceptionTest {
 
   private String compensationMethod;
 
-  private String retriesMethod;
+  private String compensationMethod2;
+
+  private String retryMethod;
 
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
-    retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
+    retryMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class, int.class).toString();
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
+    compensationMethod2 = TransactionalUserService.class.getDeclaredMethod("delete", User.class, int.class).toString();
   }
 
   @After
@@ -122,6 +128,7 @@ public class TransactionInterceptionTest {
     messages.clear();
     userRepository.deleteAll();
     omegaContext.clear();
+    userService.resetCount();
   }
 
   @AfterClass
@@ -133,8 +140,9 @@ public class TransactionInterceptionTest {
     User user = userService.add(this.user);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+                user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -154,8 +162,9 @@ public class TransactionInterceptionTest {
     }
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+                illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -176,10 +185,11 @@ public class TransactionInterceptionTest {
     assertThat(userRepository.findByUsername(anotherUser.username()), is(nullValue()));
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0,
+                anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -189,6 +199,60 @@ public class TransactionInterceptionTest {
   }
 
   @Test
+  public void retryTillSuccess() {
+    try {
+      userService.add(user, 1);
+    } catch (Exception e) {
+      fail("unexpected exception throw: " + e);
+    }
+
+    assertThat(messages.size(), is(4));
+
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
+            .toString()));
+
+    String abortedEvent = messages.get(1);
+    assertThat(abortedEvent, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
+            .toString()));
+    assertThat(messages.get(3),
+        is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));
+
+    assertThat(userRepository.count(), is(1L));
+    userRepository.findAll().forEach(user -> assertThat(user, is(this.user)));
+  }
+
+  @Test
+  public void retryReachesMaximumThenThrowsException() {
+    try {
+      userService.add(user, 3);
+      expectFailing(IllegalStateException.class);
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage(), is("Retry harder"));
+    }
+
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 3)
+            .toString()));
+
+    String abortedEvent1 = messages.get(1);
+    assertThat(abortedEvent1, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 3)
+            .toString()));
+
+    String abortedEvent2 = messages.get(3);
+    assertThat(abortedEvent2, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(userRepository.count(), is(0L));
+  }
+
+  @Test
   public void passesOmegaContextThroughDifferentThreads() throws Exception {
     new Thread(() -> userService.add(user)).start();
     waitTillSavedUser(username);
@@ -198,10 +262,10 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -217,10 +281,10 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -239,8 +303,8 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(username);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -258,11 +322,10 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
-
     actorSystem.terminate();
   }
 
@@ -298,7 +361,7 @@ public class TransactionInterceptionTest {
     private final List<String> messages = new ArrayList<>();
 
     @Bean
-    CompensationContext compensationContext() {
+    CompensationContext recoveryContext() {
       return new CompensationContext();
     }
 
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
index c98c6ea..0618109 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
@@ -17,21 +17,26 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
-
 @Component
 class TransactionalUserService {
   static final String ILLEGAL_USER = "Illegal User";
   private final UserRepository userRepository;
 
+  private int count = 0;
+
   @Autowired
   TransactionalUserService(UserRepository userRepository) {
     this.userRepository = userRepository;
   }
 
+  void resetCount() {
+    this.count = 0;
+  }
+
   @Compensable(compensationMethod = "delete")
   User add(User user) {
     if (ILLEGAL_USER.equals(user.username())) {
@@ -43,4 +48,19 @@ class TransactionalUserService {
   void delete(User user) {
     userRepository.delete(user);
   }
+
+  @Compensable(retries = 2, compensationMethod = "delete")
+  User add(User user, int count) {
+    if (this.count < count) {
+      this.count += 1;
+      throw new IllegalStateException("Retry harder");
+    }
+    resetCount();
+    return userRepository.save(user);
+  }
+
+  void delete(User user, int count) {
+    resetCount();
+    userRepository.delete(user);
+  }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
index c5c3d84..da9d4b2 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
@@ -62,7 +62,7 @@ public class User {
       return false;
     }
     User user = (User) o;
-    return id == user.id &&
+    return id.equals(user.id) &&
         Objects.equals(username, user.username) &&
         Objects.equals(email, user.email);
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 988d8d7..588d660 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
   private final OmegaContext context;
@@ -30,9 +29,10 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
-    return sender.send(new TxStartedEvent(
-        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
+    return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
+        timeout, retriesMethod, retries, message));
   }
 
   @Override
@@ -42,7 +42,7 @@ class CompensableInterceptor implements EventAwareInterceptor {
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
-        throwable));
+    sender.send(
+        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 15cf91a..fe2eea5 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -18,26 +18,21 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
-  private final OmegaContext omegaContext;
+
   private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) {
+  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
     this.sender = sender;
     this.context = context;
-    this.omegaContext = omegaContext;
   }
 
   @Override
-  public void onReceive(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, Object... payloads) {
-    String oldLocalTxId = omegaContext.localTxId();
-    omegaContext.setLocalTxId(parentTxId);
-    context.compensate(globalTxId, localTxId, compensationMethod, payloads);
+  public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      Object... payloads) {
+    context.apply(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
-    omegaContext.setLocalTxId(oldLocalTxId);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
index 86cc840..0844981 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,64 +25,59 @@ import javax.transaction.InvalidTransactionException;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Aspect
-public class TransactionAspect {
+/**
+ * DefaultRecovery is used to execute business logic once.
+ * The corresponding events are reported to the alpha server before and after the execution of business logic.
+ * If there are errors while executing the business logic, a TxAbortedEvent will be reported to alpha.
+ *
+ *                 pre                       post
+ *     request --------- 2.business logic --------- response
+ *                 \                          |
+ * 1.TxStartedEvent \                        | 3.TxEndedEvent
+ *                   \                      |
+ *                    ----------------------
+ *                            alpha
+ */
+public class DefaultRecovery implements RecoveryPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final OmegaContext context;
-
-  private final CompensableInterceptor interceptor;
-
-  public TransactionAspect(MessageSender sender, OmegaContext context) {
-    this.context = context;
-    this.interceptor = new CompensableInterceptor(context, sender);
-  }
-
-  @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
-  Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    Object[] args = joinPoint.getArgs();
-    int retries = compensable.retries();
-    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
-    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
+    String compensationSignature =
+        compensable.compensationMethod().isEmpty() ? "" : compensationMethodSignature(joinPoint, compensable, method);
 
-    String localTxId = context.localTxId();
-    context.newLocalTxId();
+    String retrySignature = (retries != 0 || compensationSignature.isEmpty()) ? method.toString() : "";
 
-    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
+    AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.timeout(),
+        retrySignature, retries, joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
-      context.setLocalTxId(localTxId);
+      context.setLocalTxId(parentTxId);
       throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
           " because global transaction " + context.globalTxId() + " has already aborted.");
     }
-    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, compensationSignature);
+      interceptor.postIntercept(parentTxId, compensationSignature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptor.onError(localTxId, compensationSignature, throwable);
+      interceptor.onError(parentTxId, compensationSignature, throwable);
       throw throwable;
-    } finally {
-      context.setLocalTxId(localTxId);
-      LOG.debug("Restored context back to {}", context);
     }
   }
 
-  private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
+  String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
       throws NoSuchMethodException {
-
     return joinPoint.getTarget()
         .getClass()
         .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index e20bea2..285d549 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+        int retries, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +34,8 @@ public interface EventAwareInterceptor {
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
new file mode 100644
index 0000000..d1a28c2
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ForwardRecovery executes the business logic repeatedly until it succeeds or the attempts are used up.
+ * If retries is above 0, the business logic is executed at most that many times in total.
+ * If retries == -1, it will retry forever until interrupted.
+ * An InvalidTransactionException is rethrown immediately without retrying.
+ */
+public class ForwardRecovery extends DefaultRecovery {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // TODO: 2018/03/10 we do not support retry with timeout yet
+  /**
+   * Runs one attempt through {@link DefaultRecovery#apply} and repeats it after
+   * {@code compensable.retryDelayInMilliseconds()} whenever the attempt fails.
+   *
+   * @param joinPoint the intercepted compensable method invocation
+   * @param compensable the annotation carrying the compensation method and the retry delay
+   * @param interceptor the interceptor used to report transaction events
+   * @param context the omega context providing the global and local tx ids
+   * @param parentTxId the tx id passed to the interceptor as parent of this sub transaction
+   * @param retries number of executions allowed in total, or -1 to retry until interrupted
+   * @return the result of the first successful attempt
+   * @throws Throwable the failure of the last attempt, an InvalidTransactionException as soon as it occurs,
+   *     or an OmegaException when an InterruptedException escapes the loop (e.g. while waiting to retry)
+   */
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    int remains = retries;
+    try {
+      while (true) {
+        try {
+          return super.apply(joinPoint, compensable, interceptor, context, parentTxId, remains);
+        } catch (Throwable throwable) {
+          if (throwable instanceof InvalidTransactionException) {
+            throw throwable;
+          }
+
+          // -1 means unlimited, so it is never decremented
+          remains = remains == -1 ? -1 : remains - 1;
+          if (remains == 0) {
+            LOG.error(
+                "Retried sub tx failed maximum times, global tx id: {}, local tx id: {}, method: {}, retried times: {}",
+                context.globalTxId(), context.localTxId(), method.toString(), retries);
+            throw throwable;
+          }
+
+          LOG.warn("Retrying sub tx failed, global tx id: {}, local tx id: {}, method: {}, remains: {}",
+              context.globalTxId(), context.localTxId(), method.toString(), remains);
+          Thread.sleep(compensable.retryDelayInMilliseconds());
+        }
+      }
+    } catch (InterruptedException e) {
+      String errorMessage = "Failed to handle tx because it is interrupted, global tx id: " + context.globalTxId()
+          + ", local tx id: " + context.localTxId() + ", method: " + method.toString();
+      LOG.error(errorMessage);
+      // same guard as DefaultRecovery#apply: without a compensation method there is no signature to look up
+      String compensationSignature =
+          compensable.compensationMethod().isEmpty() ? "" : compensationMethodSignature(joinPoint, compensable, method);
+      try {
+        interceptor.onError(parentTxId, compensationSignature, e);
+      } finally {
+        // restore the interrupt flag only after reporting, so callers can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+      throw new OmegaException(errorMessage);
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
similarity index 64%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
index 8e288df..bc1d4d8 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
 
-public class TxCompensatedEvent extends TxEvent {
-  public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
-  }
+public interface RecoveryPolicy {
+  Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
index 8c70e3a..f59ac2b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,17 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
+public class RecoveryPolicyFactory {
+  private static final RecoveryPolicy DEFAULT_RECOVERY = new DefaultRecovery();
 
-public class SagaEndedEvent extends TxEvent {
+  private static final RecoveryPolicy FORWARD_RECOVERY = new ForwardRecovery();
 
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+  /**
+   * If retries == 0, use the default recovery to execute only once.
+   * If retries > 0, it will use the forward recovery and execute the business logic at most the given number of times.
+   * If retries == -1, it will use the forward recovery and retry forever until interrupted.
+   */
+  static RecoveryPolicy getRecoveryPolicy(int retries) {
+    return retries != 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 8c70e3a..2e28b5e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -20,8 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import org.apache.servicecomb.saga.common.EventType;
 
 public class SagaEndedEvent extends TxEvent {
-
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+  SagaEndedEvent(String globalTxId, String localTxId) {
+    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 49dd8e4..486f28c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,10 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index a2ee58c..8722deb 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0);
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
     try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index cb76a26..0e87a97 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType;
 public class SagaStartedEvent extends TxEvent {
   public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
     // use "" instead of null as compensationMethod requires not null in sql
-    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout);
+    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 86cc840..f7a98ee 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -20,8 +20,6 @@ package org.apache.servicecomb.saga.omega.transaction;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
@@ -47,45 +45,17 @@ public class TransactionAspect {
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
   Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-    LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
-
-    Object[] args = joinPoint.getArgs();
-    int retries = compensable.retries();
-    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
-    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
-
     String localTxId = context.localTxId();
     context.newLocalTxId();
-
-    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
-    if (response.aborted()) {
-      String abortedLocalTxId = context.localTxId();
-      context.setLocalTxId(localTxId);
-      throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
-          " because global transaction " + context.globalTxId() + " has already aborted.");
-    }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
+    int retries = compensable.retries();
+    RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
     try {
-      Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, compensationSignature);
-
-      return result;
-    } catch (Throwable throwable) {
-      interceptor.onError(localTxId, compensationSignature, throwable);
-      throw throwable;
+      return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
     } finally {
       context.setLocalTxId(localTxId);
       LOG.debug("Restored context back to {}", context);
     }
   }
-
-  private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
-      throws NoSuchMethodException {
-
-    return joinPoint.getTarget()
-        .getClass()
-        .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
-        .toString();
-  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index d6aa533..f0bac54 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -24,7 +24,8 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxAbortedEvent extends TxEvent {
   public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable));
+    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
+        stackTrace(throwable));
   }
 
   private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index 8e288df..cd709e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxCompensatedEvent extends TxEvent {
   public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 8d6666a..f702c43 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxEndedEvent extends TxEvent {
   public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index a5b5514..a158af1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,21 +31,22 @@ public class TxEvent {
   private final String compensationMethod;
   private final int timeout;
   private final Object[] payloads;
-  private final String retriesMethod;
+
+  private final String retryMethod;
   private final int retries;
 
   public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
-      int timeout, String retriesMethod, int retries, Object... payloads) {
+      int timeout, String retryMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
+    this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.compensationMethod = compensationMethod;
-    this.payloads = payloads;
-    this.globalTxId = globalTxId;
     this.timeout = timeout;
-    this.retriesMethod = retriesMethod;
+    this.retryMethod = retryMethod;
     this.retries = retries;
+    this.payloads = payloads;
   }
 
   public long timestamp() {
@@ -80,8 +81,8 @@ public class TxEvent {
     return timeout;
   }
 
-  public String retriesMethod() {
-    return retriesMethod;
+  public String retryMethod() {
+    return retryMethod;
   }
 
   public int retries() {
@@ -95,7 +96,9 @@ public class TxEvent {
         ", localTxId='" + localTxId + '\'' +
         ", parentTxId='" + parentTxId + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", timeout='" + timeout + '\'' +
+        ", timeout=" + timeout +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         ", payloads=" + Arrays.toString(payloads) +
         '}';
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index cb2580e..5d2ae12 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,8 +21,9 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      int timeout, String retryMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retryMethod,
+        retries, payloads);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index e9bf6a7..78c4b91 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -50,7 +50,9 @@ public @interface Compensable {
    *
    * @return
    */
-  String compensationMethod();
+  String compensationMethod() default "";
+
+  int retryDelayInMilliseconds() default 0;
 
   /**
    * <code>@Compensable</code> method timeout, in seconds. <br>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 76ffc03..1e01ea5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -47,7 +48,8 @@ public class CompensableInterceptorTest {
   };
 
   private final String message = uniquify("message");
-  private final String retriesMethod = uniquify("retries");
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -64,7 +66,7 @@ public class CompensableInterceptorTest {
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
     int retries = new Random().nextInt();
-    interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, retries, message);
 
     TxEvent event = messages.get(0);
 
@@ -72,7 +74,7 @@ public class CompensableInterceptorTest {
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
     assertThat(event.retries(), is(retries));
-    assertThat(event.retriesMethod(), is(retriesMethod));
+    assertThat(event.retryMethod(), is(retryMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index c9c7394..d5d5de5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -42,15 +42,21 @@ public class CompensationMessageHandlerTest {
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
+
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final OmegaContext omegaContext = mock(OmegaContext.class);
   private final CompensationContext context = mock(CompensationContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context);
+
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+
+  @Before
+  public void setUp() {
+    events.clear();
+  }
 
   @Test
-  public void sendsEventOnCompensationCompleted() throws Exception {
+  public void sendsCompensatedEventOnCompensationCompleted() {
     handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
 
     assertThat(events.size(), is(1));
@@ -63,6 +69,6 @@ public class CompensationMessageHandlerTest {
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
 
-    verify(context).compensate(globalTxId, localTxId, compensationMethod, payload);
+    verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
similarity index 55%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
index 31d148f..75062bc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import javax.transaction.InvalidTransactionException;
@@ -40,28 +42,39 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-public class TransactionAspectTest {
+public class DefaultRecoveryTest {
   private final List<TxEvent> messages = new ArrayList<>();
+
   private final String globalTxId = UUID.randomUUID().toString();
+
   private final String localTxId = UUID.randomUUID().toString();
 
+  private final String parentTxId = UUID.randomUUID().toString();
+
   private final String newLocalTxId = UUID.randomUUID().toString();
 
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
   private final MessageSender sender = e -> {
     messages.add(e);
     return new AlphaResponse(false);
   };
-  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
-  @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
-  private final Compensable compensable = mock(Compensable.class);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
-  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+  private final RecoveryPolicy recoveryPolicy = new DefaultRecovery();
 
   @Before
   public void setUp() throws Exception {
@@ -71,75 +84,99 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
   }
 
   @Test
-  public void newLocalTxIdInCompensable() throws Throwable {
-    aspect.advise(joinPoint, compensable);
+  public void recordEndedEventWhenSuccess() throws Throwable {
+    when(joinPoint.proceed()).thenReturn(null);
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
 
-    TxEvent startedEvent = messages.get(0);
+    assertThat(messages.size(), is(2));
 
+    TxEvent startedEvent = messages.get(0);
     assertThat(startedEvent.globalTxId(), is(globalTxId));
-    assertThat(startedEvent.localTxId(), is(newLocalTxId));
-    assertThat(startedEvent.parentTxId(), is(localTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
 
     TxEvent endedEvent = messages.get(1);
-
     assertThat(endedEvent.globalTxId(), is(globalTxId));
-    assertThat(endedEvent.localTxId(), is(newLocalTxId));
-    assertThat(endedEvent.parentTxId(), is(localTxId));
+    assertThat(endedEvent.localTxId(), is(localTxId));
+    assertThat(endedEvent.parentTxId(), is(parentTxId));
     assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
-
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
   @Test
-  public void restoreContextOnCompensableError() throws Throwable {
-    RuntimeException oops = new RuntimeException("oops");
-
+  public void recordAbortedEventWhenFailed() throws Throwable {
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
-      assertThat(e, is(oops));
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
+    assertThat(messages.size(), is(2));
 
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    TxEvent startedEvent = messages.get(0);
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
+
+    TxEvent abortedEvent = messages.get(1);
+    assertThat(abortedEvent.globalTxId(), is(globalTxId));
+    assertThat(abortedEvent.localTxId(), is(localTxId));
+    assertThat(abortedEvent.parentTxId(), is(parentTxId));
+    assertThat(abortedEvent.type(), is(EventType.TxAbortedEvent));
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
+  public void returnImmediatelyWhenReceivedRejectResponse() {
     MessageSender sender = mock(MessageSender.class);
     when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+    CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
+
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
       expectFailing(InvalidTransactionException.class);
     } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
       assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
     }
 
     verify(sender, times(1)).send(any());
   }
 
+  @Test
+  public void recordRetryMethodWhenRetriesIsSet() throws Throwable {
+    int retries = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
+    when(compensable.retries()).thenReturn(retries);
+
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, retries);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(retries));
+    assertThat(startedEvent.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+  }
+
   private String doNothing() {
     return "doNothing";
   }
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
similarity index 61%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
index 31d148f..76fe55a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -40,28 +41,41 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-public class TransactionAspectTest {
+public class ForwardRecoveryTest {
   private final List<TxEvent> messages = new ArrayList<>();
+
   private final String globalTxId = UUID.randomUUID().toString();
+
   private final String localTxId = UUID.randomUUID().toString();
 
+  private final String parentTxId = UUID.randomUUID().toString();
+
   private final String newLocalTxId = UUID.randomUUID().toString();
 
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
   private final MessageSender sender = e -> {
     messages.add(e);
     return new AlphaResponse(false);
   };
-  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
-  @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
-  private final Compensable compensable = mock(Compensable.class);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
-  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+  private final RecoveryPolicy recoveryPolicy = new ForwardRecovery();
+
+  private volatile OmegaException exception;
 
   @Before
   public void setUp() throws Exception {
@@ -71,75 +85,75 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
   }
 
   @Test
-  public void newLocalTxIdInCompensable() throws Throwable {
-    aspect.advise(joinPoint, compensable);
-
-    TxEvent startedEvent = messages.get(0);
-
-    assertThat(startedEvent.globalTxId(), is(globalTxId));
-    assertThat(startedEvent.localTxId(), is(newLocalTxId));
-    assertThat(startedEvent.parentTxId(), is(localTxId));
-    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+  public void forwardExceptionWhenGlobalTxAborted() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
-    TxEvent endedEvent = messages.get(1);
+    CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-    assertThat(endedEvent.globalTxId(), is(globalTxId));
-    assertThat(endedEvent.localTxId(), is(newLocalTxId));
-    assertThat(endedEvent.parentTxId(), is(localTxId));
-    assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
+      expectFailing(InvalidTransactionException.class);
+    } catch (InvalidTransactionException e) {
+      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
+    }
 
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    verify(sender, times(1)).send(any());
   }
 
   @Test
-  public void restoreContextOnCompensableError() throws Throwable {
-    RuntimeException oops = new RuntimeException("oops");
-
+  public void throwExceptionWhenRetryReachesMaximum() throws Throwable {
+    when(compensable.retries()).thenReturn(2);
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 2);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
-      assertThat(e, is(oops));
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+    assertThat(messages.get(2).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
-    MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(new AlphaResponse(true));
-
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
-    try {
-      aspect.advise(joinPoint, compensable);
-      expectFailing(InvalidTransactionException.class);
-    } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
-      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
-    }
+  public void keepRetryingTillInterrupted() throws Throwable {
+    when(compensable.retries()).thenReturn(-1);
+    when(compensable.retryDelayInMilliseconds()).thenReturn(1000);
+    when(joinPoint.proceed()).thenThrow(oops);
 
-    verify(sender, times(1)).send(any());
+    Thread thread = new Thread(() -> {
+      try {
+        recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, -1);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        exception = e;
+      } catch (Throwable throwable) {
+        fail("unexpected exception throw: " + throwable);
+      }
+    });
+    thread.start();
+
+    thread.interrupt();
+    thread.join();
+
+    assertThat(exception.getMessage().contains("Failed to handle tx because it is interrupted"), is(true));
   }
 
   private String doNothing() {
     return "doNothing";
   }
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 31d148f..0aa9549 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -20,18 +20,13 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -40,13 +35,11 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-
   private final String newLocalTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = e -> {
@@ -71,6 +64,7 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
@@ -86,6 +80,8 @@ public class TransactionAspectTest {
     assertThat(startedEvent.localTxId(), is(newLocalTxId));
     assertThat(startedEvent.parentTxId(), is(localTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod().isEmpty(), is(true));
 
     TxEvent endedEvent = messages.get(1);
 
@@ -123,20 +119,84 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
-    MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(new AlphaResponse(true));
+  public void retryReachesMaximumAndForwardException() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops);
+    when(compensable.retries()).thenReturn(3);
 
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
-      expectFailing(InvalidTransactionException.class);
-    } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
-      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    verify(sender, times(1)).send(any());
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(3));
+    assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(2));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxAbortedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+  }
+
+  @Test
+  public void keepRetryingTillSuccess() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
+    when(compensable.retries()).thenReturn(-1);
+
+    aspect.advise(joinPoint, compensable);
+
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(-1));
+    assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(-1));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(-1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxEndedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
   private String doNothing() {
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9605a37..d2c6f77 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,9 +22,11 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
+  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {
+  }
   rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
-  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
+  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
+  }
 }
 
 message GrpcServiceConfig {
@@ -48,13 +50,14 @@ message GrpcTxEvent {
   string instanceId = 9;
   int32 timeout = 10;
   int32 retries = 11;
-  string retriesMethod = 12;
+  string retryMethod = 12;
 }
 
 message GrpcCompensateCommand {
   string globalTxId = 1;
   string localTxId = 2;
   string parentTxId = 3;
-  string compensateMethod = 4;
+  string compensationMethod = 4;
   bytes payloads = 5;
-}
\ No newline at end of file
+}
+

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.