You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@servicecomb.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/29 08:08:00 UTC

[jira] [Commented] (SCB-224) [pack] retry sub-transaction on failure

    [ https://issues.apache.org/jira/browse/SCB-224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493208#comment-16493208 ] 

ASF GitHub Bot commented on SCB-224:
------------------------------------

WillemJiang closed pull request #117: SCB-224 support retry sub-transaction
URL: https://github.com/apache/incubator-servicecomb-saga/pull/117
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 49c17560..a58a7055 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -113,11 +113,11 @@ public String localTxId() {
     return localTxId;
   }
 
-  String parentTxId() {
+  public String parentTxId() {
     return parentTxId;
   }
 
-  String compensationMethod() {
+  public String compensationMethod() {
     return compensationMethod;
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 2bbea774..25288ed6 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@
 
 public interface CommandRepository {
 
-  void saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(TxEvent abortEvent);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 2d51a74f..c90b8df3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -18,10 +18,7 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import static org.apache.servicecomb.saga.common.EventType.*;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
@@ -40,7 +37,7 @@
   private final OmegaCallback omegaCallback;
   private final int eventPollingInterval;
 
-  private long nextEndedEventId;
+  private long nextAbortedEventId;
   private long nextCompensatedEventId;
 
   public EventScanner(ScheduledExecutorService scheduler,
@@ -75,11 +72,11 @@ private void pollEvents() {
   }
 
   private void saveUncompensatedEventsToCommands() {
-    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+    eventRepository.findByTypeAndIdGreaterThan(nextAbortedEventId, TxAbortedEvent.name())
         .forEach(event -> {
-          log.info("Found uncompensated event {}", event);
-          nextEndedEventId = event.id();
-          commandRepository.saveCompensationCommands(event.globalTxId());
+          log.info("Found aborted event {}", event);
+          nextAbortedEventId = event.id();
+          commandRepository.saveCompensationCommands(event);
         });
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1364cb79..5d50ff6c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -39,6 +39,8 @@
   private String type;
   private String compensationMethod;
   private byte[] payloads;
+  private int retries;
+  private String retriesMethod;
 
   private TxEvent() {
   }
@@ -68,44 +70,75 @@ public TxEvent(
     this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
   }
 
-  public TxEvent(
-      String serviceName,
-      String instanceId,
-      Date creationTime,
-      String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String type,
-      String compensationMethod,
+  public TxEvent(String serviceName, String instanceId, Date creationTime, String globalTxId, String localTxId,
+      String parentTxId, String type, String compensationMethod, byte[] payloads) {
+    this(
+        -1L,
+        serviceName,
+        instanceId,
+        creationTime,
+        globalTxId,
+        localTxId,
+        parentTxId,
+        type,
+        compensationMethod,
+        0, "",
+        payloads);
+  }
+
+  public TxEvent(Long id, String serviceName, String instanceId, Date creationTime, String globalTxId, String localTxId,
+      String parentTxId, String type, String compensationMethod, byte[] payloads) {
+    this(
+        id,
+        serviceName,
+        instanceId,
+        creationTime,
+        globalTxId,
+        localTxId,
+        parentTxId,
+        type,
+        compensationMethod,
+        0, "",
+        payloads);
+  }
+
+  public TxEvent(long id, String serviceName, String instanceId, String globalTxId, String localTxId, String parentTxId,
+      String type, String compensationMethod, byte[] payloads) {
+    this(
+        id,
+        serviceName,
+        instanceId,
+        new Date(),
+        globalTxId,
+        localTxId,
+        parentTxId,
+        type,
+        compensationMethod,
+        0, "",
+        payloads);
+  }
+
+  TxEvent(Long surrogateId, String serviceName, String instanceId, Date creationTime, String globalTxId,
+      String localTxId, String parentTxId, String type, String compensationMethod, int retries, String retriesMethod,
       byte[] payloads) {
-    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+    this.surrogateId = surrogateId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.creationTime = creationTime;
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.type = type;
+    this.compensationMethod = compensationMethod;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
+    this.payloads = payloads;
   }
 
-  public TxEvent(
-      long id,
-      String serviceName,
-      String instanceId,
-      String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String type,
-      String compensationMethod,
-      byte[] payloads) {
-    this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
-  }
 
-  TxEvent(Long surrogateId,
-      String serviceName,
-      String instanceId,
-      Date creationTime,
-      String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String type,
-      String compensationMethod,
+  public TxEvent(String serviceName, String instanceId, Date creationTime, String globalTxId,
+      String localTxId, String parentTxId, String type, String compensationMethod, String retriesMethod, int retries,
       byte[] payloads) {
-
-    this.surrogateId = surrogateId;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.creationTime = creationTime;
@@ -114,6 +147,8 @@ public TxEvent(
     this.parentTxId = parentTxId;
     this.type = type;
     this.compensationMethod = compensationMethod;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
     this.payloads = payloads;
   }
 
@@ -171,4 +206,16 @@ public String toString() {
         ", compensationMethod='" + compensationMethod + '\'' +
         '}';
   }
+
+  public int retries() {
+    return retries;
+  }
+
+  public String retriesMethod() {
+    return retriesMethod;
+  }
+
+  public boolean containChildren(TxEvent event) {
+    return this.localTxId.equals(event.parentTxId);
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b61aa069..a9b8d721 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -25,7 +25,7 @@
 
   List<TxEvent> findTransactions(String globalTxId, String type);
 
-  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
+  List<TxEvent> findByTypeAndIdGreaterThan(long id, String type);
 
   Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
 
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 231d5bff..c1c6f95e 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -56,7 +56,7 @@ public void save(TxEvent event) {
     }
 
     @Override
-    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+    public List<TxEvent> findByTypeAndIdGreaterThan(long id, String type) {
       return emptyList();
     }
 
@@ -111,7 +111,15 @@ public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
+    return new TxEvent(serviceName,
+        instanceId,
+        new Date(),
+        globalTxId,
+        localTxId,
+        parentTxId,
+        eventType.name(),
+        compensationMethod,
+        payloads);
   }
 
   private TxEvent eventOf(EventType eventType, String localTxId) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index eced7f9f..f3099363 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -84,6 +84,8 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObser
         message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
         message.getType(),
         message.getCompensationMethod(),
+        message.getRetriesMethod(),
+        message.getRetries(),
         message.getPayloads().toByteArray()
     ));
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index afbdaf5d..cc8ebb63 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -17,17 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
-
-import java.lang.invoke.MethodHandles;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.Transactional;
-
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -35,6 +24,15 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.PageRequest;
 
+import javax.transaction.Transactional;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.*;
+
 public class SpringCommandRepository implements CommandRepository {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
@@ -48,25 +46,32 @@
   }
 
   @Override
-  public void saveCompensationCommands(String globalTxId) {
-    List<TxEvent> events = eventRepository
-        .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
-
-    Map<String, Command> commands = new LinkedHashMap<>();
-
-    for (TxEvent event : events) {
-      commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
-    }
-
-    for (Command command : commands.values()) {
-      log.info("Saving compensation command {}", command);
-      try {
-        commandRepository.save(command);
-      } catch (Exception e) {
-        log.warn("Failed to save some command {}", command);
+  public void saveCompensationCommands(TxEvent abortEvent) {
+    Optional<TxEvent> compensationStartedEvent =
+        eventRepository.findStartedEventWithLocalTxId(abortEvent.globalTxId(), abortEvent.parentTxId());
+
+    compensationStartedEvent.ifPresent(txEvent -> {
+      String retriesMethod = txEvent.retriesMethod();
+      long retried = retriedTimes(txEvent.globalTxId(), retriesMethod, txEvent.localTxId());
+
+      List<TxEvent> compensationEvents = createRetriesTxEvent(abortEvent.id(), txEvent);
+
+      if (txEvent.retries() < (retried + 1)) {
+        compensationEvents =
+            eventRepository.findStartedEventsWithMatchingEndedButNotCompensatedEvents(txEvent.globalTxId());
       }
-      log.info("Saved compensation command {}", command);
-    }
+
+      compensationEvents.stream().map(Command::new)
+          .forEach(command -> {
+            log.info("Saving compensation command {}", command);
+            try {
+              commandRepository.save(command);
+            } catch (Exception e) {
+              log.warn("Failed to save some command {}", command);
+            }
+            log.info("Saved compensation command {}", command);
+          });
+    });
   }
 
   @Override
@@ -93,4 +98,18 @@ public void markCommandAsDone(String globalTxId, String localTxId) {
 
     return commands;
   }
+
+  private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
+    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
+        .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
+            && Objects.equals(c.localTxId(), localTxId)).count();
+  }
+
+  private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
+    return Collections.singletonList(new TxEvent(
+        abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
+        txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
+        txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
+    ));
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index ad321482..714814ef 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -42,8 +42,8 @@ public void save(TxEvent event) {
   }
 
   @Override
-  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
-    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1));
+  public List<TxEvent> findByTypeAndIdGreaterThan(long id, String type) {
+    return eventRepo.findByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1));
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 2e52fef2..b50f2f9d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -49,24 +49,17 @@
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.compensationMethod != t.retriesMethod "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
+  @Query("SELECT t FROM TxEvent t WHERE t.type = 'TxStartedEvent' AND t.globalTxId = ?1 AND t.localTxId = ?2")
+  Optional<TxEvent> findStartedEventWithLocalTxId(String globalTxId,String localTxId);
+
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
-      + "  SELECT t1.globalTxId"
-      + "  FROM TxEvent t1 "
-      + "  WHERE t1.globalTxId = t.globalTxId "
-      + "  AND t1.type = 'TxAbortedEvent'"
-      + ") AND NOT EXISTS ( "
-      + "  SELECT t2.globalTxId"
-      + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = t.globalTxId "
-      + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent') "
-      + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
+      + " WHERE t.type = ?1 AND t.surrogateId > ?2 ORDER BY t.surrogateId ASC")
+  List<TxEvent> findByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
 
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index d6b51729..bfda71d6 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -8,6 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
+  retries int NOT NULL,
+  retriesMethod varchar(256) NOT NULL,
   payloads bytea
 );
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5dddc1de..52d24f84 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -155,7 +155,7 @@ public void persistsEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
-    await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
 
     assertThat(receivedCommands.isEmpty(), is(true));
 
@@ -175,7 +175,7 @@ public void persistsEvent() {
   public void closeStreamOnDisconnected() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
 
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(3, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     assertThat(
         omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
@@ -186,19 +186,19 @@ public void closeStreamOnDisconnected() {
         omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
         is(false));
 
-    await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
+    await().atMost(3, SECONDS).until(compensateResponseObserver::isCompleted);
   }
 
   @Test
   public void closeStreamOfDisconnectedClientOnly() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+    await().atMost(3, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
 
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
     CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
     TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
 
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+    await().atMost(3, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
 
     blockingStub.onDisconnected(serviceConfig);
 
@@ -219,19 +219,31 @@ public void removeCallbackOnClientDown() throws Exception {
 
     omegaCallbacks.get(serviceName).get(instanceId).disconnect();
 
-    consistentService.handle(someTxAbortEvent(serviceName, instanceId));
-
-    await().atMost(1, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
+    consistentService.handle(
+        new TxEvent(
+            serviceName,
+            instanceId,
+            new Date(),
+            globalTxId,
+            localTxId,
+            localTxId,
+            TxAbortedEvent.name(),
+            compensationMethod,
+            payload.getBytes())
+    );
+
+    await().atMost(3, SECONDS).until(() -> omegaCallbacks.get(serviceName).isEmpty());
   }
 
   @Test
   public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId, payload.getBytes(), getClass().getCanonicalName()));
 
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    await().atMost(3, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     GrpcCompensateCommand command = receivedCommands.poll();
     assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -254,8 +266,9 @@ public void doNotCompensateDuplicateTxOnFailure() {
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() > 1);
 
     assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -270,10 +283,11 @@ public void getCompensateCommandOnFailure() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
-    await().atMost(1, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId, payload.getBytes(), getClass().getCanonicalName()));
+    await().atMost(3, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     GrpcCompensateCommand command = receivedCommands.poll();
     assertThat(command.getGlobalTxId(), is(globalTxId));
@@ -296,10 +310,11 @@ public void compensateOnlyFailedGlobalTransaction() {
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 3);
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId, payload.getBytes(), getClass().getCanonicalName()));
+    await().atMost(3, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommands.size(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -314,9 +329,10 @@ public void doNotStartSubTxOnFailure() {
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId, payload.getBytes(), getClass().getCanonicalName()));
 
-    await().atMost(1, SECONDS).until(() -> receivedCommands.size() == 1);
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
@@ -340,10 +356,11 @@ public void compensateOnlyCompletedTransactions() {
     String anotherLocalTxId2 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 7);
 
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
-    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, anotherLocalTxId2, localTxId, payload.getBytes(), getClass().getCanonicalName()));
+    await().atMost(3, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommands.size(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
@@ -357,15 +374,65 @@ public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
 
     String anotherLocalTxId = UUID.randomUUID().toString();
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId));
-    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId));
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, anotherLocalTxId, localTxId, payload.getBytes(), getClass().getCanonicalName()));
 
     blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 8);
