You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:46 UTC

[incubator-servicecomb-saga] 02/07: SCB-224: omega support retries

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0651b91725700b5c09d253d2374845acabf42b90
Author: zhang2014 <co...@gmail.com>
AuthorDate: Wed Jan 17 23:06:04 2018 +0800

    SCB-224: omega support retries
---
 .../connector/grpc/GrpcClientMessageSender.java    |  5 ++++-
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  8 +++++---
 .../connector/grpc/RetryableMessageSenderTest.java |  2 +-
 .../spring/CompensableMethodCheckingCallback.java  |  1 +
 .../spring/TransactionInterceptionTest.java        | 23 ++++++++++++----------
 .../omega/transaction/CompensableInterceptor.java  |  4 ++--
 .../omega/transaction/EventAwareInterceptor.java   |  4 ++--
 .../transaction/SagaStartAnnotationProcessor.java  |  4 ++--
 .../saga/omega/transaction/SagaStartAspect.java    |  2 +-
 .../omega/transaction/TimeAwareInterceptor.java    |  0
 .../saga/omega/transaction/TransactionAspect.java  | 11 +++++++----
 .../saga/omega/transaction/TxEvent.java            | 14 ++++++++++++-
 .../saga/omega/transaction/TxStartedEvent.java     |  4 ++--
 .../omega/transaction/annotations/Compensable.java |  4 +++-
 .../transaction/CompensableInterceptorTest.java    |  8 ++++++--
 .../SagaStartAnnotationProcessorTest.java          |  4 ++--
 .../transaction/TimeAwareInterceptorTest.java      |  0
 17 files changed, 64 insertions(+), 34 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 5712f57..cf53a0c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -65,7 +65,8 @@ public class GrpcClientMessageSender implements MessageSender {
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
+    this.compensateStreamObserver =
+        new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
@@ -103,6 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
         .setType(event.type().name())
         .setTimeout(event.timeout())
         .setCompensationMethod(event.compensationMethod())
+        .setRetries(event.retries())
+        .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
         .setPayloads(payloads);
 
     return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 788cf96..d66b737 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -98,7 +98,7 @@ public class LoadBalancedClusterMessageSenderTest {
   private final String parentTxId = uniquify("parentTxId");
   private final String compensationMethod = getClass().getCanonicalName();
   private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
-      compensationMethod, 0, "blah");
+      compensationMethod, 0, null, 0, "blah");
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -189,7 +189,7 @@ public class LoadBalancedClusterMessageSenderTest {
     await().atMost(3, SECONDS).until(() -> compensated.contains(globalTxId));
   }
 
-  @Test (timeout = 1000)
+  @Test(timeout = 1000)
   public void stopSendingOnInterruption() throws Exception {
     MessageSender underlying = Mockito.mock(MessageSender.class);
     doThrow(RuntimeException.class).when(underlying).send(event);
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
   public void forwardSendResult() {
     assertThat(messageSender.send(event).aborted(), is(false));
 
-    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah");
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
@@ -358,6 +358,8 @@ public class LoadBalancedClusterMessageSenderTest {
           request.getParentTxId(),
           request.getCompensationMethod(),
           0,
+          null,
+          0,
           new String(request.getPayloads().toByteArray())));
 
       sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 95bda85..9d0ebc4 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@ public class RetryableMessageSenderTest {
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
-  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0);
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
 
   @Test
   public void sendEventWhenSenderIsAvailable() {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 6c0c333..268fad9 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -48,6 +48,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
 
     try {
       Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+      compensationContext.addCompensationContext(method, bean);
       compensationContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 2ec42a5..19d0942 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,11 +106,14 @@ public class TransactionInterceptionTest {
 
   private String compensationMethod;
 
+  private String retriesMethod;
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(globalTxId);
+    retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
@@ -131,7 +134,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -152,7 +155,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -174,9 +177,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
             new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -196,9 +199,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -215,9 +218,9 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
             new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -237,7 +240,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
@@ -255,7 +258,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
             new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 53e5158..fcce034 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
     return sender.send(new TxStartedEvent(
-        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message));
+        context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index bb2cca4..e20bea2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
       return new AlphaResponse(false);
     }
 
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b7afcf5..49dd8e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
     try {
-      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 388f237..a2ee58c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
     initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout());
+    sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
     try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..e69de29
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 932b990..86cc840 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,12 +49,15 @@ public class TransactionAspect {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
 
-    String signature = compensationMethodSignature(joinPoint, compensable, method);
+    Object[] args = joinPoint.getArgs();
+    int retries = compensable.retries();
+    String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
+    String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
-    AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
+    AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
@@ -65,11 +68,11 @@ public class TransactionAspect {
 
     try {
       Object result = joinPoint.proceed();
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(localTxId, compensationSignature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(localTxId, compensationSignature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 34be420..a5b5514 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,9 +31,11 @@ public class TxEvent {
   private final String compensationMethod;
   private final int timeout;
   private final Object[] payloads;
+  private final String retriesMethod;
+  private final int retries;
 
   public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
-      int timeout, Object... payloads) {
+      int timeout, String retriesMethod, int retries, Object... payloads) {
     this.timestamp = System.currentTimeMillis();
     this.type = type;
     this.localTxId = localTxId;
@@ -42,6 +44,8 @@ public class TxEvent {
     this.payloads = payloads;
     this.globalTxId = globalTxId;
     this.timeout = timeout;
+    this.retriesMethod = retriesMethod;
+    this.retries = retries;
   }
 
   public long timestamp() {
@@ -76,6 +80,14 @@ public class TxEvent {
     return timeout;
   }
 
+  public String retriesMethod() {
+    return retriesMethod;
+  }
+
+  public int retries() {
+    return retries;
+  }
+
   @Override
   public String toString() {
     return type.name() + "{" +
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 4732d95..cb2580e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -22,7 +22,7 @@ import org.apache.servicecomb.saga.common.EventType;
 public class TxStartedEvent extends TxEvent {
 
   public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
-      String compensationMethod, int timeout, Object... payloads) {
-    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads);
+      String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
+    super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 11ba7c7..e9bf6a7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -36,8 +36,10 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
 
+  int retries() default 0;
+
   /**
-   * Compensation method name, should not be null.<br>
+   * Compensation method name.<br>
    * A compensation method should satisfy below requirements:
    * <ol>
    *   <li>has same parameter list as @Compensable method's</li>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0ef9d4d..76ffc03 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,13 +18,13 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -47,6 +47,7 @@ public class CompensableInterceptorTest {
   };
 
   private final String message = uniquify("message");
+  private final String retriesMethod = uniquify("retries");
   private final String compensationMethod = getClass().getCanonicalName();
 
   @SuppressWarnings("unchecked")
@@ -62,13 +63,16 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
+    int retries = new Random().nextInt();
+    interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
 
     TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.retries(), is(retries));
+    assertThat(event.retriesMethod(), is(retriesMethod));
     assertThat(event.type(), is(EventType.TxStartedEvent));
     assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index cc84fc5..8c496f6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest {
 
   @Test
   public void sendsSagaStartedEvent() {
-    sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+    sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
 
     TxEvent event = messages.get(0);
 
@@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest {
     doThrow(exception).when(sender).send(any());
 
     try {
-      sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+      sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
       expectFailing(TransactionalException.class);
     } catch (TransactionalException e) {
       assertThat(e.getMessage(), is("exception"));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
new file mode 100644
index 0000000..e69de29

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.