You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:44 UTC

[incubator-servicecomb-saga] branch master updated (aff2719 -> ce71ab7)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from aff2719  SCB-505 Update to throw exception when the transaction timeout
     new bd5ed77  SCB-224: alpha support retries
     new 0651b91  SCB-224: omega support retries
     new f75e0b9  SCB-224 alpha support retries event
     new 2357d13  SCB-224 omega support retries event
     new 9d2a0b8  SCB-224 alpha:support retries use command
     new 7e01a4f  SCB-224 retry synchronously
     new ce71ab7  SCB-224 resolved rebase conflicts

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/servicecomb/saga/PackStepdefs.java  |  16 +--
 .../servicecomb/saga/alpha/core/Command.java       |   6 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  |  22 ++--
 .../saga/alpha/core/PushBackOmegaCallback.java     |   9 +-
 .../saga/alpha/core/TxConsistentService.java       |  16 +--
 .../servicecomb/saga/alpha/core/TxEvent.java       |  35 +++++-
 .../saga/alpha/core/TxEventRepository.java         |   2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   |  17 ++-
 .../saga/alpha/server/CommandEntityRepository.java |  15 ++-
 .../saga/alpha/server/GrpcOmegaCallback.java       |   2 +-
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |   2 +
 .../saga/alpha/server/SpringCommandRepository.java |   9 +-
 .../saga/alpha/server/SpringTxEventRepository.java |  15 +--
 .../alpha/server/SpringTxTimeoutRepository.java    |   5 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  48 +++++---
 .../src/main/resources/schema-mysql.sql            |   3 +
 .../src/main/resources/schema-postgresql.sql       |   2 +
 .../saga/alpha/server/AlphaIntegrationTest.java    |  80 ++++++++++----
 alpha/alpha-server/src/test/resources/schema.sql   |   2 +
 .../pack/tests/CommandEnvelopeRepository.java      |   8 +-
 .../integration/pack/tests/GreetingController.java |   8 ++
 .../integration/pack/tests/GreetingService.java    |  22 ++++
 .../saga/integration/pack/tests/PackIT.java        |  92 ++++++++++++++--
 .../connector/grpc/GrpcClientMessageSender.java    |   7 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   6 +-
 .../grpc/LoadBalancedClusterMessageSender.java     |   8 +-
 .../connector/grpc/PushBackReconnectRunnable.java  |   8 +-
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  15 ++-
 .../connector/grpc/RetryableMessageSenderTest.java |   3 +-
 .../saga/omega/context/CompensationContext.java    |   6 +-
 .../spring/CompensableAnnotationProcessor.java     |   1 +
 .../spring/CompensableMethodCheckingCallback.java  |  11 +-
 .../spring/TransactionAspectConfig.java            |   5 +-
 .../spring/TransactionInterceptionTest.java        | 102 ++++++++++++++---
 .../spring/TransactionalUserService.java           |  24 +++-
 .../saga/omega/transaction/spring/User.java        |   2 +-
 .../omega/transaction/CompensableInterceptor.java  |  10 +-
 .../transaction/CompensationMessageHandler.java    |   6 +-
 ...TransactionAspect.java => DefaultRecovery.java} |  56 +++++-----
 .../omega/transaction/EventAwareInterceptor.java   |   6 +-
 .../saga/omega/transaction/ForwardRecovery.java    |  76 +++++++++++++
 .../saga/omega/transaction/RecoveryPolicy.java}    |  18 ++-
 ...phaResponse.java => RecoveryPolicyFactory.java} |  19 ++--
 .../saga/omega/transaction/SagaEndedEvent.java     |   5 +-
 .../transaction/SagaStartAnnotationProcessor.java  |   3 +-
 .../saga/omega/transaction/SagaStartAspect.java    |   2 +-
 .../saga/omega/transaction/SagaStartedEvent.java   |   2 +-
 .../saga/omega/transaction/TransactionAspect.java  |  33 +-----
 .../saga/omega/transaction/TxAbortedEvent.java     |   3 +-
 .../saga/omega/transaction/TxCompensatedEvent.java |   2 +-
 .../saga/omega/transaction/TxEndedEvent.java       |   2 +-
 .../saga/omega/transaction/TxEvent.java            |  23 +++-
 .../saga/omega/transaction/TxStartedEvent.java     |   7 +-
 .../omega/transaction/annotations/Compensable.java |   8 +-
 .../transaction/CompensableInterceptorTest.java    |   8 +-
 .../CompensationMessageHandlerTest.java            |  12 +-
 ...ionAspectTest.java => DefaultRecoveryTest.java} | 115 ++++++++++++-------
 ...ionAspectTest.java => ForwardRecoveryTest.java} | 122 ++++++++++++---------
 .../SagaStartAnnotationProcessorTest.java          |   4 +-
 .../omega/transaction/TransactionAspectTest.java   |  92 +++++++++++++---
 .../src/main/proto/GrpcTxEvent.proto               |  13 ++-
 61 files changed, 898 insertions(+), 383 deletions(-)
 copy omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java => integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java (77%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{TransactionAspect.java => DefaultRecovery.java} (56%)
 create mode 100644 omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
 copy omega/{omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredService.java => omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java} (65%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{AlphaResponse.java => RecoveryPolicyFactory.java} (56%)
 copy omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/{TransactionAspectTest.java => DefaultRecoveryTest.java} (55%)
 copy omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/{TransactionAspectTest.java => ForwardRecoveryTest.java} (61%)

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 07/07: SCB-224 resolved rebase conflicts

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ce71ab73ae80bc90ba59fe9b038f134fef9426b1
Author: Eric Lee <er...@gmail.com>
AuthorDate: Tue Apr 24 16:17:39 2018 +0800

    SCB-224 resolved rebase conflicts
    
    Signed-off-by: Eric Lee <er...@gmail.com>
---
 .../saga/alpha/core/TxConsistentService.java            | 17 +++--------------
 .../saga/alpha/core/TxConsistentServiceTest.java        | 13 +++++++++++++
 .../saga/alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 4382fd3..9a7f82b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -23,7 +23,6 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -41,22 +40,12 @@ public class TxConsistentService {
   }
 
   public boolean handle(TxEvent event) {
-    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
-      LOG.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted",
-          event.globalTxId());
+    if (types.contains(event.type()) && isGlobalTxAborted(event)) {
+      LOG.info("Transaction event {} rejected, because its parent with globalTxId {} was already aborted",
+          event.type(), event.globalTxId());
       return false;
     }
 
-    if (SagaEndedEvent.name().equals(event.type()) && !event.expiryTime().equals(new Date(TxEvent.MAX_TIMESTAMP))) {
-      // if we get the SagaEndedEvent and the expiryTime is not MAX_TIME, we need to check if it is timeout
-      if (eventRepository.findTimeoutEvents().stream()
-          .filter(txEvent -> txEvent.globalTxId().equals(event.globalTxId()))
-          .count() == 1) {
-        LOG.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
-        return false;
-      }
-    }
-
     eventRepository.save(event);
 
     return true;
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index e80d5a8..8faf0e8 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -132,6 +132,19 @@ public class TxConsistentServiceTest {
     assertThat(events.size(), is(2));
   }
 
+  @Test
+  public void skipSagaEndedEvent_IfGlobalTxAlreadyFailed() {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(eventOf(SagaStartedEvent, localTxId1));
+    events.add(eventOf(TxAbortedEvent, localTxId1));
+
+    TxEvent event = eventOf(SagaEndedEvent, localTxId1);
+
+    consistentService.handle(event);
+
+    assertThat(events.size(), is(2));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod,
         payloads);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 68e6233..6a8a263 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -72,7 +72,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND ( SELECT MIN(t1.retries) FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
       + "    AND t1.localTxId = t.localTxId "
-      + "    AND t1.type = 'TxStartedEvent' ) = 0 ")
+      + "    AND t1.type IN ('TxStartedEvent', 'SagaStartedEvent') ) = 0 ")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
   @Query("SELECT t FROM TxEvent t "

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 04/07: SCB-224 omega support retries event

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 2357d138d00832df634c74f8d5b06f4045d89a1a
Author: zhang2014 <co...@gmail.com>
AuthorDate: Sun Jan 14 16:44:30 2018 +0800

    SCB-224 omega support retries event
---
 .../saga/alpha/core/CompositeOmegaCallback.java    | 43 +++-------
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  5 +-
 .../saga/alpha/core/PushBackOmegaCallback.java     | 10 ---
 .../saga/alpha/core/TxConsistentService.java       |  1 -
 .../servicecomb/saga/alpha/core/TxEvent.java       |  8 +-
 .../saga/alpha/server/GrpcOmegaCallback.java       | 12 ---
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  2 +
 .../saga/alpha/server/TxEventEnvelope.java         | 96 ++++++++++++++++++++++
 .../src/main/resources/schema-mysql.sql            |  2 +
 .../saga/omega/context/CompensationContext.java    | 16 ++--
 .../saga/omega/context/OmegaContext.java           |  2 +
 .../spring/TransactionAspectConfig.java            |  4 +-
 .../omega/transaction/CompensableInterceptor.java  | 10 ++-
 .../transaction/CompensationMessageHandler.java    | 11 ++-
 .../CompensationMessageHandlerTest.java            |  4 +-
 15 files changed, 151 insertions(+), 75 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index b1c7e09..32e5102 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.Collections.emptyMap;
+
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 
 public class CompositeOmegaCallback implements OmegaCallback {
   private final Map<String, Map<String, OmegaCallback>> callbacks;
@@ -29,44 +29,23 @@ public class CompositeOmegaCallback implements OmegaCallback {
   }
 
   @Override
-  public void retries(TxEvent event) {
-    OmegaCallback omegaCallback = callbackFor(event);
+  public void compensate(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
 
-    try {
-      omegaCallback.retries(event);
-    } catch (Exception e) {
-      removeEventCallback(event, omegaCallback);
-      throw e;
+    if (serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + event.serviceName());
     }
-  }
 
-  @Override
-  public void compensate(TxEvent event) {
-    OmegaCallback omegaCallback = callbackFor(event);
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      omegaCallback = serviceCallbacks.values().iterator().next();
+    }
 
     try {
       omegaCallback.compensate(event);
     } catch (Exception e) {
-      removeEventCallback(event, omegaCallback);
+      serviceCallbacks.values().remove(omegaCallback);
       throw e;
     }
   }
-
-  private OmegaCallback callbackFor(String instanceId, Map<String, OmegaCallback> serviceCallbacks) {
-    OmegaCallback omegaCallback = serviceCallbacks.get(instanceId);
-    if (Objects.isNull(omegaCallback)) {
-      return serviceCallbacks.values().iterator().next();
-    }
-    return omegaCallback;
-  }
-
-  private OmegaCallback callbackFor(TxEvent event) {
-    return Optional.ofNullable(callbacks.get(event.serviceName())).filter(callbacks -> !callbacks.isEmpty())
-        .map(serviceCallbacks -> callbackFor(event.instanceId(), serviceCallbacks))
-        .orElseThrow(() -> new AlphaException("No such omega callback found for service " + event.serviceName()));
-  }
-
-  private void removeEventCallback(TxEvent event, OmegaCallback omegaCallback) {
-    callbacks.get(event.serviceName()).values().remove(omegaCallback);
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index d926ed0..c7dfbbb 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,10 +18,7 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
-  void retries(TxEvent event);
-
   void compensate(TxEvent event);
 
-  default void disconnect() {
-  }
+  default void disconnect() {}
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 290eb20..3b27c14 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -35,16 +35,6 @@ public class PushBackOmegaCallback implements OmegaCallback {
   }
 
   @Override
-  public void retries(TxEvent event) {
-    try {
-      underlying.compensate(event);
-    } catch (Exception e) {
-      logError(event, e);
-      pendingCompensations.offer(() -> compensate(event));
-    }
-  }
-
-  @Override
   public void compensate(TxEvent event) {
     try {
       underlying.compensate(event);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index a9a3ed2..b519c1b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -29,7 +29,6 @@ import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
-
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 5426e8b..1abb7fe 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -87,7 +87,7 @@ public class TxEvent {
       int timeout,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout,
-        payloads);
+        "", 0, payloads);
   }
 
   public TxEvent(
@@ -102,7 +102,7 @@ public class TxEvent {
       int timeout,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
-        timeout, payloads);
+        timeout, "", 0, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -132,6 +132,8 @@ public class TxEvent {
       String type,
       String compensationMethod,
       Date expiryTime,
+      String retriesMethod,
+      int retries,
       byte[] payloads) {
     this.surrogateId = surrogateId;
     this.serviceName = serviceName;
@@ -143,6 +145,8 @@ public class TxEvent {
     this.type = type;
     this.compensationMethod = compensationMethod;
     this.expiryTime = expiryTime;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
     this.payloads = payloads;
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 83a6b9d..5a95281 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -37,18 +37,6 @@ class GrpcOmegaCallback implements OmegaCallback {
   }
 
   @Override
-  public void retries(TxEvent event) {
-    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
-        .setGlobalTxId(event.globalTxId())
-        .setLocalTxId(event.localTxId())
-        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
-        .setCompensateMethod(event.retriesMethod())
-        .setPayloads(ByteString.copyFrom(event.payloads()))
-        .build();
-    observer.onNext(command);
-  }
-
-  @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index ee7e2e4..99457a2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -85,6 +85,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getType(),
         message.getCompensationMethod(),
         message.getTimeout(),
+        message.getRetriesMethod(),
+        message.getRetries(),
         message.getPayloads().toByteArray()
     ));
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
new file mode 100644
index 0000000..7d93462
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.Date;
+
+import javax.persistence.Embedded;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+
+@Entity class TxEventEnvelope {
+  @Id
+  @GeneratedValue
+  private long surrogateId;
+
+  @Embedded
+  private TxEvent event;
+
+  private TxEventEnvelope() {
+  }
+
+  TxEventEnvelope(TxEvent event) {
+    this.event = event;
+  }
+
+  public TxEventEnvelope(
+      String serviceName, String instanceId, String globalTxId,
+      String localTxId, String parentTxId, String type, String compensationMethod,
+      String retriesMethod, int retries, byte[] payloads) {
+    this.event = new TxEvent(
+        serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type,
+        compensationMethod, retriesMethod, retries, payloads);
+  }
+
+  String serviceName() {
+    return event.serviceName();
+  }
+
+  String instanceId() {
+    return event.instanceId();
+  }
+
+  public long creationTime() {
+    return event.creationTime().getTime();
+  }
+
+  String globalTxId() {
+    return event.globalTxId();
+  }
+
+  String localTxId() {
+    return event.localTxId();
+  }
+
+  String parentTxId() {
+    return event.parentTxId();
+  }
+
+  String type() {
+    return event.type();
+  }
+
+  String compensationMethod() {
+    return event.compensationMethod();
+  }
+
+  byte[] payloads() {
+    return event.payloads();
+  }
+
+  int retries() {
+    return event.retries();
+  }
+
+  TxEvent event() {
+    return event;
+  }
+}
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index b0bc8d7..c21e518 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  retries_method varchar(256) NOT NULL,
+  retries int NOT NULL,
   payloads varbinary(10240),
   PRIMARY KEY (surrogateId),
   INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 067af92..8d9eb7e 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -28,15 +28,20 @@ import org.slf4j.LoggerFactory;
 
 public class CompensationContext {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>();
+
+  private final Map<String, TransactionContextInternal> contexts = new ConcurrentHashMap<>();
+
+  public CompensationContext() {
+  }
 
   public void addCompensationContext(Method compensationMethod, Object target) {
     compensationMethod.setAccessible(true);
-    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
+    contexts.put(compensationMethod.toString(),
+        new TransactionContextInternal(target, compensationMethod));
   }
 
   public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
+    TransactionContextInternal contextInternal = contexts.get(compensationMethod);
 
     try {
       contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
@@ -49,11 +54,12 @@ public class CompensationContext {
     }
   }
 
-  private static final class CompensationContextInternal {
+  private static final class TransactionContextInternal {
     private final Object target;
+
     private final Method compensationMethod;
 
-    private CompensationContextInternal(Object target, Method compensationMethod) {
+    private TransactionContextInternal(Object target, Method compensationMethod) {
       this.target = target;
       this.compensationMethod = compensationMethod;
     }
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index daa8e7c..ac9f02c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
+import java.util.Map;
+
 public class OmegaContext {
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 5358db5..41be8f7 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -34,8 +34,8 @@ import org.springframework.core.annotation.Order;
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, CompensationContext context) {
-    return new CompensationMessageHandler(sender, context);
+  MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
+    return new CompensationMessageHandler(sender, omegaContext, context);
   }
 
   @Order(0)
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index fcce034..d3e2bd1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -18,14 +18,17 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
+
   private final OmegaContext context;
+
   private final MessageSender sender;
 
   CompensableInterceptor(OmegaContext context, MessageSender sender) {
-    this.context = context;
     this.sender = sender;
+    this.context = context;
   }
 
   @Override
@@ -37,12 +40,11 @@ class CompensableInterceptor implements EventAwareInterceptor {
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
     sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod));
-
   }
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(
-        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
+    sender.send(new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
+        throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 46c1e9b..15cf91a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -18,19 +18,26 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
+  private final OmegaContext omegaContext;
   private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
+  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) {
     this.sender = sender;
     this.context = context;
+    this.omegaContext = omegaContext;
   }
 
   @Override
-  public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
+  public void onReceive(String globalTxId, String localTxId, String parentTxId,
+      String compensationMethod, Object... payloads) {
+    String oldLocalTxId = omegaContext.localTxId();
+    omegaContext.setLocalTxId(parentTxId);
     context.compensate(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+    omegaContext.setLocalTxId(oldLocalTxId);
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index 0b33d4b..c9c7394 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -44,8 +45,9 @@ public class CompensationMessageHandlerTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
+  private final OmegaContext omegaContext = mock(OmegaContext.class);
   private final CompensationContext context = mock(CompensationContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context);
 
   @Test
   public void sendsEventOnCompensationCompleted() throws Exception {

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 03/07: SCB-224 alpha support retries event

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit f75e0b9bff120d42b7c334302635b6ecb2b09da8
Author: zhang2014 <co...@gmail.com>
AuthorDate: Sun Jan 14 13:31:39 2018 +0800

    SCB-224 alpha support retries event
---
 .../saga/alpha/core/CompositeOmegaCallback.java    | 50 +++++++++++-----------
 .../servicecomb/saga/alpha/core/TxEvent.java       |  5 +--
 2 files changed, 26 insertions(+), 29 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index bce8f4b..b1c7e09 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.Collections.emptyMap;
-
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 
 public class CompositeOmegaCallback implements OmegaCallback {
   private final Map<String, Map<String, OmegaCallback>> callbacks;
@@ -30,43 +30,43 @@ public class CompositeOmegaCallback implements OmegaCallback {
 
   @Override
   public void retries(TxEvent event) {
-    Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
-
-    if (serviceCallbacks.isEmpty()) {
-      throw new AlphaException("No such omega callback found for service " + event.serviceName());
-    }
-
-    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
-    if (omegaCallback == null) {
-      omegaCallback = serviceCallbacks.values().iterator().next();
-    }
+    OmegaCallback omegaCallback = callbackFor(event);
 
     try {
-      omegaCallback.compensate(event);
+      omegaCallback.retries(event);
     } catch (Exception e) {
-      serviceCallbacks.values().remove(omegaCallback);
+      removeEventCallback(event, omegaCallback);
       throw e;
     }
   }
 
   @Override
   public void compensate(TxEvent event) {
-    Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
-
-    if (serviceCallbacks.isEmpty()) {
-      throw new AlphaException("No such omega callback found for service " + event.serviceName());
-    }
-
-    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
-    if (omegaCallback == null) {
-      omegaCallback = serviceCallbacks.values().iterator().next();
-    }
+    OmegaCallback omegaCallback = callbackFor(event);
 
     try {
       omegaCallback.compensate(event);
     } catch (Exception e) {
-      serviceCallbacks.values().remove(omegaCallback);
+      removeEventCallback(event, omegaCallback);
       throw e;
     }
   }
+
+  private OmegaCallback callbackFor(String instanceId, Map<String, OmegaCallback> serviceCallbacks) {
+    OmegaCallback omegaCallback = serviceCallbacks.get(instanceId);
+    if (Objects.isNull(omegaCallback)) {
+      return serviceCallbacks.values().iterator().next();
+    }
+    return omegaCallback;
+  }
+
+  private OmegaCallback callbackFor(TxEvent event) {
+    return Optional.ofNullable(callbacks.get(event.serviceName())).filter(callbacks -> !callbacks.isEmpty())
+        .map(serviceCallbacks -> callbackFor(event.instanceId(), serviceCallbacks))
+        .orElseThrow(() -> new AlphaException("No such omega callback found for service " + event.serviceName()));
+  }
+
+  private void removeEventCallback(TxEvent event, OmegaCallback omegaCallback) {
+    callbacks.get(event.serviceName()).values().remove(omegaCallback);
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 5176ce0..5426e8b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,16 +17,12 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 import java.util.Date;
 
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.Transient;
 
 @Entity
 @Table(name = "TxEvent")
@@ -209,6 +205,7 @@ public class TxEvent {
         ", expiryTime='" + expiryTime + '\'' +
         '}';
   }
+
   public int retries() {
     return retries;
   }

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 05/07: SCB-224 alpha:support retries use command

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9d2a0b84166403690685e17f9f3ba176204fdfbe
Author: zhang2014 <co...@gmail.com>
AuthorDate: Tue Jan 23 23:42:35 2018 +0800

    SCB-224 alpha:support retries use command
---
 .../servicecomb/saga/alpha/core/Command.java       |  4 +-
 .../saga/alpha/core/CommandRepository.java         |  2 +-
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  3 +-
 .../saga/alpha/core/TxConsistentService.java       | 54 ++----------
 .../saga/alpha/server/SpringCommandRepository.java | 14 ++++
 .../saga/alpha/server/TxEventEnvelope.java         | 96 ----------------------
 .../alpha/server/TxEventEnvelopeRepository.java    |  1 +
 .../src/main/resources/schema-postgresql.sql       |  2 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 49 +++++++++++
 alpha/alpha-server/src/test/resources/schema.sql   |  2 +
 .../saga/omega/context/CompensationContext.java    | 15 ++--
 .../saga/omega/context/OmegaContext.java           |  2 -
 .../omega/transaction/CompensableInterceptor.java  |  2 -
 13 files changed, 83 insertions(+), 163 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 0f016d3..4edb928 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -115,11 +115,11 @@ public class Command {
     return localTxId;
   }
 
-  String parentTxId() {
+  public String parentTxId() {
     return parentTxId;
   }
 
-  String compensationMethod() {
+  public String compensationMethod() {
     return compensationMethod;
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 2bbea77..25288ed 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(TxEvent abortEvent);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index c7dfbbb..f60a44d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -20,5 +20,6 @@ package org.apache.servicecomb.saga.alpha.core;
 public interface OmegaCallback {
   void compensate(TxEvent event);
 
-  default void disconnect() {}
+  default void disconnect() {
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index b519c1b..968e5b7 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,17 +17,17 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -61,50 +61,6 @@ public class TxConsistentService {
     return true;
   }
 
-//  private void compensate(TxEvent event) {
-//    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-//
-//    Optional<TxEvent> startedEvent = events.stream().filter(e -> e.containChildren(event)).findFirst();
-//
-//    startedEvent.ifPresent(compensateEvent -> {
-//      Integer[] retiesAndTimes = eventsToRetries.compute(event.parentTxId(), (k, v) ->
-//          v == null ? new Integer[] {compensateEvent.retries(), 0} : new Integer[] {v[0], v[1] + 1});
-//      List<TxEvent> compensationEvents =
-//          retiesAndTimes[0] >= retiesAndTimes[1] ? events : Collections.singletonList(
-//              new TxEvent(
-//                  event.serviceName(), event.instanceId(), event.creationTime(), event.globalTxId(),
-//                  event.localTxId(), event.parentTxId(), event.type(), event.retriesMethod(), event.payloads()
-//              ));
-//
-//      compensateImpl(event.globalTxId(), compensationEvents);
-//    });
-//  }
-//
-//  private void compensateImpl(String globalTxId, List<TxEvent> events) {
-//    events.removeIf(this::isCompensationScheduled);
-//
-//    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(globalTxId, k -> new HashSet<>());
-//    events.forEach(e -> localTxIds.add(e.localTxId()));
-//
-//    events.forEach(omegaCallback::compensate);
-//  }
-
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
-//  private void updateCompensateStatus(TxEvent event) {
-//    Set<String> events = eventsToCompensate.get(event.globalTxId());
-//    if (events != null) {
-//      events.remove(event.localTxId());
-//      if (events.isEmpty()) {
-//        eventsToCompensate.remove(event.globalTxId());
-//        Integer[] retiesAndTimes = eventsToRetries.get(event.parentTxId());
-//        if (retiesAndTimes == null || retiesAndTimes[0] >= retiesAndTimes[1]) {
-//          markGlobalTxEnd(event);
-//          eventsToRetries.remove(event.parentTxId());
-//        }
-//      }
-//    }
-//  }
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 086f88e..ed7b4f1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -91,4 +91,18 @@ public class SpringCommandRepository implements CommandRepository {
 
     return commands;
   }
+
+  private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
+    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
+        .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
+            && Objects.equals(c.localTxId(), localTxId)).count();
+  }
+
+  private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
+    return Collections.singletonList(new TxEvent(
+        abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
+        txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
+        txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
+    ));
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
deleted file mode 100644
index 7d93462..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server;
-
-import java.util.Date;
-
-import javax.persistence.Embedded;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-
-@Entity class TxEventEnvelope {
-  @Id
-  @GeneratedValue
-  private long surrogateId;
-
-  @Embedded
-  private TxEvent event;
-
-  private TxEventEnvelope() {
-  }
-
-  TxEventEnvelope(TxEvent event) {
-    this.event = event;
-  }
-
-  public TxEventEnvelope(
-      String serviceName, String instanceId, String globalTxId,
-      String localTxId, String parentTxId, String type, String compensationMethod,
-      String retriesMethod, int retries, byte[] payloads) {
-    this.event = new TxEvent(
-        serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type,
-        compensationMethod, retriesMethod, retries, payloads);
-  }
-
-  String serviceName() {
-    return event.serviceName();
-  }
-
-  String instanceId() {
-    return event.instanceId();
-  }
-
-  public long creationTime() {
-    return event.creationTime().getTime();
-  }
-
-  String globalTxId() {
-    return event.globalTxId();
-  }
-
-  String localTxId() {
-    return event.localTxId();
-  }
-
-  String parentTxId() {
-    return event.parentTxId();
-  }
-
-  String type() {
-    return event.type();
-  }
-
-  String compensationMethod() {
-    return event.compensationMethod();
-  }
-
-  byte[] payloads() {
-    return event.payloads();
-  }
-
-  int retries() {
-    return event.retries();
-  }
-
-  TxEvent event() {
-    return event;
-  }
-}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 470caa5..3a7edb3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -73,6 +73,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.compensationMethod != t.retriesMethod "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 41815ee..674e051 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime timestamp(6) NOT NULL,
+  retriesMethod varchar(256) NOT NULL,
+  retries int NOT NULL,
   payloads bytea
 );
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 8b2672c..26aa17a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -438,6 +438,55 @@ public class AlphaIntegrationTest {
     return false;
   }
 
+  @Test
+  public void retiesAndCompensateOnFailure() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    String localTxId1 = UUID.randomUUID().toString();
+
+    blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId1)
+        .setParentTxId(parentTxId)
+        .setType(TxStartedEvent.name())
+        .setCompensationMethod("Compensation Method")
+        .setPayloads(ByteString.copyFrom(payload.getBytes()))
+        .setRetries(3).setRetriesMethod("Retries Method")
+        .build());
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
+
+    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+
+    for (int i = 0; i < 3; i++) {
+      blockingStub.onTxEvent(
+          eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+      await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+      GrpcCompensateCommand command = receivedCommands.poll();
+      assertThat(command.getGlobalTxId(), is(globalTxId));
+      assertThat(command.getLocalTxId(), is(localTxId1));
+      assertThat(command.getParentTxId(), is(parentTxId));
+      assertThat(command.getCompensateMethod(), is("Retries Method"));
+      assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+    }
+
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId1));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is("Compensation Method"));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index a10a4e0..ca46625 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,6 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime TIMESTAMP NOT NULL,
+  retriesMethod varchar(256) NOT NULL,
+  retries int NOT NULL,
   payloads varbinary(10240)
 );
 
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 8d9eb7e..48a67f7 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -29,19 +29,15 @@ import org.slf4j.LoggerFactory;
 public class CompensationContext {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Map<String, TransactionContextInternal> contexts = new ConcurrentHashMap<>();
-
-  public CompensationContext() {
-  }
+  private final Map<String, CompensationContextInternal> contexts = new ConcurrentHashMap<>();
 
   public void addCompensationContext(Method compensationMethod, Object target) {
     compensationMethod.setAccessible(true);
-    contexts.put(compensationMethod.toString(),
-        new TransactionContextInternal(target, compensationMethod));
+    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
   }
 
   public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    TransactionContextInternal contextInternal = contexts.get(compensationMethod);
+    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
 
     try {
       contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
@@ -54,12 +50,11 @@ public class CompensationContext {
     }
   }
 
-  private static final class TransactionContextInternal {
+  private static final class CompensationContextInternal {
     private final Object target;
-
     private final Method compensationMethod;
 
-    private TransactionContextInternal(Object target, Method compensationMethod) {
+    private CompensationContextInternal(Object target, Method compensationMethod) {
       this.target = target;
       this.compensationMethod = compensationMethod;
     }
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index ac9f02c..daa8e7c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
-import java.util.Map;
-
 public class OmegaContext {
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index d3e2bd1..988d8d7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -21,9 +21,7 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
-
   private final OmegaContext context;
-
   private final MessageSender sender;
 
   CompensableInterceptor(OmegaContext context, MessageSender sender) {

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 06/07: SCB-224 retry synchronously

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 7e01a4f3ca696716ca7beaedc9032e7ca6383fff
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Feb 23 15:44:26 2018 +0800

    SCB-224 retry synchronously
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../org/apache/servicecomb/saga/PackStepdefs.java  |  16 +--
 .../servicecomb/saga/alpha/core/Command.java       |   2 +-
 .../saga/alpha/core/CommandRepository.java         |   2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  |  22 ++--
 .../saga/alpha/core/PushBackOmegaCallback.java     |   9 +-
 .../saga/alpha/core/TxConsistentService.java       |   9 +-
 .../servicecomb/saga/alpha/core/TxEvent.java       |  53 +++++----
 .../saga/alpha/core/TxEventRepository.java         |   2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   |   4 +-
 .../saga/alpha/server/CommandEntityRepository.java |  15 ++-
 .../saga/alpha/server/GrpcOmegaCallback.java       |   2 +-
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |   2 +-
 .../saga/alpha/server/SpringCommandRepository.java |  23 +---
 .../saga/alpha/server/SpringTxEventRepository.java |  15 +--
 .../alpha/server/SpringTxTimeoutRepository.java    |   5 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  49 +++++---
 .../src/main/resources/schema-mysql.sql            |   5 +-
 .../src/main/resources/schema-postgresql.sql       |   4 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 129 ++++++++++-----------
 alpha/alpha-server/src/test/resources/schema.sql   |   4 +-
 .../pack/tests/CommandEnvelopeRepository.java      |  13 +--
 .../integration/pack/tests/GreetingController.java |   8 ++
 .../integration/pack/tests/GreetingService.java    |  22 ++++
 .../saga/integration/pack/tests/PackIT.java        |  92 +++++++++++++--
 .../connector/grpc/GrpcClientMessageSender.java    |   4 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   6 +-
 .../grpc/LoadBalancedClusterMessageSender.java     |   8 +-
 .../connector/grpc/PushBackReconnectRunnable.java  |   8 +-
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  15 +--
 .../connector/grpc/RetryableMessageSenderTest.java |   1 +
 .../saga/omega/context/CompensationContext.java    |   5 +-
 .../spring/CompensableAnnotationProcessor.java     |   1 +
 .../spring/CompensableMethodCheckingCallback.java  |  10 +-
 .../spring/TransactionAspectConfig.java            |   5 +-
 .../spring/TransactionInterceptionTest.java        | 103 ++++++++++++----
 .../spring/TransactionalUserService.java           |  24 +++-
 .../saga/omega/transaction/spring/User.java        |   2 +-
 .../omega/transaction/CompensableInterceptor.java  |  12 +-
 .../transaction/CompensationMessageHandler.java    |  15 +--
 ...TransactionAspect.java => DefaultRecovery.java} |  59 +++++-----
 .../omega/transaction/EventAwareInterceptor.java   |   6 +-
 .../saga/omega/transaction/ForwardRecovery.java    |  76 ++++++++++++
 ...TxCompensatedEvent.java => RecoveryPolicy.java} |  13 ++-
 ...aEndedEvent.java => RecoveryPolicyFactory.java} |  16 ++-
 .../saga/omega/transaction/SagaEndedEvent.java     |   5 +-
 .../transaction/SagaStartAnnotationProcessor.java  |   5 +-
 .../saga/omega/transaction/SagaStartAspect.java    |   2 +-
 .../saga/omega/transaction/SagaStartedEvent.java   |   2 +-
 .../omega/transaction/TimeAwareInterceptor.java    |   0
 .../saga/omega/transaction/TransactionAspect.java  |  36 +-----
 .../saga/omega/transaction/TxAbortedEvent.java     |   3 +-
 .../saga/omega/transaction/TxCompensatedEvent.java |   2 +-
 .../saga/omega/transaction/TxEndedEvent.java       |   2 +-
 .../saga/omega/transaction/TxEvent.java            |  19 +--
 .../saga/omega/transaction/TxStartedEvent.java     |   7 +-
 .../omega/transaction/annotations/Compensable.java |   4 +-
 .../transaction/CompensableInterceptorTest.java    |   8 +-
 .../CompensationMessageHandlerTest.java            |  16 ++-
 ...ionAspectTest.java => DefaultRecoveryTest.java} | 115 +++++++++++-------
 ...ionAspectTest.java => ForwardRecoveryTest.java} | 122 ++++++++++---------
 .../transaction/TimeAwareInterceptorTest.java      |   0
 .../omega/transaction/TransactionAspectTest.java   |  92 ++++++++++++---
 .../src/main/proto/GrpcTxEvent.proto               |  13 ++-
 63 files changed, 868 insertions(+), 481 deletions(-)

diff --git a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
index 4f2f729..f9af438 100644
--- a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
+++ b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
@@ -41,7 +41,7 @@ import cucumber.api.java.After;
 import cucumber.api.java8.En;
 
 public class PackStepdefs implements En {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String ALPHA_REST_ADDRESS = "alpha.rest.address";
   private static final String CAR_SERVICE_ADDRESS = "car.service.address";
@@ -73,7 +73,7 @@ public class PackStepdefs implements En {
     });
 
     Given("^Install the byteman script ([A-Za-z0-9_\\.]+) to ([A-Za-z]+) Service$", (String script, String service) -> {
-      log.info("Install the byteman script {} to {} service", script, service);
+      LOG.info("Install the byteman script {} to {} service", script, service);
       List<String> rules = new ArrayList<>();
       rules.add("target/test-classes/" + script);
       Submit bm = getBytemanSubmit(service);
@@ -81,7 +81,7 @@ public class PackStepdefs implements En {
     });
 
     When("^User ([A-Za-z]+) requests to book ([0-9]+) cars and ([0-9]+) rooms (success|fail)$", (username, cars, rooms, result) -> {
-      log.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
+      LOG.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
 
       Response resp = given()
           .pathParam("name", username)
@@ -116,7 +116,7 @@ public class PackStepdefs implements En {
 
   @After
   public void cleanUp() {
-    log.info("Cleaning up services");
+    LOG.info("Cleaning up services");
     for (String address : addresses) {
       given()
           .when()
@@ -135,7 +135,7 @@ public class PackStepdefs implements En {
       try {
         bm.deleteAllRules();
       } catch (Exception e) {
-        log.warn("Fail to delete the byteman rules " + e);
+        LOG.warn("Fail to delete the byteman rules " + e);
       }
     }
   }
@@ -156,7 +156,7 @@ public class PackStepdefs implements En {
       return;
     }
 
-    log.info("Retrieved data {} from service", actualMaps);
+    LOG.info("Retrieved data {} from service", actualMaps);
     dataTable.diff(DataTable.create(actualMaps));
   }
 
@@ -180,12 +180,12 @@ public class PackStepdefs implements En {
     if (isEmpty(infoURI)) {
       infoURI = "/info";
     }
-    log.info("The info service uri is " + infoURI);
+    LOG.info("The info service uri is " + infoURI);
     probe(address, infoURI);
   }
 
   private void probe(String address, String infoURI) {
-    log.info("Connecting to service address {}", address);
+    LOG.info("Connecting to service address {}", address);
     given()
         .when()
         .get(address + infoURI)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 4edb928..6c8f370 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -76,7 +76,7 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  private Command(long id,
+  public Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 25288ed..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(TxEvent abortEvent);
+  void saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index d3fba31..3168870 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EventScanner implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final ScheduledExecutorService scheduler;
@@ -83,7 +83,7 @@ public class EventScanner implements Runnable {
   private void findTimeoutEvents() {
     eventRepository.findTimeoutEvents()
         .forEach(event -> {
-          log.info("Found timeout event {}", event);
+          LOG.info("Found timeout event {}", event);
           timeoutRepository.save(txTimeoutOf(event));
         });
   }
@@ -93,9 +93,9 @@ public class EventScanner implements Runnable {
   }
 
   private void saveUncompensatedEventsToCommands() {
-    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId)
-        .ifPresent(event -> {
-          log.info("Found uncompensated event {}", event);
+    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+        .forEach(event -> {
+          LOG.info("Found uncompensated event {}", event);
           nextEndedEventId = event.id();
           commandRepository.saveCompensationCommands(event.globalTxId());
         });
@@ -104,7 +104,7 @@ public class EventScanner implements Runnable {
   private void updateCompensatedCommands() {
     eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
         .ifPresent(event -> {
-          log.info("Found compensated event {}", event);
+          LOG.info("Found compensated event {}", event);
           nextCompensatedEventId = event.id();
           updateCompensationStatus(event);
         });
@@ -114,13 +114,13 @@ public class EventScanner implements Runnable {
     try {
       eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
     } catch (Exception e) {
-      log.warn("Failed to delete duplicate event", e);
+      LOG.warn("Failed to delete duplicate event", e);
     }
   }
 
   private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+    LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
         event.globalTxId(),
         event.localTxId());
 
@@ -129,7 +129,7 @@ public class EventScanner implements Runnable {
 
   private void abortTimeoutEvents() {
     timeoutRepository.findFirstTimeout().forEach(timeout -> {
-      log.info("Found timeout event {} to abort", timeout);
+      LOG.info("Found timeout event {} to abort", timeout);
 
       eventRepository.save(toTxAbortedEvent(timeout));
 
@@ -152,7 +152,7 @@ public class EventScanner implements Runnable {
 
   private void markGlobalTxEnd(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
-    log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
+    LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
   private TxEvent toTxAbortedEvent(TxTimeout timeout) {
@@ -182,7 +182,7 @@ public class EventScanner implements Runnable {
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
-          log.info("Compensating transaction with globalTxId {} and localTxId {}",
+          LOG.info("Compensating transaction with globalTxId {} and localTxId {}",
               command.globalTxId(),
               command.localTxId());
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c14..9556d7c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PushBackOmegaCallback implements OmegaCallback {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final BlockingQueue<Runnable> pendingCompensations;
   private final OmegaCallback underlying;
@@ -45,11 +45,12 @@ public class PushBackOmegaCallback implements OmegaCallback {
   }
 
   private void logError(TxEvent event, Exception e) {
-    log.error(
-        "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+    LOG.error(
+        "Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+        event.retries() == 0 ? "compensate" : "retry",
         event.serviceName(),
         event.instanceId(),
-        event.compensationMethod(),
+        event.retries() == 0 ? event.compensationMethod() : event.retryMethod(),
         event.globalTxId(),
         event.localTxId(),
         e);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b7..4382fd3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventRepository eventRepository;
 
@@ -41,8 +41,9 @@ public class TxConsistentService {
   }
 
   public boolean handle(TxEvent event) {
-    if (types.contains(event.type()) && isGlobalTxAborted(event)) {
-      log.info("Transaction event {} rejected, because its parent with globalTxId {} was already aborted", event.type(), event.globalTxId());
+    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+      LOG.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted",
+          event.globalTxId());
       return false;
     }
 
@@ -51,7 +52,7 @@ public class TxConsistentService {
       if (eventRepository.findTimeoutEvents().stream()
           .filter(txEvent -> txEvent.globalTxId().equals(event.globalTxId()))
           .count() == 1) {
-        log.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
+        LOG.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
         return false;
       }
     }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1abb7fe..17b059c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,12 +17,16 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Date;
 
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Transient;
 
 @Entity
 @Table(name = "TxEvent")
@@ -43,9 +47,9 @@ public class TxEvent {
   private String type;
   private String compensationMethod;
   private Date expiryTime;
-  private byte[] payloads;
+  private String retryMethod;
   private int retries;
-  private String retriesMethod;
+  private byte[] payloads;
 
   private TxEvent() {
   }
@@ -61,6 +65,8 @@ public class TxEvent {
         event.type,
         event.compensationMethod,
         event.expiryTime,
+        event.retryMethod,
+        event.retries,
         event.payloads);
   }
 
@@ -73,7 +79,8 @@ public class TxEvent {
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, payloads);
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, "", 0,
+        payloads);
   }
 
   public TxEvent(
@@ -85,9 +92,11 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout,
-        "", 0, payloads);
+        retryMethod, retries, payloads);
   }
 
   public TxEvent(
@@ -100,9 +109,11 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
-        timeout, "", 0, payloads);
+        timeout, retryMethod, retries, payloads);
   }
 
   TxEvent(Long surrogateId,
@@ -115,10 +126,14 @@ public class TxEvent {
       String type,
       String compensationMethod,
       int timeout,
+      String retryMethod,
+      int retries,
       byte[] payloads) {
     this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
         compensationMethod,
         timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+        retryMethod,
+        retries,
         payloads);
   }
 
@@ -132,7 +147,7 @@ public class TxEvent {
       String type,
       String compensationMethod,
       Date expiryTime,
-      String retriesMethod,
+      String retryMethod,
       int retries,
       byte[] payloads) {
     this.surrogateId = surrogateId;
@@ -145,7 +160,7 @@ public class TxEvent {
     this.type = type;
     this.compensationMethod = compensationMethod;
     this.expiryTime = expiryTime;
-    this.retriesMethod = retriesMethod;
+    this.retryMethod = retryMethod;
     this.retries = retries;
     this.payloads = payloads;
   }
@@ -194,6 +209,14 @@ public class TxEvent {
     return expiryTime;
   }
 
+  public String retryMethod() {
+    return retryMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
@@ -206,19 +229,9 @@ public class TxEvent {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", expiryTime='" + expiryTime + '\'' +
+        ", expiryTime=" + expiryTime +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         '}';
   }
-
-  public int retries() {
-    return retries;
-  }
-
-  public String retriesMethod() {
-    return retriesMethod;
-  }
-
-  public boolean containChildren(TxEvent event) {
-    return this.localTxId.equals(event.parentTxId);
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 9eceadd..c481226 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -97,7 +97,7 @@ public interface TxEventRepository {
    * @param id
    * @return
    */
-  Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id);
+  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
   /**
    * Find a {@link TxEvent} which satisfies below requirements:
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index da36066..e80d5a8 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -73,8 +73,8 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
-      return Optional.empty();
+    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+      return emptyList();
     }
 
     @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 737fd11..53110bf 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -34,9 +34,22 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
   @Transactional
   @Modifying(clearAutomatically = true)
   @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+      + "SET c.status = :toStatus "
+      + "WHERE c.globalTxId = :globalTxId "
+      + "  AND c.localTxId = :localTxId "
+      + "  AND c.status = :fromStatus")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("fromStatus") String fromStatus,
+      @Param("toStatus") String toStatus,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
       + "SET c.status = :status "
       + "WHERE c.globalTxId = :globalTxId "
-      + "AND c.localTxId = :localTxId")
+      + "  AND c.localTxId = :localTxId")
   void updateStatusByGlobalTxIdAndLocalTxId(
       @Param("status") String status,
       @Param("globalTxId") String globalTxId,
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a95281..a54fa66 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -42,7 +42,7 @@ class GrpcOmegaCallback implements OmegaCallback {
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
-        .setCompensateMethod(event.compensationMethod())
+        .setCompensationMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
     observer.onNext(command);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 99457a2..a3137b4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -85,7 +85,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getType(),
         message.getCompensationMethod(),
         message.getTimeout(),
-        message.getRetriesMethod(),
+        message.getRetryMethod(),
         message.getRetries(),
         message.getPayloads().toByteArray()
     ));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index ed7b4f1..4aa30d1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SpringCommandRepository implements CommandRepository {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
@@ -57,13 +57,13 @@ public class SpringCommandRepository implements CommandRepository {
     }
 
     for (Command command : commands.values()) {
-      log.info("Saving compensation command {}", command);
+      LOG.info("Saving compensation command {}", command);
       try {
         commandRepository.save(command);
       } catch (Exception e) {
-        log.warn("Failed to save some command {}", command);
+        LOG.warn("Failed to save some command {}", command);
       }
-      log.info("Saved compensation command {}", command);
+      LOG.info("Saved compensation command {}", command);
     }
   }
 
@@ -85,24 +85,11 @@ public class SpringCommandRepository implements CommandRepository {
 
     commands.forEach(command ->
         commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+            NEW.name(),
             PENDING.name(),
             command.globalTxId(),
             command.localTxId()));
 
     return commands;
   }
-
-  private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
-    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
-        .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
-            && Objects.equals(c.localTxId(), localTxId)).count();
-  }
-
-  private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
-    return Collections.singletonList(new TxEvent(
-        abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
-        txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
-        txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
-    ));
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index cae6456..e48a780 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,17 +17,14 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
+
 import java.util.List;
 import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.springframework.data.domain.PageRequest;
-import org.springframework.util.CollectionUtils;
-
-import javax.swing.text.html.Option;
-
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 
 class SpringTxEventRepository implements TxEventRepository {
   private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1);
@@ -63,12 +60,8 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
-    List<TxEvent> result = eventRepo.findFirstUncompensatedEventByIdGreaterThan(id, SINGLE_TX_EVENT_REQUEST);
-    if (CollectionUtils.isEmpty(result)) {
-      return Optional.empty();
-    }
-    return Optional.of(result.get(0));
+  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST);
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 7195139..53b4443 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.PageRequest;
 
 public class SpringTxTimeoutRepository implements TxTimeoutRepository {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final TxTimeoutEntityRepository timeoutRepo;
 
   SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
@@ -43,7 +44,7 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository {
     try {
       timeoutRepo.save(timeout);
     } catch (Exception ignored) {
-      log.warn("Failed to save some timeout {}", timeout);
+      LOG.warn("Failed to save some timeout {}", timeout);
     }
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3a7edb3..68e6233 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -35,7 +35,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
       + "  SELECT t1.globalTxId FROM TxEvent t1"
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+      + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.surrogateId != t.surrogateId "
+      + "    AND t3.creationTime > t.creationTime) AND (("
+      + "SELECT MIN(t2.retries) FROM TxEvent t2 "
+      + "WHERE t2.globalTxId = t.globalTxId "
+      + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.type = 'TxStartedEvent') = 0 "
+      + "OR t.globalTxId = t.localTxId)")
   Optional<TxEvent> findFirstAbortedGlobalTxByType();
 
   @Query("SELECT t FROM TxEvent t "
@@ -56,9 +66,13 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
-      + "t.type, t.compensationMethod, t.payloads"
+      + "t.type, t.compensationMethod, t.payloads "
       + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = ?2")
+      + "WHERE t.globalTxId = ?1 AND t.type = ?2 "
+      + "  AND ( SELECT MIN(t1.retries) FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type = 'TxStartedEvent' ) = 0 ")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
   @Query("SELECT t FROM TxEvent t "
@@ -73,25 +87,30 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.compensationMethod != t.retriesMethod "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?1 AND EXISTS ( "
-      + "  SELECT t1.globalTxId"
-      + "  FROM TxEvent t1 "
+      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
-      + "  AND t1.type = 'TxAbortedEvent'"
-      + ") AND NOT EXISTS ( "
-      + "  SELECT t2.globalTxId"
-      + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = t.globalTxId "
-      + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent') "
+      + "    AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+      + "    SELECT t2.globalTxId FROM TxEvent t2 "
+      + "    WHERE t2.globalTxId = t1.globalTxId "
+      + "      AND t2.localTxId = t1.localTxId "
+      + "      AND t2.type = 'TxStartedEvent' "
+      + "      AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( "
+      + "  SELECT t3.globalTxId FROM TxEvent t3 "
+      + "  WHERE t3.globalTxId = t.globalTxId "
+      + "    AND t3.localTxId = t.localTxId "
+      + "    AND t3.type = 'TxCompensatedEvent') AND ( "
+      + "  SELECT MIN(t4.retries) FROM TxEvent t4 "
+      + "  WHERE t4.globalTxId = t.globalTxId "
+      + "    AND t4.localTxId = t.localTxId "
+      + "    AND t4.type = 'TxStartedEvent' ) = 0 "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long surrogateId, Pageable pageable);
+  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
 
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index c21e518..3806f6d 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -26,8 +26,6 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
-  retries_method varchar(256) NOT NULL,
-  retries int NOT NULL,
   payloads varbinary(10240),
   PRIMARY KEY (surrogateId),
   INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime)
@@ -43,6 +41,8 @@ CREATE TABLE IF NOT EXISTS Command (
   localTxId varchar(36) NOT NULL,
   parentTxId varchar(36) DEFAULT NULL,
   compensationMethod varchar(256) NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads varbinary(10240),
   status varchar(12),
   lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS Command (
   INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, status)
 ) DEFAULT CHARSET=utf8;
 
+
 CREATE TABLE IF NOT EXISTS TxTimeout (
   surrogateId bigint NOT NULL AUTO_INCREMENT,
   eventId bigint NOT NULL UNIQUE,
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 674e051..4ecb1b4 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime timestamp(6) NOT NULL,
-  retriesMethod varchar(256) NOT NULL,
-  retries int NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int NOT NULL DEFAULT 0,
   payloads bytea
 );
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 26aa17a..10bb2dd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -92,6 +92,8 @@ public class AlphaIntegrationTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String compensationMethod = getClass().getCanonicalName();
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String serviceName = uniquify("serviceName");
   private final String instanceId = uniquify("instanceId");
 
@@ -128,7 +130,9 @@ public class AlphaIntegrationTest {
   private TxConsistentService consistentService;
 
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
-  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
+
+  private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver(
+      this::onCompensation);
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -205,7 +209,7 @@ public class AlphaIntegrationTest {
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
+    CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
     TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
 
     await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -247,7 +251,7 @@ public class AlphaIntegrationTest {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -269,9 +273,9 @@ public class AlphaIntegrationTest {
 
     assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+            .setCompensationMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
+            .setCompensationMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
     ));
   }
 
@@ -289,7 +293,7 @@ public class AlphaIntegrationTest {
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getLocalTxId(), is(localTxId));
     assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getCompensationMethod(), is(compensationMethod));
     assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
@@ -301,7 +305,7 @@ public class AlphaIntegrationTest {
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
-    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensateStreamObserver());
+    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
 
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -403,7 +407,7 @@ public class AlphaIntegrationTest {
   @Test
   public void abortTimeoutTxStartedEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
 
     await().atMost(2, SECONDS).until(() -> {
@@ -429,6 +433,26 @@ public class AlphaIntegrationTest {
     });
   }
 
+  @Test
+  public void doNotCompensateRetryingEvents() throws InterruptedException {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.size(), is(4));
+    assertThat(events.get(0).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+    assertThat(events.get(2).type(), is(TxStartedEvent.name()));
+    assertThat(events.get(3).type(), is(TxEndedEvent.name()));
+
+    assertThat(receivedCommands.isEmpty(), is(true));
+  }
+
   private boolean waitTillTimeoutDone() {
     for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) {
       if (txTimeout.status().equals(DONE.name())) {
@@ -438,62 +462,13 @@ public class AlphaIntegrationTest {
     return false;
   }
 
-  @Test
-  public void retiesAndCompensateOnFailure() throws Exception {
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-
-    String localTxId1 = UUID.randomUUID().toString();
-
-    blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
-        .setServiceName(serviceName)
-        .setInstanceId(instanceId)
-        .setTimestamp(System.currentTimeMillis())
-        .setGlobalTxId(globalTxId)
-        .setLocalTxId(localTxId1)
-        .setParentTxId(parentTxId)
-        .setType(TxStartedEvent.name())
-        .setCompensationMethod("Compensation Method")
-        .setPayloads(ByteString.copyFrom(payload.getBytes()))
-        .setRetries(3).setRetriesMethod("Retries Method")
-        .build());
-    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
-
-    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
-
-    for (int i = 0; i < 3; i++) {
-      blockingStub.onTxEvent(
-          eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
-      await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
-      GrpcCompensateCommand command = receivedCommands.poll();
-      assertThat(command.getGlobalTxId(), is(globalTxId));
-      assertThat(command.getLocalTxId(), is(localTxId1));
-      assertThat(command.getParentTxId(), is(parentTxId));
-      assertThat(command.getCompensateMethod(), is("Retries Method"));
-      assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
-    }
-
-    blockingStub.onTxEvent(
-        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
-    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
-    GrpcCompensateCommand command = receivedCommands.poll();
-    assertThat(command.getGlobalTxId(), is(globalTxId));
-    assertThat(command.getLocalTxId(), is(localTxId1));
-    assertThat(command.getParentTxId(), is(parentTxId));
-    assertThat(command.getCompensateMethod(), is("Compensation Method"));
-    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
-  }
-
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
             command.getLocalTxId(),
             command.getParentTxId(),
-            new byte[0],
-            command.getCompensateMethod()));
+            command.getPayloads().toByteArray(),
+            command.getCompensationMethod()));
   }
 
   private GrpcServiceConfig someServiceConfig() {
@@ -516,23 +491,35 @@ public class AlphaIntegrationTest {
   }
 
   private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout);
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout,
+        "", 0);
+  }
+
+  private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0,
+        retryMethod, retries);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
-    return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+    return someGrpcEvent(type, localTxId);
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
+  private GrpcTxEvent someGrpcEvent(EventType type, String localTxId) {
     return someGrpcEvent(type, globalTxId, localTxId);
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0);
+    return someGrpcEvent(type, globalTxId, localTxId, parentTxId);
+  }
+
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "",
+        0);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0);
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
+      String compensationMethod) {
+    return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0);
   }
 
   private GrpcTxEvent eventOf(EventType eventType,
@@ -541,7 +528,9 @@ public class AlphaIntegrationTest {
       String parentTxId,
       byte[] payloads,
       String compensationMethod,
-      int timeout) {
+      int timeout,
+      String retryMethod,
+      int retries) {
 
     return GrpcTxEvent.newBuilder()
         .setServiceName(serviceName)
@@ -553,19 +542,21 @@ public class AlphaIntegrationTest {
         .setType(eventType.name())
         .setCompensationMethod(compensationMethod)
         .setTimeout(timeout)
+        .setRetryMethod(retryMethod)
+        .setRetries(retries)
         .setPayloads(ByteString.copyFrom(payloads))
         .build();
   }
 
-  private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+  private static class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> {
     private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
-    private CompensateStreamObserver() {
+    private CompensationStreamObserver() {
       this(command -> {});
     }
 
-    private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+    private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
       this.consumer = consumer;
     }
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index ca46625..8d70899 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   expiryTime TIMESTAMP NOT NULL,
-  retriesMethod varchar(256) NOT NULL,
-  retries int NOT NULL,
+  retryMethod varchar(256) NOT NULL,
+  retries int DEFAULT 0 NOT NULL,
   payloads varbinary(10240)
 );
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
similarity index 68%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
index 8c70e3a..ad8ae3a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.omega.transaction;
+package org.apache.servicecomb.saga.integration.pack.tests;
 
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.repository.CrudRepository;
 
-public class SagaEndedEvent extends TxEvent {
-
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
-  }
+interface CommandEnvelopeRepository extends CrudRepository<Command, Long> {
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 2bdd587..e497cec 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -77,4 +77,12 @@ public class GreetingController {
   ResponseEntity<String> goodNight(@RequestParam String name) {
     return ResponseEntity.ok("Good night, " + name);
   }
+
+  @SagaStart
+  @GetMapping("/open")
+  ResponseEntity<String> open(@RequestParam String name, @RequestParam int retries) {
+    String greetings = greetingService.greet(name);
+    String status = greetingService.open(name, retries);
+    return ResponseEntity.ok(greetings + "; " + status);
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index 69a86f6..554dc15 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -27,6 +27,9 @@ import org.springframework.stereotype.Service;
 class GreetingService {
   private final Queue<String> compensated;
 
+  private final int MAX_COUNT = 3;
+  private int failedCount = 1;
+
   @Autowired
   GreetingService(Queue<String> compensated) {
     this.compensated = compensated;
@@ -59,8 +62,27 @@ class GreetingService {
     return appendMessage("My bad, please take the window instead, " + name);
   }
 
+  @Compensable(retries = MAX_COUNT, compensationMethod = "close")
+  String open(String name, int retries) {
+    if (failedCount < retries) {
+      failedCount += 1;
+      throw new IllegalStateException("You know when the zoo opens, " + name);
+    }
+    resetCount();
+    return "Welcome to visit the zoo, " + name;
+  }
+
+  String close(String name, int retries) {
+    resetCount();
+    return appendMessage("Sorry, the zoo has already closed, " + name);
+  }
+
   private String appendMessage(String message) {
     compensated.add(message);
     return message;
   }
+
+  void resetCount() {
+    this.failedCount = 1;
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index b3045e3..9199ff6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -56,14 +56,23 @@ public class PackIT {
   private OmegaContext omegaContext;
 
   @Autowired
-  private TxEventEnvelopeRepository repository;
+  private TxEventEnvelopeRepository eventRepo;
+
+  @Autowired
+  private CommandEnvelopeRepository commandRepo;
 
   @Autowired
   private Queue<String> compensatedMessages;
 
+  @Autowired
+  private GreetingService greetingService;
+
   @After
   public void tearDown() throws Exception {
-    repository.deleteAll();
+    eventRepo.deleteAll();
+    commandRepo.deleteAll();
+    compensatedMessages.clear();
+    greetingService.resetCount();
   }
 
   @Test(timeout = 5000)
@@ -75,11 +84,11 @@ public class PackIT {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Greetings, mike; Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -136,13 +145,13 @@ public class PackIT {
 
     assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
 
-    await().atMost(2, SECONDS).until(() -> repository.count() == 7);
+    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 7);
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
     assertThat(events.size(), is(7));
 
     TxEvent sagaStartedEvent = events.get(0);
@@ -184,11 +193,11 @@ public class PackIT {
     assertThat(entity.getStatusCode(), is(OK));
     assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
 
-    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
-    List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
 
     assertThat(events.size(), is(6));
 
@@ -220,4 +229,69 @@ public class PackIT {
 
     assertThat(compensatedMessages.isEmpty(), is(true));
   }
+
+  @Test(timeout = 5000)
+  public void retrySubTransactionSuccess() {
+    ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        "eric",
+        2);
+
+    assertThat(entity.getStatusCode(), is(OK));
+    assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the zoo, eric"));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 8);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(8));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxEndedEvent"));
+    assertThat(events.get(7).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
+
+  @Test(timeout = 5000)
+  public void compensateWhenRetryReachesMaximum() throws InterruptedException {
+    // attempts the sub-transaction 3 times (the configured maximum) and then compensates
+    ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+        String.class,
+        TRESPASSER,
+        5);
+
+    assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 11);
+
+    List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(events.size(), is(11));
+
+    assertThat(events.get(0).type(), is("SagaStartedEvent"));
+    assertThat(events.get(1).type(), is("TxStartedEvent"));
+    assertThat(events.get(2).type(), is("TxEndedEvent"));
+    assertThat(events.get(3).type(), is("TxStartedEvent"));
+    assertThat(events.get(4).type(), is("TxAbortedEvent"));
+    assertThat(events.get(5).type(), is("TxStartedEvent"));
+    assertThat(events.get(6).type(), is("TxAbortedEvent"));
+    assertThat(events.get(7).type(), is("TxStartedEvent"));
+    assertThat(events.get(8).type(), is("TxAbortedEvent"));
+    assertThat(events.get(9).type(), is("TxCompensatedEvent"));
+    assertThat(events.get(10).type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages, contains("Goodbye, " + TRESPASSER));
+  }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index cf53a0c..b33eeb3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,13 +42,13 @@ import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
 
 public class GrpcClientMessageSender implements MessageSender {
-
   private final String target;
   private final TxEventServiceStub asyncEventService;
 
   private final MessageSerializer serializer;
 
   private final TxEventServiceBlockingStub blockingEventService;
+
   private final GrpcCompensateStreamObserver compensateStreamObserver;
   private final GrpcServiceConfig serviceConfig;
 
@@ -104,8 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setCompensationMethod(event.compensationMethod())
+        .setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
         .setRetries(event.retries())
-        .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 3cf46f8..9d9c312 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -46,14 +46,14 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
 
   @Override
   public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
 
     messageHandler.onReceive(
         command.getGlobalTxId(),
         command.getLocalTxId(),
         command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensateMethod(),
+        command.getCompensationMethod(),
         deserializer.deserialize(command.getPayloads().toByteArray()));
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9a78a62..afff8e7 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -48,7 +48,7 @@ import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
@@ -104,7 +104,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       try {
         sender.onConnected();
       } catch (Exception e) {
-        log.error("Failed connecting to alpha at {}", sender.target(), e);
+        LOG.error("Failed connecting to alpha at {}", sender.target(), e);
       }
     });
   }
@@ -115,7 +115,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       try {
         sender.onDisconnected();
       } catch (Exception e) {
-        log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+        LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
     });
   }
@@ -140,7 +140,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       } catch (OmegaException e) {
         throw e;
       } catch (Exception e) {
-        log.error("Retry sending event {} due to failure", event, e);
+        LOG.error("Retry sending event {} due to failure", event, e);
 
         // very large latency on exception
         senders.put(messageSender, Long.MAX_VALUE);
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index f019d10..02571fd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class PushBackReconnectRunnable implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final MessageSender messageSender;
   private final Map<MessageSender, Long> senders;
   private final BlockingQueue<Runnable> pendingTasks;
@@ -47,14 +47,14 @@ class PushBackReconnectRunnable implements Runnable {
   @Override
   public void run() {
     try {
-      log.info("Retry connecting to alpha at {}", messageSender.target());
+      LOG.info("Retry connecting to alpha at {}", messageSender.target());
       messageSender.onDisconnected();
       messageSender.onConnected();
       senders.put(messageSender, 0L);
       connectedSenders.offer(messageSender);
-      log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+      LOG.info("Retry connecting to alpha at {} is successful", messageSender.target());
     } catch (Exception e) {
-      log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+      LOG.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
       pendingTasks.offer(this);
     }
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index d66b737..e8b2f34 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -90,15 +90,15 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final List<String> compensated = new ArrayList<>();
 
-  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) ->
-      compensated.add(globalTxId);
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod,
+      payloads) -> compensated.add(globalTxId);
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
   private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
-      compensationMethod, 0, null, 0, "blah");
+      compensationMethod, 0, "", 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -335,6 +335,7 @@ public class LoadBalancedClusterMessageSenderTest {
     private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
+
     private StreamObserver<GrpcCompensateCommand> responseObserver;
 
     private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
@@ -357,9 +358,9 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getLocalTxId(),
           request.getParentTxId(),
           request.getCompensationMethod(),
-          0,
-          null,
-          0,
+          request.getTimeout(),
+          request.getRetryMethod(),
+          request.getRetries(),
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 9d0ebc4..3856415 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,6 +42,7 @@ public class RetryableMessageSenderTest {
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
+
   private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
 
   @Test
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 48a67f7..cf16888 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -36,7 +36,7 @@ public class CompensationContext {
     contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
   }
 
-  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
+  public void apply(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
     CompensationContextInternal contextInternal = contexts.get(compensationMethod);
 
     try {
@@ -44,7 +44,7 @@ public class CompensationContext {
       LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
     } catch (IllegalAccessException | InvocationTargetException e) {
       LOG.error(
-          "Pre-checking for compensate method " + contextInternal.compensationMethod.toString()
+          "Pre-checking for compensation method " + contextInternal.compensationMethod.toString()
               + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
           e);
     }
@@ -52,6 +52,7 @@ public class CompensationContext {
 
   private static final class CompensationContextInternal {
     private final Object target;
+
     private final Method compensationMethod;
 
     private CompensationContextInternal(Object target, Method compensationMethod) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 338751c..7d7d45f 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -26,6 +26,7 @@ import org.springframework.util.ReflectionUtils;
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
   private final OmegaContext omegaContext;
+
   private final CompensationContext compensationContext;
 
   CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 268fad9..90d8b06 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -31,6 +31,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
+
   private final CompensationContext compensationContext;
 
   CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
@@ -47,10 +48,13 @@ class CompensableMethodCheckingCallback implements MethodCallback {
     String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
 
     try {
-      Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
       compensationContext.addCompensationContext(method, bean);
-      compensationContext.addCompensationContext(signature, bean);
-      LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+
+      if (!compensationMethod.isEmpty()) {
+        Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+        compensationContext.addCompensationContext(signature, bean);
+        LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+      }
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
           "No such compensation method [" + compensationMethod + "] found in " + bean.getClass().getCanonicalName(),
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 41be8f7..4fd4188 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -35,7 +35,7 @@ public class TransactionAspectConfig {
 
   @Bean
   MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
-    return new CompensationMessageHandler(sender, omegaContext, context);
+    return new CompensationMessageHandler(sender, context);
   }
 
   @Order(0)
@@ -51,7 +51,8 @@ public class TransactionAspectConfig {
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext,
+      CompensationContext compensationContext) {
     return new CompensableAnnotationProcessor(omegaContext, compensationContext);
   }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 19d0942..6505bfc 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -24,10 +24,13 @@ import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -106,15 +109,18 @@ public class TransactionInterceptionTest {
 
   private String compensationMethod;
 
-  private String retriesMethod;
+  private String compensationMethod2;
+
+  private String retryMethod;
 
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
-    retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
+    retryMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class, int.class).toString();
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
+    compensationMethod2 = TransactionalUserService.class.getDeclaredMethod("delete", User.class, int.class).toString();
   }
 
   @After
@@ -122,6 +128,7 @@ public class TransactionInterceptionTest {
     messages.clear();
     userRepository.deleteAll();
     omegaContext.clear();
+    userService.resetCount();
   }
 
   @AfterClass
@@ -133,8 +140,9 @@ public class TransactionInterceptionTest {
     User user = userService.add(this.user);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+                user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -154,8 +162,9 @@ public class TransactionInterceptionTest {
     }
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+                illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -176,10 +185,11 @@ public class TransactionInterceptionTest {
     assertThat(userRepository.findByUsername(anotherUser.username()), is(nullValue()));
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0,
+                anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -189,6 +199,60 @@ public class TransactionInterceptionTest {
   }
 
   @Test
+  public void retryTillSuccess() {
+    try {
+      userService.add(user, 1);
+    } catch (Exception e) {
+      fail("unexpected exception throw: " + e);
+    }
+
+    assertThat(messages.size(), is(4));
+
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
+            .toString()));
+
+    String abortedEvent = messages.get(1);
+    assertThat(abortedEvent, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
+            .toString()));
+    assertThat(messages.get(3),
+        is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));
+
+    assertThat(userRepository.count(), is(1L));
+    userRepository.findAll().forEach(user -> assertThat(user, is(this.user)));
+  }
+
+  @Test
+  public void retryReachesMaximumThenThrowsException() {
+    try {
+      userService.add(user, 3);
+      expectFailing(IllegalStateException.class);
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage(), is("Retry harder"));
+    }
+
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 3)
+            .toString()));
+
+    String abortedEvent1 = messages.get(1);
+    assertThat(abortedEvent1, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(messages.get(2),
+        is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 3)
+            .toString()));
+
+    String abortedEvent2 = messages.get(3);
+    assertThat(abortedEvent2, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+    assertThat(userRepository.count(), is(0L));
+  }
+
+  @Test
   public void passesOmegaContextThroughDifferentThreads() throws Exception {
     new Thread(() -> userService.add(user)).start();
     waitTillSavedUser(username);
@@ -198,10 +262,10 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -217,10 +281,10 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -239,8 +303,8 @@ public class TransactionInterceptionTest {
     waitTillSavedUser(username);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+        new String[] {
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -258,11 +322,10 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
-
     actorSystem.terminate();
   }
 
@@ -298,7 +361,7 @@ public class TransactionInterceptionTest {
     private final List<String> messages = new ArrayList<>();
 
     @Bean
-    CompensationContext compensationContext() {
+    CompensationContext recoveryContext() {
       return new CompensationContext();
     }
 
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
index c98c6ea..0618109 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
@@ -17,21 +17,26 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
-
 @Component
 class TransactionalUserService {
   static final String ILLEGAL_USER = "Illegal User";
   private final UserRepository userRepository;
 
+  private int count = 0;
+
   @Autowired
   TransactionalUserService(UserRepository userRepository) {
     this.userRepository = userRepository;
   }
 
+  void resetCount() {
+    this.count = 0;
+  }
+
   @Compensable(compensationMethod = "delete")
   User add(User user) {
     if (ILLEGAL_USER.equals(user.username())) {
@@ -43,4 +48,19 @@ class TransactionalUserService {
   void delete(User user) {
     userRepository.delete(user);
   }
+
+  @Compensable(retries = 2, compensationMethod = "delete")
+  User add(User user, int count) {
+    if (this.count < count) {
+      this.count += 1;
+      throw new IllegalStateException("Retry harder");
+    }
+    resetCount();
+    return userRepository.save(user);
+  }
+
+  void delete(User user, int count) {
+    resetCount();
+    userRepository.delete(user);
+  }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
index c5c3d84..da9d4b2 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
@@ -62,7 +62,7 @@ public class User {
       return false;
     }
     User user = (User) o;
-    return id == user.id &&
+    return id.equals(user.id) &&
         Objects.equals(username, user.username) &&
         Objects.equals(email, user.email);
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 988d8d7..588d660 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
   private final OmegaContext context;
@@ -30,9 +29,10 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
-    return sender.send(new TxStartedEvent(
-        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
+    return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
+        timeout, retriesMethod, retries, message));
   }
 
   @Override
@@ -42,7 +42,7 @@ class CompensableInterceptor implements EventAwareInterceptor {
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
-        throwable));
+    sender.send(
+        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 15cf91a..fe2eea5 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -18,26 +18,21 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
-  private final OmegaContext omegaContext;
+
   private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) {
+  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
     this.sender = sender;
     this.context = context;
-    this.omegaContext = omegaContext;
   }
 
   @Override
-  public void onReceive(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, Object... payloads) {
-    String oldLocalTxId = omegaContext.localTxId();
-    omegaContext.setLocalTxId(parentTxId);
-    context.compensate(globalTxId, localTxId, compensationMethod, payloads);
+  public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      Object... payloads) {
+    context.apply(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
-    omegaContext.setLocalTxId(oldLocalTxId);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
index 86cc840..0844981 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,64 +25,59 @@ import javax.transaction.InvalidTransactionException;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Aspect
-public class TransactionAspect {
+/**
+ * DefaultRecovery is used to execute business logic once.
+ * The corresponding events will be reported to the alpha server before and after the execution of business logic.
+ * If there are errors while executing the business logic, a TxAbortedEvent will be reported to alpha.
+ *
+ *                 pre                       post
+ *     request --------- 2.business logic --------- response
+ *                 \                          |
+ * 1.TxStartedEvent \                        | 3.TxEndedEvent
+ *                   \                      |
+ *                    ----------------------
+ *                            alpha
+ */
+public class DefaultRecovery implements RecoveryPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final OmegaContext context;
-
-  private final CompensableInterceptor interceptor;
-
-  public TransactionAspect(MessageSender sender, OmegaContext context) {
-    this.context = context;
-    this.interceptor = new CompensableInterceptor(context, sender);
-  }
-
-  @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
-  Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    Object[] args = joinPoint.getArgs();
-    int retries = compensable.retries();
-    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
-    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
+    String compensationSignature =
+        compensable.compensationMethod().isEmpty() ? "" : compensationMethodSignature(joinPoint, compensable, method);
 
-    String localTxId = context.localTxId();
-    context.newLocalTxId();
+    String retrySignature = (retries != 0 || compensationSignature.isEmpty()) ? method.toString() : "";
 
-    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
+    AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.timeout(),
+        retrySignature, retries, joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
-      context.setLocalTxId(localTxId);
+      context.setLocalTxId(parentTxId);
       throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
           " because global transaction " + context.globalTxId() + " has already aborted.");
     }
-    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, compensationSignature);
+      interceptor.postIntercept(parentTxId, compensationSignature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptor.onError(localTxId, compensationSignature, throwable);
+      interceptor.onError(parentTxId, compensationSignature, throwable);
       throw throwable;
-    } finally {
-      context.setLocalTxId(localTxId);
-      LOG.debug("Restored context back to {}", context);
     }
   }
 
-  private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
+  String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
       throws NoSuchMethodException {
-
     return joinPoint.getTarget()
         .getClass()
         .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index e20bea2..285d549 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+        int retries, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +34,8 @@ public interface EventAwareInterceptor {
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
new file mode 100644
index 0000000..d1a28c2
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ForwardRecovery is used to execute business logic with the given retries times.
+ * If retries is above 0, it will retry the given times at most.
+ * If retries == -1, it will retry forever until interrupted.
+ */
+public class ForwardRecovery extends DefaultRecovery {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // TODO: 2018/03/10 we do not support retry with timeout yet
+  @Override
+  public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    int remains = retries;
+    try {
+      while (true) {
+        try {
+          return super.apply(joinPoint, compensable, interceptor, context, parentTxId, remains);
+        } catch (Throwable throwable) {
+          if (throwable instanceof InvalidTransactionException) {
+            throw throwable;
+          }
+
+          remains = remains == -1 ? -1 : remains - 1;
+          if (remains == 0) {
+            LOG.error(
+                "Retried sub tx failed maximum times, global tx id: {}, local tx id: {}, method: {}, retried times: {}",
+                context.globalTxId(), context.localTxId(), method.toString(), retries);
+            throw throwable;
+          }
+
+          LOG.warn("Retrying sub tx failed, global tx id: {}, local tx id: {}, method: {}, remains: {}",
+              context.globalTxId(), context.localTxId(), method.toString(), remains);
+          Thread.sleep(compensable.retryDelayInMilliseconds());
+        }
+      }
+    } catch (InterruptedException e) {
+      String errorMessage = "Failed to handle tx because it is interrupted, global tx id: " + context.globalTxId()
+          + ", local tx id: " + context.localTxId() + ", method: " + method.toString();
+      LOG.error(errorMessage);
+      interceptor.onError(parentTxId, compensationMethodSignature(joinPoint, compensable, method), e);
+      throw new OmegaException(errorMessage);
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
similarity index 64%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
index 8e288df..bc1d4d8 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
 
-public class TxCompensatedEvent extends TxEvent {
-  public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
-  }
+public interface RecoveryPolicy {
+  Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+      OmegaContext context, String parentTxId, int retries) throws Throwable;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
index 8c70e3a..f59ac2b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,17 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.common.EventType;
+public class RecoveryPolicyFactory {
+  private static final RecoveryPolicy DEFAULT_RECOVERY = new DefaultRecovery();
 
-public class SagaEndedEvent extends TxEvent {
+  private static final RecoveryPolicy FORWARD_RECOVERY = new ForwardRecovery();
 
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+  /**
+   * If retries == 0, use the default recovery to execute only once.
+   * If retries > 0, it will use the forward recovery and retry the given times at most.
+   * If retries == -1, it will use the forward recovery and retry forever until interrupted.
+   */
+  static RecoveryPolicy getRecoveryPolicy(int retries) {
+    return retries != 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 8c70e3a..2e28b5e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -20,8 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import org.apache.servicecomb.saga.common.EventType;
 
 public class SagaEndedEvent extends TxEvent {
-
-  public SagaEndedEvent(String globalTxId, String localTxId) {
-    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+  SagaEndedEvent(String globalTxId, String localTxId) {
+    super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 49dd8e4..486f28c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,10 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+      int retries, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index a2ee58c..8722deb 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0);
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
     try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index cb76a26..0e87a97 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType;
 public class SagaStartedEvent extends TxEvent {
   public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
     // use "" instead of null as compensationMethod requires not null in sql
-    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout);
+    super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 86cc840..f7a98ee 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -20,8 +20,6 @@ package org.apache.servicecomb.saga.omega.transaction;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
@@ -47,45 +45,17 @@ public class TransactionAspect {
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
   Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-    LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
-
-    Object[] args = joinPoint.getArgs();
-    int retries = compensable.retries();
-    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
-    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
-
     String localTxId = context.localTxId();
     context.newLocalTxId();
-
-    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
-    if (response.aborted()) {
-      String abortedLocalTxId = context.localTxId();
-      context.setLocalTxId(localTxId);
-      throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
-          " because global transaction " + context.globalTxId() + " has already aborted.");
-    }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
+    int retries = compensable.retries();
+    RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
     try {
-      Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, compensationSignature);
-
-      return result;
-    } catch (Throwable throwable) {
-      interceptor.onError(localTxId, compensationSignature, throwable);
-      throw throwable;
+      return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
     } finally {
       context.setLocalTxId(localTxId);
       LOG.debug("Restored context back to {}", context);
     }
   }
-
-  private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
-      throws NoSuchMethodException {
-
-    return joinPoint.getTarget()
-        .getClass()
-        .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
-        .toString();
-  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index d6aa533..f0bac54 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -24,7 +24,8 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxAbortedEvent extends TxEvent {
   public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable));
+    super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
+        stackTrace(throwable));
   }
 
   private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index 8e288df..cd709e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxCompensatedEvent extends TxEvent {
   public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+    super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 8d6666a..f702c43 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxEndedEvent extends TxEvent {
   public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+    super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index a5b5514..a158af1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,21 +31,22 @@ public class TxEvent {
   private final String compensationMethod;
   private final int timeout;
   private final Object[] payloads;
-  private final String retriesMethod;
+
+  private final String retryMethod;
   private final int retries;
 
   public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
-      int timeout, String retriesMethod, int retries, Object... payloads) {
+      int timeout, String retryMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
+    this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.compensationMethod = compensationMethod;
-    this.payloads = payloads;
-    this.globalTxId = globalTxId;
     this.timeout = timeout;
-    this.retriesMethod = retriesMethod;
+    this.retryMethod = retryMethod;
     this.retries = retries;
+    this.payloads = payloads;
   }
 
   public long timestamp() {
@@ -80,8 +81,8 @@ public class TxEvent {
     return timeout;
   }
 
-  public String retriesMethod() {
-    return retriesMethod;
+  public String retryMethod() {
+    return retryMethod;
   }
 
   public int retries() {
@@ -95,7 +96,9 @@ public class TxEvent {
         ", localTxId='" + localTxId + '\'' +
         ", parentTxId='" + parentTxId + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", timeout='" + timeout + '\'' +
+        ", timeout=" + timeout +
+        ", retryMethod='" + retryMethod + '\'' +
+        ", retries=" + retries +
         ", payloads=" + Arrays.toString(payloads) +
         '}';
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index cb2580e..5d2ae12 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,8 +21,9 @@ import org.apache.servicecomb.saga.common.EventType;
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      int timeout, String retryMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retryMethod,
+        retries, payloads);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index e9bf6a7..78c4b91 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -50,7 +50,9 @@ public @interface Compensable {
    *
    * @return
    */
-  String compensationMethod();
+  String compensationMethod() default "";
+
+  int retryDelayInMilliseconds() default 0;
 
   /**
    * <code>@Compensable</code> method timeout, in seconds. <br>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 76ffc03..1e01ea5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -47,7 +48,8 @@ public class CompensableInterceptorTest {
   };
 
   private final String message = uniquify("message");
-  private final String retriesMethod = uniquify("retries");
+
+  private final String retryMethod = uniquify("retryMethod");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -64,7 +66,7 @@ public class CompensableInterceptorTest {
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
     int retries = new Random().nextInt();
-    interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, retries, message);
 
     TxEvent event = messages.get(0);
 
@@ -72,7 +74,7 @@ public class CompensableInterceptorTest {
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
     assertThat(event.retries(), is(retries));
-    assertThat(event.retriesMethod(), is(retriesMethod));
+    assertThat(event.retryMethod(), is(retryMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index c9c7394..d5d5de5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -42,15 +42,21 @@ public class CompensationMessageHandlerTest {
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
   private final String parentTxId = uniquify("parentTxId");
+
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final OmegaContext omegaContext = mock(OmegaContext.class);
   private final CompensationContext context = mock(CompensationContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context);
+
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+
+  @Before
+  public void setUp() {
+    events.clear();
+  }
 
   @Test
-  public void sendsEventOnCompensationCompleted() throws Exception {
+  public void sendsCompensatedEventOnCompensationCompleted() {
     handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
 
     assertThat(events.size(), is(1));
@@ -63,6 +69,6 @@ public class CompensationMessageHandlerTest {
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
 
-    verify(context).compensate(globalTxId, localTxId, compensationMethod, payload);
+    verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
similarity index 55%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
index 31d148f..75062bc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import javax.transaction.InvalidTransactionException;
@@ -40,28 +42,39 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-public class TransactionAspectTest {
+public class DefaultRecoveryTest {
   private final List<TxEvent> messages = new ArrayList<>();
+
   private final String globalTxId = UUID.randomUUID().toString();
+
   private final String localTxId = UUID.randomUUID().toString();
 
+  private final String parentTxId = UUID.randomUUID().toString();
+
   private final String newLocalTxId = UUID.randomUUID().toString();
 
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
   private final MessageSender sender = e -> {
     messages.add(e);
     return new AlphaResponse(false);
   };
-  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
-  @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
-  private final Compensable compensable = mock(Compensable.class);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
-  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+  private final RecoveryPolicy recoveryPolicy = new DefaultRecovery();
 
   @Before
   public void setUp() throws Exception {
@@ -71,75 +84,99 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
   }
 
   @Test
-  public void newLocalTxIdInCompensable() throws Throwable {
-    aspect.advise(joinPoint, compensable);
+  public void recordEndedEventWhenSuccess() throws Throwable {
+    when(joinPoint.proceed()).thenReturn(null);
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
 
-    TxEvent startedEvent = messages.get(0);
+    assertThat(messages.size(), is(2));
 
+    TxEvent startedEvent = messages.get(0);
     assertThat(startedEvent.globalTxId(), is(globalTxId));
-    assertThat(startedEvent.localTxId(), is(newLocalTxId));
-    assertThat(startedEvent.parentTxId(), is(localTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
 
     TxEvent endedEvent = messages.get(1);
-
     assertThat(endedEvent.globalTxId(), is(globalTxId));
-    assertThat(endedEvent.localTxId(), is(newLocalTxId));
-    assertThat(endedEvent.parentTxId(), is(localTxId));
+    assertThat(endedEvent.localTxId(), is(localTxId));
+    assertThat(endedEvent.parentTxId(), is(parentTxId));
     assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
-
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
   @Test
-  public void restoreContextOnCompensableError() throws Throwable {
-    RuntimeException oops = new RuntimeException("oops");
-
+  public void recordAbortedEventWhenFailed() throws Throwable {
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
-      assertThat(e, is(oops));
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
+    assertThat(messages.size(), is(2));
 
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    TxEvent startedEvent = messages.get(0);
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod(), is(""));
+
+    TxEvent abortedEvent = messages.get(1);
+    assertThat(abortedEvent.globalTxId(), is(globalTxId));
+    assertThat(abortedEvent.localTxId(), is(localTxId));
+    assertThat(abortedEvent.parentTxId(), is(parentTxId));
+    assertThat(abortedEvent.type(), is(EventType.TxAbortedEvent));
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
+  public void returnImmediatelyWhenReceivedRejectResponse() {
     MessageSender sender = mock(MessageSender.class);
     when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+    CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
+
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
       expectFailing(InvalidTransactionException.class);
     } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
       assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
     }
 
     verify(sender, times(1)).send(any());
   }
 
+  @Test
+  public void recordRetryMethodWhenRetriesIsSet() throws Throwable {
+    int retries = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
+    when(compensable.retries()).thenReturn(retries);
+
+    recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, retries);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(localTxId));
+    assertThat(startedEvent.parentTxId(), is(parentTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(retries));
+    assertThat(startedEvent.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+  }
+
   private String doNothing() {
     return "doNothing";
   }
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
similarity index 61%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
index 31d148f..76fe55a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -40,28 +41,41 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-public class TransactionAspectTest {
+public class ForwardRecoveryTest {
   private final List<TxEvent> messages = new ArrayList<>();
+
   private final String globalTxId = UUID.randomUUID().toString();
+
   private final String localTxId = UUID.randomUUID().toString();
 
+  private final String parentTxId = UUID.randomUUID().toString();
+
   private final String newLocalTxId = UUID.randomUUID().toString();
 
+  private final RuntimeException oops = new RuntimeException("oops");
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+  private final Compensable compensable = mock(Compensable.class);
+
   private final MessageSender sender = e -> {
     messages.add(e);
     return new AlphaResponse(false);
   };
-  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
-  @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
-  private final Compensable compensable = mock(Compensable.class);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
-  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+  private final RecoveryPolicy recoveryPolicy = new ForwardRecovery();
+
+  private volatile OmegaException exception;
 
   @Before
   public void setUp() throws Exception {
@@ -71,75 +85,75 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
   }
 
   @Test
-  public void newLocalTxIdInCompensable() throws Throwable {
-    aspect.advise(joinPoint, compensable);
-
-    TxEvent startedEvent = messages.get(0);
-
-    assertThat(startedEvent.globalTxId(), is(globalTxId));
-    assertThat(startedEvent.localTxId(), is(newLocalTxId));
-    assertThat(startedEvent.parentTxId(), is(localTxId));
-    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+  public void forwardExceptionWhenGlobalTxAborted() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
-    TxEvent endedEvent = messages.get(1);
+    CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
 
-    assertThat(endedEvent.globalTxId(), is(globalTxId));
-    assertThat(endedEvent.localTxId(), is(newLocalTxId));
-    assertThat(endedEvent.parentTxId(), is(localTxId));
-    assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+    try {
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
+      expectFailing(InvalidTransactionException.class);
+    } catch (InvalidTransactionException e) {
+      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+    } catch (Throwable throwable) {
+      fail("unexpected exception throw: " + throwable);
+    }
 
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    verify(sender, times(1)).send(any());
   }
 
   @Test
-  public void restoreContextOnCompensableError() throws Throwable {
-    RuntimeException oops = new RuntimeException("oops");
-
+  public void throwExceptionWhenRetryReachesMaximum() throws Throwable {
+    when(compensable.retries()).thenReturn(2);
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint, compensable);
+      recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 2);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
-      assertThat(e, is(oops));
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    TxEvent event = messages.get(1);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(newLocalTxId));
-    assertThat(event.parentTxId(), is(localTxId));
-    assertThat(event.type(), is(EventType.TxAbortedEvent));
-
-    assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(messages.size(), is(4));
+    assertThat(messages.get(0).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+    assertThat(messages.get(2).type(), is(EventType.TxStartedEvent));
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
-    MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(new AlphaResponse(true));
-
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
-    try {
-      aspect.advise(joinPoint, compensable);
-      expectFailing(InvalidTransactionException.class);
-    } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
-      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
-    }
+  public void keepRetryingTillInterrupted() throws Throwable {
+    when(compensable.retries()).thenReturn(-1);
+    when(compensable.retryDelayInMilliseconds()).thenReturn(1000);
+    when(joinPoint.proceed()).thenThrow(oops);
 
-    verify(sender, times(1)).send(any());
+    Thread thread = new Thread(() -> {
+      try {
+        recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, -1);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        exception = e;
+      } catch (Throwable throwable) {
+        fail("unexpected exception throw: " + throwable);
+      }
+    });
+    thread.start();
+
+    thread.interrupt();
+    thread.join();
+
+    assertThat(exception.getMessage().contains("Failed to handle tx because it is interrupted"), is(true));
   }
 
   private String doNothing() {
     return "doNothing";
   }
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 31d148f..0aa9549 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -20,18 +20,13 @@ package org.apache.servicecomb.saga.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import javax.transaction.InvalidTransactionException;
-
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -40,13 +35,11 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-
   private final String newLocalTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = e -> {
@@ -71,6 +64,7 @@ public class TransactionAspectTest {
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     when(compensable.compensationMethod()).thenReturn("doNothing");
+    when(compensable.retries()).thenReturn(0);
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
@@ -86,6 +80,8 @@ public class TransactionAspectTest {
     assertThat(startedEvent.localTxId(), is(newLocalTxId));
     assertThat(startedEvent.parentTxId(), is(localTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod().isEmpty(), is(true));
 
     TxEvent endedEvent = messages.get(1);
 
@@ -123,20 +119,84 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
-    MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(new AlphaResponse(true));
+  public void retryReachesMaximumAndForwardException() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops);
+    when(compensable.retries()).thenReturn(3);
 
-    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
-      expectFailing(InvalidTransactionException.class);
-    } catch (InvalidTransactionException e) {
-      System.out.println(e.getMessage());
-      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e.getMessage(), is("oops"));
     }
 
-    verify(sender, times(1)).send(any());
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(3));
+    assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(2));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxAbortedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+  }
+
+  @Test
+  public void keepRetryingTillSuccess() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
+    when(compensable.retries()).thenReturn(-1);
+
+    aspect.advise(joinPoint, compensable);
+
+    assertThat(messages.size(), is(6));
+
+    TxEvent startedEvent1 = messages.get(0);
+    assertThat(startedEvent1.globalTxId(), is(globalTxId));
+    assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent1.parentTxId(), is(localTxId));
+    assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent1.retries(), is(-1));
+    assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+    assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent2 = messages.get(2);
+    assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent2.retries(), is(-1));
+
+    assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+    TxEvent startedEvent3 = messages.get(4);
+    assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent3.retries(), is(-1));
+
+    assertThat(messages.get(5).type(), is(EventType.TxEndedEvent));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
   private String doNothing() {
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9605a37..d2c6f77 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,9 +22,11 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
+  rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {
+  }
   rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
-  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
+  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
+  }
 }
 
 message GrpcServiceConfig {
@@ -48,13 +50,14 @@ message GrpcTxEvent {
   string instanceId = 9;
   int32 timeout = 10;
   int32 retries = 11;
-  string retriesMethod = 12;
+  string retryMethod = 12;
 }
 
 message GrpcCompensateCommand {
   string globalTxId = 1;
   string localTxId = 2;
   string parentTxId = 3;
-  string compensateMethod = 4;
+  string compensationMethod = 4;
   bytes payloads = 5;
-}
\ No newline at end of file
+}
+

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 01/07: SCB-224: alpha support retries

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit bd5ed773cf5fcb63c2b52f59de2189c2bdb01424
Author: zhang2014 <co...@gmail.com>
AuthorDate: Wed Jan 17 23:01:55 2018 +0800

    SCB-224: alpha support retries
---
 .../saga/alpha/core/CompositeOmegaCallback.java    | 21 +++++++++
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  2 +
 .../saga/alpha/core/PushBackOmegaCallback.java     | 10 ++++
 .../saga/alpha/core/TxConsistentService.java       | 55 ++++++++++++++++++++--
 .../servicecomb/saga/alpha/core/TxEvent.java       | 13 +++++
 .../saga/alpha/server/GrpcOmegaCallback.java       | 12 +++++
 .../src/main/proto/GrpcTxEvent.proto               |  2 +
 7 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
index 32e5102..bce8f4b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -29,6 +29,27 @@ public class CompositeOmegaCallback implements OmegaCallback {
   }
 
   @Override
+  public void retries(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
+
+    if (serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + event.serviceName());
+    }
+
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      omegaCallback = serviceCallbacks.values().iterator().next();
+    }
+
+    try {
+      omegaCallback.compensate(event);
+    } catch (Exception e) {
+      serviceCallbacks.values().remove(omegaCallback);
+      throw e;
+    }
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index f60a44d..d926ed0 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
+  void retries(TxEvent event);
+
   void compensate(TxEvent event);
 
   default void disconnect() {
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c14..290eb20 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -35,6 +35,16 @@ public class PushBackOmegaCallback implements OmegaCallback {
   }
 
   @Override
+  public void retries(TxEvent event) {
+    try {
+      underlying.compensate(event);
+    } catch (Exception e) {
+      logError(event, e);
+      pendingCompensations.offer(() -> compensate(event));
+    }
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     try {
       underlying.compensate(event);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b7..a9a3ed2 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,17 +17,18 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+
 
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -61,6 +62,50 @@ public class TxConsistentService {
     return true;
   }
 
+//  private void compensate(TxEvent event) {
+//    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
+//
+//    Optional<TxEvent> startedEvent = events.stream().filter(e -> e.containChildren(event)).findFirst();
+//
+//    startedEvent.ifPresent(compensateEvent -> {
+//      Integer[] retiesAndTimes = eventsToRetries.compute(event.parentTxId(), (k, v) ->
+//          v == null ? new Integer[] {compensateEvent.retries(), 0} : new Integer[] {v[0], v[1] + 1});
+//      List<TxEvent> compensationEvents =
+//          retiesAndTimes[0] >= retiesAndTimes[1] ? events : Collections.singletonList(
+//              new TxEvent(
+//                  event.serviceName(), event.instanceId(), event.creationTime(), event.globalTxId(),
+//                  event.localTxId(), event.parentTxId(), event.type(), event.retriesMethod(), event.payloads()
+//              ));
+//
+//      compensateImpl(event.globalTxId(), compensationEvents);
+//    });
+//  }
+//
+//  private void compensateImpl(String globalTxId, List<TxEvent> events) {
+//    events.removeIf(this::isCompensationScheduled);
+//
+//    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(globalTxId, k -> new HashSet<>());
+//    events.forEach(e -> localTxIds.add(e.localTxId()));
+//
+//    events.forEach(omegaCallback::compensate);
+//  }
+
+  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
+  // unless we ask user to specify a name for each participant in the global TX in @Compensable
+//  private void updateCompensateStatus(TxEvent event) {
+//    Set<String> events = eventsToCompensate.get(event.globalTxId());
+//    if (events != null) {
+//      events.remove(event.localTxId());
+//      if (events.isEmpty()) {
+//        eventsToCompensate.remove(event.globalTxId());
+//        Integer[] retiesAndTimes = eventsToRetries.get(event.parentTxId());
+//        if (retiesAndTimes == null || retiesAndTimes[0] >= retiesAndTimes[1]) {
+//          markGlobalTxEnd(event);
+//          eventsToRetries.remove(event.parentTxId());
+//        }
+//      }
+//    }
+//  }
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 76dfca7..5176ce0 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -48,6 +48,8 @@ public class TxEvent {
   private String compensationMethod;
   private Date expiryTime;
   private byte[] payloads;
+  private int retries;
+  private String retriesMethod;
 
   private TxEvent() {
   }
@@ -207,4 +209,15 @@ public class TxEvent {
         ", expiryTime='" + expiryTime + '\'' +
         '}';
   }
+  public int retries() {
+    return retries;
+  }
+
+  public String retriesMethod() {
+    return retriesMethod;
+  }
+
+  public boolean containChildren(TxEvent event) {
+    return this.localTxId.equals(event.parentTxId);
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a95281..83a6b9d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -37,6 +37,18 @@ class GrpcOmegaCallback implements OmegaCallback {
   }
 
   @Override
+  public void retries(TxEvent event) {
+    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+        .setGlobalTxId(event.globalTxId())
+        .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+        .setCompensateMethod(event.retriesMethod())
+        .setPayloads(ByteString.copyFrom(event.payloads()))
+        .build();
+    observer.onNext(command);
+  }
+
+  @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 3944eee..9605a37 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -47,6 +47,8 @@ message GrpcTxEvent {
   string serviceName = 8;
   string instanceId = 9;
   int32 timeout = 10;
+  int32 retries = 11;
+  string retriesMethod = 12;
 }
 
 message GrpcCompensateCommand {

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.

[incubator-servicecomb-saga] 02/07: SCB-224: omega support retries

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0651b91725700b5c09d253d2374845acabf42b90
Author: zhang2014 <co...@gmail.com>
AuthorDate: Wed Jan 17 23:06:04 2018 +0800

    SCB-224: omega support retries
---
 .../connector/grpc/GrpcClientMessageSender.java    |  5 ++++-
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  8 +++++---
 .../connector/grpc/RetryableMessageSenderTest.java |  2 +-
 .../spring/CompensableMethodCheckingCallback.java  |  1 +
 .../spring/TransactionInterceptionTest.java        | 23 ++++++++++++----------
 .../omega/transaction/CompensableInterceptor.java  |  4 ++--
 .../omega/transaction/EventAwareInterceptor.java   |  4 ++--
 .../transaction/SagaStartAnnotationProcessor.java  |  4 ++--
 .../saga/omega/transaction/SagaStartAspect.java    |  2 +-
 .../omega/transaction/TimeAwareInterceptor.java    |  0
 .../saga/omega/transaction/TransactionAspect.java  | 11 +++++++----
 .../saga/omega/transaction/TxEvent.java            | 14 ++++++++++++-
 .../saga/omega/transaction/TxStartedEvent.java     |  4 ++--
 .../omega/transaction/annotations/Compensable.java |  4 +++-
 .../transaction/CompensableInterceptorTest.java    |  8 ++++++--
 .../SagaStartAnnotationProcessorTest.java          |  4 ++--
 .../transaction/TimeAwareInterceptorTest.java      |  0
 17 files changed, 64 insertions(+), 34 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 5712f57..cf53a0c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -65,7 +65,8 @@ public class GrpcClientMessageSender implements MessageSender {
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
+    this.compensateStreamObserver =
+        new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
@@ -103,6 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setCompensationMethod(event.compensationMethod())
+        .setRetries(event.retries())
+        .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 788cf96..d66b737 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -98,7 +98,7 @@ public class LoadBalancedClusterMessageSenderTest {
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
   private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
-      compensationMethod, 0, "blah");
+      compensationMethod, 0, null, 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -189,7 +189,7 @@ public class LoadBalancedClusterMessageSenderTest {
     await().atMost(3, SECONDS).until(() -> compensated.contains(globalTxId));
   }
 
-  @Test (timeout = 1000)
+  @Test(timeout = 1000)
   public void stopSendingOnInterruption() throws Exception {
     MessageSender underlying = Mockito.mock(MessageSender.class);
     doThrow(RuntimeException.class).when(underlying).send(event);
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -358,6 +358,8 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getParentTxId(),
           request.getCompensationMethod(),
           0,
+          null,
+          0,
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 95bda85..9d0ebc4 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@ public class RetryableMessageSenderTest {
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0);
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 6c0c333..268fad9 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -48,6 +48,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
 
     try {
       Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+      compensationContext.addCompensationContext(method, bean);
       compensationContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 2ec42a5..19d0942 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,11 +106,14 @@ public class TransactionInterceptionTest {
 
   private String compensationMethod;
 
+  private String retriesMethod;
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
+    retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
@@ -131,7 +134,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -152,7 +155,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -174,9 +177,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -196,9 +199,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -215,9 +218,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -237,7 +240,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,7 +258,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 53e5158..fcce034 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
     return sender.send(new TxStartedEvent(
-        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message));
+        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index bb2cca4..e20bea2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b7afcf5..49dd8e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 388f237..a2ee58c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout());
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
     try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..e69de29
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 932b990..86cc840 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,12 +49,15 @@ public class TransactionAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    String signature = compensationMethodSignature(joinPoint, compensable, method);
+    Object[] args = joinPoint.getArgs();
+    int retries = compensable.retries();
+    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
+    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
+    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
@@ -65,11 +68,11 @@ public class TransactionAspect {
 
     try {
       Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(localTxId, compensationSignature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(localTxId, compensationSignature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 34be420..a5b5514 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,9 +31,11 @@ public class TxEvent {
   private final String compensationMethod;
   private final int timeout;
   private final Object[] payloads;
+  private final String retriesMethod;
+  private final int retries;
 
   public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
-      int timeout, Object... payloads) {
+      int timeout, String retriesMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
     this.localTxId = localTxId;
@@ -42,6 +44,8 @@ public class TxEvent {
     this.payloads = payloads;
     this.globalTxId = globalTxId;
     this.timeout = timeout;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
   }
 
   public long timestamp() {
@@ -76,6 +80,14 @@ public class TxEvent {
     return timeout;
   }
 
+  public String retriesMethod() {
+    return retriesMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 4732d95..cb2580e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -22,7 +22,7 @@ import org.apache.servicecomb.saga.common.EventType;
 public class TxStartedEvent extends TxEvent {
 
   public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, int timeout, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads);
+      String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 11ba7c7..e9bf6a7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -36,8 +36,10 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
 
+  int retries() default 0;
+
   /**
-   * Compensation method name, should not be null.<br>
+   * Compensation method name.<br>
    * A compensation method should satisfy below requirements:
    * <ol>
    *   <li>has same parameter list as @Compensable method's</li>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0ef9d4d..76ffc03 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,13 +18,13 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -47,6 +47,7 @@ public class CompensableInterceptorTest {
   };
 
   private final String message = uniquify("message");
+  private final String retriesMethod = uniquify("retries");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -62,13 +63,16 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
+    int retries = new Random().nextInt();
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
 
     TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.retries(), is(retries));
+    assertThat(event.retriesMethod(), is(retriesMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index cc84fc5..8c496f6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+    sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+      sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
new file mode 100644
index 0000000..e69de29

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.