+    await().atMost(3, SECONDS).until(() -> eventRepo.count() == 8);
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
   }
 
+  @Test
+  public void retiesAndCompensateOnFailure() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    String localTxId1 = UUID.randomUUID().toString();
+
+    blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId1)
+        .setParentTxId(parentTxId)
+        .setType(TxStartedEvent.name())
+        .setCompensationMethod("Compensation Method")
+        .setPayloads(ByteString.copyFrom(payload.getBytes()))
+        .setRetries(3).setRetriesMethod("Retries Method")
+        .build());
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
+
+    await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
+
+    for (int i = 0; i < 3; i++) {
+      blockingStub.onTxEvent(
+          eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+      await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+      GrpcCompensateCommand command = receivedCommands.poll();
+      assertThat(command.getGlobalTxId(), is(globalTxId));
+      assertThat(command.getLocalTxId(), is(localTxId1));
+      assertThat(command.getParentTxId(), is(parentTxId));
+      assertThat(command.getCompensateMethod(), is("Retries Method"));
+      assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+    }
+
+    blockingStub.onTxEvent(
+        eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
+
+    await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
+
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId1));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is("Compensation Method"));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
@@ -407,7 +474,8 @@ private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String loca
     return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
+      String compensationMethod) {
     return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod);
   }
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 344fdda2..9adbb131 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -9,6 +9,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
   payloads varbinary(10240),
+  retries int NOT NULL,
+  retriesMethod varchar(256) NOT NULL
 --  version bigint NOT NULL
 );
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 4af0773f..be812b5c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -65,7 +65,8 @@ public GrpcClientMessageSender(
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
+    this.compensateStreamObserver =
+        new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
@@ -102,6 +103,8 @@ private GrpcTxEvent convertEvent(TxEvent event) {
         .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
         .setType(event.type().name())
         .setCompensationMethod(event.compensationMethod())
+        .setRetries(event.retries())
+        .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8062ae90..d8ca705e 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -299,7 +299,7 @@ public void stopSendingWhenClusterIsDown() throws Exception {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", null, 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 562c50f8..11c7294c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", null, 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
@@ -82,4 +82,4 @@ public void blowsUpWhenInterrupted() throws InterruptedException {
     thread.interrupt();
     thread.join();
   }
-}
\ No newline at end of file
+}
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 118b0330..1d3c02e3 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -58,4 +58,4 @@ private CompensationContextInternal(Object target, Method compensationMethod) {
       this.compensationMethod = compensationMethod;
     }
   }
-}
\ No newline at end of file
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 6c0c3331..268fad9c 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -48,6 +48,7 @@ public void doWith(Method method) throws IllegalArgumentException {
 
     try {
       Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+      compensationContext.addCompensationContext(method, bean);
       compensationContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 5358db5a..41be8f76 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -34,8 +34,8 @@
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, CompensationContext context) {
-    return new CompensationMessageHandler(sender, context);
+  MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
+    return new CompensationMessageHandler(sender, omegaContext, context);
   }
 
   @Order(0)
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 7daf9547..3143484a 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,11 +106,14 @@
 
   private String compensationMethod;
 
+  private String retriesMethod;
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
+    retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
@@ -130,10 +133,11 @@ public void sendsUserToRemote_AroundTransaction() throws Exception {
     User user = userService.add(this.user);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
-        toArray(messages)
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()
+        }, toArray(messages)
     );
 
     User actual = userRepository.findOne(user.id());
@@ -151,8 +155,9 @@ public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
     }
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(),
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -173,10 +178,12 @@ public void compensateOnTransactionException() throws Exception {
     assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(),
+            new TxStartedEvent(
+                globalTxId, anotherLocalTxId, localTxId, compensationMethod, retriesMethod, 0, anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -195,10 +202,12 @@ public void passesOmegaContextThroughDifferentThreads() throws Exception {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxStartedEvent(
+                globalTxId, anotherLocalTxId, localTxId, compensationMethod, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -214,10 +223,12 @@ public void passesOmegaContextInThreadPool() throws Exception {
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxStartedEvent(
+                globalTxId, anotherLocalTxId, localTxId, compensationMethod, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -236,8 +247,9 @@ public void passesOmegaContextThroughReactiveX() throws Exception {
     waitTillSavedUser(username);
 
     assertArrayEquals(
-        new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+        new String[] {
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,7 +267,8 @@ public void passesOmegaContextAmongActors() throws Exception {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxStartedEvent(
+                globalTxId, newLocalTxId, globalTxId, compensationMethod, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 074a5eca..15d0a6c0 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -18,31 +18,33 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 
 class CompensableInterceptor implements EventAwareInterceptor {
   private final OmegaContext context;
   private final MessageSender sender;
 
   CompensableInterceptor(OmegaContext context, MessageSender sender) {
-    this.context = context;
     this.sender = sender;
+    this.context = context;
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    return sender
-        .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
+  public AlphaResponse preIntercept(String parentTxId, String retriesMethod, String compensationMethod, int retries,
+      Object... message) {
+    return sender.send(new TxStartedEvent(
+        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, retriesMethod, retries, message
+    ));
   }
 
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
     sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod));
-
   }
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(
-        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
+    sender.send(new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
+        throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 46c1e9b3..15cf91a5 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -18,19 +18,26 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
+  private final OmegaContext omegaContext;
   private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
+  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) {
     this.sender = sender;
     this.context = context;
+    this.omegaContext = omegaContext;
   }
 
   @Override
-  public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
+  public void onReceive(String globalTxId, String localTxId, String parentTxId,
+      String compensationMethod, Object... payloads) {
+    String oldLocalTxId = omegaContext.localTxId();
+    omegaContext.setLocalTxId(parentTxId);
     context.compensate(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+    omegaContext.setLocalTxId(oldLocalTxId);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 5f8165f5..6db017ae 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String retriesMethod, String compensationMethod, int retries,
+        Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +34,8 @@ public void onError(String parentTxId, String compensationMethod, Throwable thro
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String retriesMethod, String compensationMethod, int retries,
+      Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7ef021a2..62549b16 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,7 +32,8 @@
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String retriesMethod, String compensationMethod, int retries,
+      Object... message) {
     try {
       return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     } catch (OmegaException e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 09517522..0b93974c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -52,7 +52,7 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwab
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
-    interceptor.preIntercept(context.globalTxId(), method.toString());
+    interceptor.preIntercept(context.globalTxId(), null, method.toString(), 0);
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
     scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index ca145512..a07390e2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,8 +29,9 @@
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String signature, Object... args) {
-    return interceptor.preIntercept(parentTxId, signature, args);
+  public AlphaResponse preIntercept(String parentTxId, String retriesMethod, String signature, int retries,
+      Object... args) {
+    return interceptor.preIntercept(parentTxId, retriesMethod, signature, retries, args);
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a61dc74..e80ae3a5 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -40,12 +40,12 @@
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
+  private final EventAwareInterceptor interceptor;
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-  private final CompensableInterceptor interceptor;
 
-  public TransactionAspect(MessageSender sender, OmegaContext context) {
+  public TransactionAspect(MessageSender messageSender, OmegaContext context) {
     this.context = context;
-    this.interceptor = new CompensableInterceptor(context, sender);
+    this.interceptor = new CompensableInterceptor(context,messageSender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -53,13 +53,16 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    String signature = compensationMethodSignature(joinPoint, compensable, method);
+    Object[] args = joinPoint.getArgs();
+    int retries = compensable.retries();
+    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
+    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    AlphaResponse response = interceptor.preIntercept(localTxId, retriesSignature, compensationSignature, retries, args);
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
@@ -68,16 +71,15 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
-    // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
-    scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
+    scheduleTimeoutTask(interceptor, localTxId, compensationSignature, method, compensable.timeout());
 
     try {
       Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(localTxId, compensationSignature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(localTxId, compensationSignature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
@@ -87,7 +89,7 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr
 
   private void scheduleTimeoutTask(
       TimeAwareInterceptor interceptor,
-      String localTxId,
+      String parentTxId,
       String signature,
       Method method,
       int timeout) {
@@ -95,7 +97,7 @@ private void scheduleTimeoutTask(
     if (timeout > 0) {
       executor.schedule(
           () -> interceptor.onTimeout(
-              localTxId,
+              parentTxId,
               signature,
               new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
           timeout,
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 1398d3ea..ad371968 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -30,8 +30,16 @@
   private final String parentTxId;
   private final String compensationMethod;
   private final Object[] payloads;
+  private final String retriesMethod;
+  private final int retries;
 
-  public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
+  public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      Object... payloads) {
+    this(type, globalTxId, localTxId, parentTxId, compensationMethod, null, 0, payloads);
+  }
+
+  public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      String retriesMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
     this.localTxId = localTxId;
@@ -39,6 +47,8 @@ public TxEvent(EventType type, String globalTxId, String localTxId, String paren
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
     this.globalTxId = globalTxId;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
   }
 
   public long timestamp() {
@@ -69,6 +79,14 @@ public String compensationMethod() {
     return compensationMethod;
   }
 
+  public String retriesMethod() {
+    return retriesMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index ce93ea3e..c6c0a725 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,7 +21,9 @@
 
 public class TxStartedEvent extends TxEvent {
 
-  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, payloads);
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+      String retriesMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, retriesMethod, retries,
+        payloads);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index c6bbfb6b..64a3e0dc 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -25,6 +25,8 @@
 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
+  int retries() default 0;
+
   String compensationMethod();
 
   int timeout() default 0;
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 21af7e6e..c25343e5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -25,6 +25,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -47,6 +48,7 @@
   };
 
   private final String message = uniquify("message");
+  private final String retriesMethod = uniquify("retries");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -62,13 +64,16 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, message);
+    int retries = new Random().nextInt();
+    interceptor.preIntercept(parentTxId, retriesMethod, compensationMethod, retries, message);
 
     TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.retries(), is(retries));
+    assertThat(event.retriesMethod(), is(retriesMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index 0b33d4b8..c9c73947 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,6 +28,7 @@
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -44,8 +45,9 @@
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
+  private final OmegaContext omegaContext = mock(OmegaContext.class);
   private final CompensationContext context = mock(CompensationContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context);
 
   @Test
   public void sendsEventOnCompensationCompleted() throws Exception {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 566a456b..4c01b52f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public void setUp() throws Exception {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null);
+    sagaStartAnnotationProcessor.preIntercept(null, null, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public void transformInterceptedException() {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null);
+      sagaStartAnnotationProcessor.preIntercept(null, null, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 9ff0214c..948749d1 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -45,7 +45,8 @@
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String retriesMethod, String compensationMethod, int retries,
+        Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -133,4 +134,4 @@ private void waitTillAllDone(List<Future<?>> futures)
       future.get();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 26368814..27011e81 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -46,6 +46,8 @@ message GrpcTxEvent {
   bytes payloads = 7;
   string serviceName = 8;
   string instanceId = 9;
+  int32 retries = 10;
+  string retriesMethod = 11;
 }
 
 message GrpcCompensateCommand {
@@ -54,4 +56,4 @@ message GrpcCompensateCommand {
   string parentTxId = 3;
   string compensateMethod = 4;
   bytes payloads = 5;
-}
\ No newline at end of file
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [pack] retry sub-transaction on failure
> ---------------------------------------
>
>                 Key: SCB-224
>                 URL: https://issues.apache.org/jira/browse/SCB-224
>             Project: Apache ServiceComb
>          Issue Type: New Feature
>          Components: Saga
>            Reporter: Yin Xiang
>            Assignee: Eric Lee
>            Priority: Major
>             Fix For: saga-0.2.0
>
>
> as a user, i want to retry transaction in my service, so that it can always be done eventually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)