You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:10 UTC

[incubator-servicecomb-saga] branch SCB-212_tx_timeout updated (30c87c8 -> b878c80)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    omit 30c87c8  SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
    omit c74bfe3  SCB-212 removed unnecessary lines
    omit 59d5239  SCB-212 added saga start timeout
    omit 0b5ddc7  SCB-212 replaced timeout impl with atomic to avoid locking
    omit c8cc12d  SCB-212 tx timeout impl
    omit 50e82bb  SCB-212 united interceptors into one
    omit 35d4bed  SCB-212 removed parent tx id from omega context, since it's not necessary
    omit 0aa4e8a  SCB-212 made each @Compensable as a new sub-transaction
    omit 9b930ad  SCB-212 delegated compensation context a dedicated class
    omit 922ddbc  SCB-164 fail fast on script error
    omit 97181bf  SCB-164 added waiting for async update to fix random test failures
    omit e604b1f  SCB-164 reused const ACK to avoid recreate it every time
    omit 7cfddda  SCB-164 ensured compensation stream can be established after reconnection
    omit e29597a  SCB-164 reconnected to alpha on connection loss
     add 5948b01  SCB-164 reconnected to alpha on connection loss
     add 105cf02  SCB-164 ensured compensation stream can be established after reconnection
     add bb26540  SCB-164 reused const ACK to avoid recreate it every time
     add 822e218  SCB-164 added waiting for async update to fix random test failures
     add 9646960  SCB-164 fail fast on script error
     add d2da5e1  SCB-164 increased processing delay on port 8090
     new b85b4e9  SCB-212 delegated compensation context a dedicated class
     new 182d5e9  SCB-212 made each @Compensable as a new sub-transaction
     new ee75104  SCB-212 removed parent tx id from omega context, since it's not necessary
     new 8bde2cd  SCB-212 united interceptors into one
     new 9a8b43b  SCB-212 tx timeout impl
     new 32a226b  SCB-212 replaced timeout impl with atomic to avoid locking
     new edf12b5  SCB-212 added saga start timeout
     new 6eb55ef  SCB-212 removed unnecessary lines
     new b878c80  SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (30c87c8)
            \
             N -- N -- N   refs/heads/SCB-212_tx_timeout (b878c80)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 03/09: SCB-212 removed parent tx id from omega context, since it's not necessary

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ee75104aaca61c6493844d4d8a3a3a3b69f0e482
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 15:56:53 2018 +0800

    SCB-212 removed parent tx id from omega context, since it's not necessary
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java            | 11 -----------
 .../saga/omega/context/OmegaContextTest.java        | 18 ------------------
 .../transaction/spring/ExecutorFieldCallback.java   | 13 ++++---------
 .../saga/omega/transaction/TransactionAspect.java   | 21 +++++++++------------
 .../SagaStartAnnotationProcessorTest.java           |  2 --
 .../saga/omega/transaction/SagaStartAspectTest.java |  3 ---
 .../omega/transaction/TransactionAspectTest.java    |  3 ---
 .../TransactionHandlerInterceptorTest.java          |  2 --
 8 files changed, 13 insertions(+), 60 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 43bf0b4..daa8e7c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -23,7 +23,6 @@ public class OmegaContext {
 
   private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
   private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
-  private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
 
   public OmegaContext(IdGenerator<String> idGenerator) {
@@ -58,18 +57,9 @@ public class OmegaContext {
     return localTxId.get();
   }
 
-  public String parentTxId() {
-    return parentTxId.get();
-  }
-
-  public void setParentTxId(String parentTxId) {
-    this.parentTxId.set(parentTxId);
-  }
-
   public void clear() {
     globalTxId.remove();
     localTxId.remove();
-    parentTxId.remove();
   }
 
   @Override
@@ -77,7 +67,6 @@ public class OmegaContext {
     return "OmegaContext{" +
         "globalTxId=" + globalTxId.get() +
         ", localTxId=" + localTxId.get() +
-        ", parentTxId=" + parentTxId.get() +
         '}';
   }
 }
diff --git a/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java b/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
index 3f844d3..b741e88 100644
--- a/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
+++ b/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
@@ -67,24 +67,6 @@ public class OmegaContextTest {
     CompletableFuture.allOf(future1, future2).join();
   }
 
-  @Test
-  public void eachThreadGetsDifferentParentTxId() throws Exception {
-    CyclicBarrier barrier = new CyclicBarrier(2);
-
-    Runnable runnable = exceptionalRunnable(() -> {
-      String parentId = UUID.randomUUID().toString();
-      omegaContext.setParentTxId(parentId);
-      barrier.await();
-
-      assertThat(omegaContext.parentTxId(), is(parentId));
-    });
-
-    CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
-    CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
-
-    CompletableFuture.allOf(future1, future2).join();
-  }
-
   private Runnable exceptionalRunnable(ExceptionalRunnable runnable) {
     return () -> {
       try {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
index 5ae949d..e07356c 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -68,7 +68,6 @@ class ExecutorFieldCallback implements FieldCallback {
 
     private final String globalTxId;
     private final String localTxId;
-    private final String parentTxId;
     private final Object runnable;
     private final OmegaContext omegaContext;
 
@@ -84,29 +83,25 @@ class ExecutorFieldCallback implements FieldCallback {
       this.omegaContext = omegaContext;
       this.globalTxId = omegaContext.globalTxId();
       this.localTxId = omegaContext.localTxId();
-      this.parentTxId = omegaContext.parentTxId();
       this.runnable = runnable;
     }
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       try {
-        LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+        LOG.debug("Setting OmegaContext with globalTxId [{}] & localTxId [{}]",
             globalTxId,
-            localTxId,
-            parentTxId);
+            localTxId);
 
         omegaContext.setGlobalTxId(globalTxId);
         omegaContext.setLocalTxId(localTxId);
-        omegaContext.setParentTxId(parentTxId);
 
         return method.invoke(runnable, args);
       } finally {
         omegaContext.clear();
-        LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+        LOG.debug("Cleared OmegaContext with globalTxId [{}] & localTxId [{}]",
             globalTxId,
-            localTxId,
-            parentTxId);
+            localTxId);
       }
     }
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index f62b92e..b5c6859 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -53,23 +53,20 @@ public class TransactionAspect {
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
-    String parentTxId = context.parentTxId();
-    context.setParentTxId(localTxId);
 
-    preIntercept(joinPoint, signature);
+    preIntercept(joinPoint, signature, localTxId);
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
-      postIntercept(signature);
+      postIntercept(signature, localTxId);
 
       return result;
     } catch (Throwable throwable) {
-      interceptException(signature, throwable);
+      interceptException(signature, throwable, localTxId);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
-      context.setParentTxId(parentTxId);
       LOG.debug("Restored context back to {}", context);
     }
   }
@@ -83,28 +80,28 @@ public class TransactionAspect {
         .toString();
   }
 
-  private void preIntercept(ProceedingJoinPoint joinPoint, String signature) {
+  private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
     preTransactionInterceptor.intercept(
         context.globalTxId(),
         context.newLocalTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature,
         joinPoint.getArgs());
   }
 
-  private void postIntercept(String signature) {
+  private void postIntercept(String signature, String parentTxId) {
     postTransactionInterceptor.intercept(
         context.globalTxId(),
         context.localTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature);
   }
 
-  private void interceptException(String signature, Throwable throwable) {
+  private void interceptException(String signature, Throwable throwable, String parentTxId) {
     failedTransactionInterceptor.intercept(
         context.globalTxId(),
         context.localTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature,
         throwable);
   }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index fba7826..0dadebe 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -57,7 +57,6 @@ public class SagaStartAnnotationProcessorTest {
 
     assertThat(context.globalTxId(), is(globalTxId));
     assertThat(context.localTxId(), is(globalTxId));
-    assertThat(context.parentTxId(), is(nullValue()));
 
     TxEvent event = messages.get(0);
 
@@ -87,6 +86,5 @@ public class SagaStartAnnotationProcessorTest {
 
     assertThat(context.globalTxId(), is(nullValue()));
     assertThat(context.localTxId(), is(nullValue()));
-    assertThat(context.parentTxId(), is(nullValue()));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index cfaa7b6..432b3ad 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -63,7 +63,6 @@ public class SagaStartAspectTest {
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
   }
 
   @Test
@@ -89,7 +88,6 @@ public class SagaStartAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test
@@ -115,7 +113,6 @@ public class SagaStartAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   private String doNothing() {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index bd8829c..65f23a7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -64,7 +64,6 @@ public class TransactionAspectTest {
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
   }
 
   @Test
@@ -89,7 +88,6 @@ public class TransactionAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(parentTxId));
   }
 
   @Test
@@ -115,7 +113,6 @@ public class TransactionAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(parentTxId));
   }
 
   private String doNothing() {
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
index 03f278f..7b41540 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
@@ -63,7 +63,6 @@ public class TransactionHandlerInterceptorTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test
@@ -74,7 +73,6 @@ public class TransactionHandlerInterceptorTest {
     requestInterceptor.preHandle(request, response, null);
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 09/09: SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b878c80ac09942f8242139c431cd616e80c04ffa
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800

    SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 26 +++++++++++++---
 .../saga/alpha/core/TxEventRepository.java         |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 35 +++++++++++++++++-----
 .../saga/alpha/server/SpringTxEventRepository.java |  2 +-
 4 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839f..662fb54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,10 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,8 +33,7 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
-    put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
   }};
 
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -40,11 +43,26 @@ public class TxConsistentService {
 
   public void handle(TxEvent event) {
     eventRepository.save(event);
-    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+    CompletableFuture.runAsync(() -> {
+      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+        omegaCallback.compensate(event);
+      }
+
+      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+    });
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
   }
+
+  private boolean isGlobalTxAborted(TxEvent event) {
+    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  }
+
+  private boolean isTxEndedEvent(TxEvent event) {
+    return TxEndedEvent.name().equals(event.type());
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
 public interface TxEventRepository {
   void save(TxEvent event);
 
-  List<TxEvent> findStartedTransactions(String globalTxId, String type);
+  List<TxEvent> findTransactions(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 6443997..30ec2a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,9 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -44,7 +47,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+    public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
@@ -69,8 +72,8 @@ public class TxConsistentServiceTest {
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
         newEvent(EventType.SagaStartedEvent),
-        newEvent(EventType.TxStartedEvent),
-        newEvent(EventType.TxEndedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
         newEvent(EventType.TxCompensatedEvent),
         newEvent(EventType.SagaEndedEvent)};
 
@@ -85,14 +88,14 @@ public class TxConsistentServiceTest {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
 
     String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
 
-    TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
@@ -103,6 +106,22 @@ public class TxConsistentServiceTest {
     ));
   }
 
+  @Test
+  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+    ));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+  public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
         .stream()
         .map(TxEventEnvelope::event)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/09: SCB-212 united interceptors into one

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 8bde2cd5b05479332c0de7c7cc0743e6ddb34e72
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 16:45:56 2018 +0800

    SCB-212 united interceptors into one
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/annotations/SagaStart.java  |  1 +
 ...nterceptor.java => CompensableInterceptor.java} | 18 ++++++--
 ...Interceptor.java => EventAwareInterceptor.java} | 12 ++---
 .../transaction/PreTransactionInterceptor.java     | 30 ------------
 .../saga/omega/transaction/TransactionAspect.java  | 14 ++----
 .../omega/transaction/annotations/Compensable.java |  2 +
 ...orTest.java => CompensableInterceptorTest.java} | 40 ++++++++++++++--
 .../PostTransactionInterceptorTest.java            | 53 ----------------------
 .../omega/transaction/TransactionAspectTest.java   |  1 -
 9 files changed, 62 insertions(+), 109 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
index 435d72f..7937061 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
@@ -26,4 +26,5 @@ import java.lang.annotation.Target;
 @Retention(RUNTIME)
 @Target(METHOD)
 public @interface SagaStart {
+  int timeout() default 0;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
similarity index 58%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 862efc9..b443c4d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -17,14 +17,26 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-class FailedTransactionInterceptor {
+class CompensableInterceptor implements EventAwareInterceptor {
   private final MessageSender sender;
 
-  FailedTransactionInterceptor(MessageSender sender) {
+  CompensableInterceptor(MessageSender sender) {
     this.sender = sender;
   }
 
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+  @Override
+  public void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
+    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
+  }
+
+  @Override
+  public void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+
+  }
+
+  @Override
+  public void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
     sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
similarity index 68%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 4197560..9be92e6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -17,14 +17,10 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-class PostTransactionInterceptor {
-  private final MessageSender sender;
+public interface EventAwareInterceptor {
+  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
 
-  PostTransactionInterceptor(MessageSender sender) {
-    this.sender = sender;
-  }
+  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
 
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
-  }
+  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
deleted file mode 100644
index 3280d11..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-class PreTransactionInterceptor {
-  private final MessageSender sender;
-
-  PreTransactionInterceptor(MessageSender sender) {
-    this.sender = sender;
-  }
-
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
-  }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index b5c6859..a447489 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -32,17 +32,13 @@ import org.slf4j.LoggerFactory;
 @Aspect
 public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final PreTransactionInterceptor preTransactionInterceptor;
-  private final PostTransactionInterceptor postTransactionInterceptor;
-  private final FailedTransactionInterceptor failedTransactionInterceptor;
 
   private final OmegaContext context;
+  private final EventAwareInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
     this.context = context;
-    this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
-    this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
-    this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
+    this.interceptor = new CompensableInterceptor(sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -81,7 +77,7 @@ public class TransactionAspect {
   }
 
   private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
-    preTransactionInterceptor.intercept(
+    interceptor.preIntercept(
         context.globalTxId(),
         context.newLocalTxId(),
         parentTxId,
@@ -90,7 +86,7 @@ public class TransactionAspect {
   }
 
   private void postIntercept(String signature, String parentTxId) {
-    postTransactionInterceptor.intercept(
+    interceptor.postIntercept(
         context.globalTxId(),
         context.localTxId(),
         parentTxId,
@@ -98,7 +94,7 @@ public class TransactionAspect {
   }
 
   private void interceptException(String signature, Throwable throwable, String parentTxId) {
-    failedTransactionInterceptor.intercept(
+    interceptor.onError(
         context.globalTxId(),
         context.localTxId(),
         parentTxId,
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 0777ce4..c6bbfb6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -26,4 +26,6 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
   String compensationMethod();
+
+  int timeout() default 0;
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
similarity index 56%
rename from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
rename to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0a54e26..7505a1f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -29,7 +29,8 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-public class PreTransactionInterceptorTest {
+public class CompensableInterceptorTest {
+
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
@@ -38,11 +39,13 @@ public class PreTransactionInterceptorTest {
   private final MessageSender sender = messages::add;
 
   private final String message = uniquify("message");
-  private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender);
+  private final String compensationMethod = getClass().getCanonicalName();
+
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(sender);
 
   @Test
-  public void sendsTxStartedEvent() throws Exception {
-    interceptor.intercept(globalTxId, localTxId, parentTxId, getClass().getCanonicalName(), message);
+  public void sendsTxStartedEventBefore() throws Exception {
+    interceptor.preIntercept(globalTxId, localTxId, parentTxId, compensationMethod, message);
 
     TxEvent event = messages.get(0);
 
@@ -50,7 +53,34 @@ public class PreTransactionInterceptorTest {
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
     assertThat(event.type(), is("TxStartedEvent"));
-    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+    assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
   }
+
+  @Test
+  public void sendsTxEndedEventAfter() throws Exception {
+    interceptor.postIntercept(globalTxId, localTxId, parentTxId, compensationMethod);
+
+    TxEvent event = messages.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxEndedEvent"));
+    assertThat(event.compensationMethod(), is(compensationMethod));
+    assertThat(event.payloads().length, is(0));
+  }
+
+  @Test
+  public void sendsTxAbortedEventOnError() throws Exception {
+    interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+
+    TxEvent event = messages.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+    assertThat(event.compensationMethod(), is(compensationMethod));
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
deleted file mode 100644
index 50a9cae..0000000
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.junit.Test;
-
-public class PostTransactionInterceptorTest {
-  private final List<TxEvent> messages = new ArrayList<>();
-  private final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
-
-  private final MessageSender sender = messages::add;
-
-  private final PostTransactionInterceptor interceptor = new PostTransactionInterceptor(sender);
-
-  @Test
-  public void sendsSerializedMessage() throws Exception {
-    interceptor.intercept(globalTxId, localTxId, parentTxId, getClass().getCanonicalName());
-
-
-    TxEvent event = messages.get(0);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(localTxId));
-    assertThat(event.parentTxId(), is(parentTxId));
-    assertThat(event.type(), is("TxEndedEvent"));
-    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
-    assertThat(event.payloads().length, is(0));
-  }
-}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 65f23a7..76a0e34 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -39,7 +39,6 @@ public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
 
   private final String newLocalTxId = UUID.randomUUID().toString();
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 08/09: SCB-212 removed unnecessary lines

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6eb55efe521b39223df52a5815b59ca6eac04377
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:40:58 2018 +0800

    SCB-212 removed unnecessary lines
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index c282ecc..5322269 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -44,7 +44,6 @@ import org.mockito.Mockito;
 public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -61,7 +60,6 @@ public class SagaStartAspectTest {
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
-    when(joinPoint.getTarget()).thenReturn(this);
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     omegaContext.clear();

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/09: SCB-212 tx timeout impl

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9a8b43b3da891ab998f3d36043943c2c106d7134
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 21:07:30 2018 +0800

    SCB-212 tx timeout impl
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-transaction/pom.xml                    |  4 ++
 .../omega/transaction/CompensableInterceptor.java  | 19 +++++---
 .../omega/transaction/EventAwareInterceptor.java   | 20 ++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 54 ++++++++++++++++++++++
 .../saga/omega/transaction/TransactionAspect.java  | 46 ++++++------------
 .../transaction/CompensableInterceptorTest.java    | 21 +++++++--
 .../transaction/TimeAwareInterceptorTest.java}     |  8 ++--
 .../omega/transaction/TransactionAspectTest.java   | 44 ++++++++++++++++--
 8 files changed, 163 insertions(+), 53 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 90c00e9..1829650 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -59,6 +59,10 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index b443c4d..76193cd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -17,26 +17,31 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+
 class CompensableInterceptor implements EventAwareInterceptor {
+  private final OmegaContext context;
   private final MessageSender sender;
 
-  CompensableInterceptor(MessageSender sender) {
+  CompensableInterceptor(OmegaContext context, MessageSender sender) {
+    this.context = context;
     this.sender = sender;
   }
 
   @Override
-  public void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
+  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
   }
 
   @Override
-  public void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+  public void postIntercept(String parentTxId, String compensationMethod) {
+    sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod));
 
   }
 
   @Override
-  public void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    sender.send(
+        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 9be92e6..291538f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -18,9 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
+  EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    }
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+    }
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
+    @Override
+    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    }
+  };
+
+  void preIntercept(String parentTxId, String compensationMethod, Object... message);
+
+  void postIntercept(String parentTxId, String compensationMethod);
+
+  void onError(String parentTxId, String compensationMethod, Throwable throwable);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..d633630
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeoutException;
+
+class TimeAwareInterceptor implements EventAwareInterceptor {
+  private final EventAwareInterceptor interceptor;
+  private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+
+  TimeAwareInterceptor(EventAwareInterceptor interceptor) {
+    this.interceptor = interceptor;
+    this.interceptors.offer(interceptor);
+  }
+
+  @Override
+  public void preIntercept(String localTxId, String signature, Object... args) {
+    interceptor.preIntercept(localTxId, signature, args);
+  }
+
+  @Override
+  public void postIntercept(String localTxId, String signature) {
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().postIntercept(localTxId, signature);
+  }
+
+  @Override
+  public void onError(String localTxId, String signature, Throwable throwable) {
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().onError(localTxId, signature, throwable);
+  }
+
+  void onTimeout(String signature, String localTxId) {
+    interceptors.offerFirst(NO_OP_INTERCEPTOR);
+    interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index a447489..9bee829 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,8 +17,12 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
@@ -34,11 +38,12 @@ public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
-  private final EventAwareInterceptor interceptor;
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+  private final CompensableInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
     this.context = context;
-    this.interceptor = new CompensableInterceptor(sender);
+    this.interceptor = new CompensableInterceptor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -49,17 +54,22 @@ public class TransactionAspect {
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
+    context.newLocalTxId();
 
-    preIntercept(joinPoint, signature, localTxId);
+    TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
+    interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+    if (compensable.timeout() > 0) {
+      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
+    }
 
     try {
       Object result = joinPoint.proceed();
-      postIntercept(signature, localTxId);
+      interceptor.postIntercept(localTxId, signature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptException(signature, throwable, localTxId);
+      interceptor.onError(localTxId, signature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
@@ -75,30 +85,4 @@ public class TransactionAspect {
         .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
         .toString();
   }
-
-  private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
-    interceptor.preIntercept(
-        context.globalTxId(),
-        context.newLocalTxId(),
-        parentTxId,
-        signature,
-        joinPoint.getArgs());
-  }
-
-  private void postIntercept(String signature, String parentTxId) {
-    interceptor.postIntercept(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature);
-  }
-
-  private void interceptException(String signature, Throwable throwable, String parentTxId) {
-    interceptor.onError(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature,
-        throwable);
-  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 7505a1f..609ea6d 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -27,7 +27,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class CompensableInterceptorTest {
 
@@ -41,11 +45,20 @@ public class CompensableInterceptorTest {
   private final String message = uniquify("message");
   private final String compensationMethod = getClass().getCanonicalName();
 
-  private final CompensableInterceptor interceptor = new CompensableInterceptor(sender);
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final OmegaContext context = new OmegaContext(idGenerator);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(context, sender);
+
+  @Before
+  public void setUp() throws Exception {
+    context.setGlobalTxId(globalTxId);
+    context.setLocalTxId(localTxId);
+  }
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(globalTxId, localTxId, parentTxId, compensationMethod, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, message);
 
     TxEvent event = messages.get(0);
 
@@ -59,7 +72,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxEndedEventAfter() throws Exception {
-    interceptor.postIntercept(globalTxId, localTxId, parentTxId, compensationMethod);
+    interceptor.postIntercept(parentTxId, compensationMethod);
 
     TxEvent event = messages.get(0);
 
@@ -73,7 +86,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxAbortedEventOnError() throws Exception {
-    interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+    interceptor.onError(parentTxId, compensationMethod, new RuntimeException("oops"));
 
     TxEvent event = messages.get(0);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
similarity index 68%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 9be92e6..fb2ee1d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
+import static org.junit.Assert.*;
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
+public class TimeAwareInterceptorTest {
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 76a0e34..2ce34b8 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,13 +18,18 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -55,6 +60,7 @@ public class TransactionAspectTest {
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
@@ -67,8 +73,6 @@ public class TransactionAspectTest {
 
   @Test
   public void newLocalTxIdInCompensable() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
-
     aspect.advise(joinPoint, compensable);
 
     TxEvent startedEvent = messages.get(0);
@@ -91,7 +95,6 @@ public class TransactionAspectTest {
 
   @Test
   public void restoreContextOnCompensableError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
@@ -114,6 +117,41 @@ public class TransactionAspectTest {
     assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(compensable.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(newLocalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, compensable);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(newLocalTxId));
+    assertThat(event.parentTxId(), is(localTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId()));
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/09: SCB-212 replaced timeout impl with atomic to avoid locking

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 32a226b51e2f5be6ec3d9a09448c48b1b7ce3fe9
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 22:21:58 2018 +0800

    SCB-212 replaced timeout impl with atomic to avoid locking
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../transaction/OmegaTxTimeoutException.java}      |  10 +-
 .../omega/transaction/TimeAwareInterceptor.java    |  25 ++---
 .../saga/omega/transaction/TransactionAspect.java  |  23 ++++-
 .../transaction/TimeAwareInterceptorTest.java      | 113 ++++++++++++++++++++-
 4 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
similarity index 86%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
index fb2ee1d..eb820d6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
@@ -17,8 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
-
-public class TimeAwareInterceptorTest {
-
-}
\ No newline at end of file
+public class OmegaTxTimeoutException extends RuntimeException {
+  public OmegaTxTimeoutException(String cause) {
+    super(cause);
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index d633630..317f802 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -17,17 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 class TimeAwareInterceptor implements EventAwareInterceptor {
   private final EventAwareInterceptor interceptor;
-  private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+  private final AtomicReference<EventAwareInterceptor> interceptorRef;
 
   TimeAwareInterceptor(EventAwareInterceptor interceptor) {
     this.interceptor = interceptor;
-    this.interceptors.offer(interceptor);
+    this.interceptorRef = new AtomicReference<>(interceptor);
   }
 
   @Override
@@ -37,18 +35,21 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
 
   @Override
   public void postIntercept(String localTxId, String signature) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().postIntercept(localTxId, signature);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.postIntercept(localTxId, signature);
+    }
   }
 
   @Override
   public void onError(String localTxId, String signature, Throwable throwable) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().onError(localTxId, signature, throwable);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 
-  void onTimeout(String signature, String localTxId) {
-    interceptors.offerFirst(NO_OP_INTERCEPTOR);
-    interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+  void onTimeout(String localTxId, String signature, Throwable throwable) {
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 9bee829..d3e091c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,9 +59,8 @@ public class TransactionAspect {
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
     interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
-    if (compensable.timeout() > 0) {
-      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
-    }
+
+    scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
 
     try {
       Object result = joinPoint.proceed();
@@ -77,6 +76,24 @@ public class TransactionAspect {
     }
   }
 
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      String localTxId,
+      String signature,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              localTxId,
+              signature,
+              new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
+    }
+  }
+
   private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
       throws NoSuchMethodException {
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index fb2ee1d..d26f04f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,8 +17,119 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
 
 public class TimeAwareInterceptorTest {
+  private static final int runningCounts = 1000;
+
+  private final String localTxId = uniquify("localTxId");
+  private final String signature = uniquify("signature");
+
+  private final AtomicInteger postInterceptInvoked = new AtomicInteger();
+  private final AtomicInteger onErrorInvoked = new AtomicInteger();
+  private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
+
+  private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    }
+
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+      postInterceptInvoked.incrementAndGet();
+    }
+
+    @Override
+    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+      if (throwable instanceof OmegaTxTimeoutException) {
+        onTimeoutInvoked.incrementAndGet();
+      } else {
+        onErrorInvoked.incrementAndGet();
+      }
+    }
+  };
+
+  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
+  private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out");
+
+
+  @Test(timeout = 5000)
+  public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception {
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.postIntercept(localTxId, signature);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  @Test(timeout = 5000)
+  public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
+    RuntimeException oops = new RuntimeException("oops");
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onError(localTxId, signature, oops);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  private void waitForSignal(CyclicBarrier cyclicBarrier) {
+    try {
+      cyclicBarrier.await();
+    } catch (InterruptedException | BrokenBarrierException e) {
+      fail(e.getMessage());
+    }
+  }
 
+  private void waitTillAllDone(List<Future<?>> futures)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/09: SCB-212 made each @Compensable as a new sub-transaction

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 182d5e9a1055ae600467f70befac2fcadf59baad
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 15:39:20 2018 +0800

    SCB-212 made each @Compensable as a new sub-transaction
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../integration/pack/tests/GreetingController.java |  13 ++
 .../saga/integration/pack/tests/PackIT.java        | 184 +++++++++++++--------
 .../spring/TransactionAspectConfig.java            |   9 +
 .../spring/TransactionInterceptionTest.java        |  74 +++++----
 .../transaction/SagaStartAnnotationProcessor.java  |  12 +-
 .../saga/omega/transaction/SagaStartAspect.java    |  60 +++++++
 .../saga/omega/transaction/TransactionAspect.java  |  42 +----
 .../SagaStartAnnotationProcessorTest.java          |  11 +-
 .../omega/transaction/SagaStartAspectTest.java     | 124 ++++++++++++++
 .../omega/transaction/TransactionAspectTest.java   | 124 ++++++++++++++
 .../TransactionHandlerInterceptor.java             |  16 +-
 .../TransactionHandlerInterceptorTest.java         |   8 +-
 12 files changed, 523 insertions(+), 154 deletions(-)

diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 3c7c095..2bdd587 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.integration.pack.tests;
 
 import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Controller;
@@ -64,4 +65,16 @@ public class GreetingController {
   ResponseEntity<String> rude(@RequestParam String name) {
     return ResponseEntity.ok(greetingService.beingRude(name));
   }
+
+  @SagaStart
+  @Compensable(compensationMethod = "goodNight")
+  @GetMapping("/goodMorning")
+  ResponseEntity<String> goodMorning(@RequestParam String name) {
+    String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
+    return ResponseEntity.ok("Good morning, " + bonjour);
+  }
+
+  ResponseEntity<String> goodNight(@RequestParam String name) {
+    return ResponseEntity.ok("Good night, " + name);
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 56092be..7ed1b88 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -48,7 +48,6 @@ import org.springframework.test.context.junit4.SpringRunner;
     properties = {"server.port=8080", "spring.application.name=greeting-service"})
 public class PackIT {
   private static final String serviceName = "greeting-service";
-  private final String globalTxId = UUID.randomUUID().toString();
 
   @Autowired
   private TestRestTemplate restTemplate;
@@ -84,47 +83,47 @@ public class PackIT {
 
     assertThat(envelopes.size(), is(6));
 
-    TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
-    assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
-    assertThat(sagaStartedEventEnvelope.localTxId(), is(notNullValue()));
-    assertThat(sagaStartedEventEnvelope.parentTxId(), is(nullValue()));
-    assertThat(sagaStartedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(sagaStartedEventEnvelope.instanceId(), is(notNullValue()));
-
-    TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
-    assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
-    assertThat(txStartedEventEnvelope1.localTxId(), is(notNullValue()));
-    assertThat(txStartedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txStartedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txStartedEventEnvelope1.instanceId(), is(sagaStartedEventEnvelope.instanceId()));
-
-    TxEventEnvelope txEndedEventEnvelope1 = envelopes.get(2);
-    assertThat(txEndedEventEnvelope1.type(), is("TxEndedEvent"));
-    assertThat(txEndedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txEndedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txEndedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txEndedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
-    TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
-    assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
-    assertThat(txStartedEventEnvelope2.localTxId(), is(notNullValue()));
-    assertThat(txStartedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txStartedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txStartedEventEnvelope2.instanceId(), is(notNullValue()));
-
-    TxEventEnvelope txEndedEventEnvelope2 = envelopes.get(4);
-    assertThat(txEndedEventEnvelope2.type(), is("TxEndedEvent"));
-    assertThat(txEndedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txEndedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txEndedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txEndedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
-    TxEventEnvelope sagaEndedEventEnvelope = envelopes.get(5);
-    assertThat(sagaEndedEventEnvelope.type(), is("SagaEndedEvent"));
-    assertThat(sagaEndedEventEnvelope.localTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(sagaEndedEventEnvelope.parentTxId(), is(nullValue()));
-    assertThat(sagaEndedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(sagaEndedEventEnvelope.instanceId(), is(notNullValue()));
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+    assertThat(sagaStartedEvent.localTxId(), is(globalTxId));
+    assertThat(sagaStartedEvent.parentTxId(), is(nullValue()));
+    assertThat(sagaStartedEvent.serviceName(), is(serviceName));
+    assertThat(sagaStartedEvent.instanceId(), is(notNullValue()));
+
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txStartedEvent1.serviceName(), is(serviceName));
+    assertThat(txStartedEvent1.instanceId(), is(sagaStartedEvent.instanceId()));
+
+    TxEventEnvelope txEndedEvent1 = envelopes.get(2);
+    assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txEndedEvent1.serviceName(), is(serviceName));
+    assertThat(txEndedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+    TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txStartedEvent2.serviceName(), is(serviceName));
+    assertThat(txStartedEvent2.instanceId(), is(notNullValue()));
+
+    TxEventEnvelope txEndedEvent2 = envelopes.get(4);
+    assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txEndedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txEndedEvent2.serviceName(), is(serviceName));
+    assertThat(txEndedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
+
+    TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+    assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+    assertThat(sagaEndedEvent.localTxId(), is(globalTxId));
+    assertThat(sagaEndedEvent.parentTxId(), is(nullValue()));
+    assertThat(sagaEndedEvent.serviceName(), is(serviceName));
+    assertThat(sagaEndedEvent.instanceId(), is(notNullValue()));
 
     assertThat(compensatedMessages.isEmpty(), is(true));
   }
@@ -146,36 +145,37 @@ public class PackIT {
     List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
     assertThat(envelopes.size(), is(8));
 
-    TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
-    assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
 
-    TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
-    assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
     assertThat(envelopes.get(2).type(), is("TxEndedEvent"));
 
-    TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
-    assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
-
-    TxEventEnvelope txAbortedEventEnvelope = envelopes.get(4);
-    assertThat(txAbortedEventEnvelope.type(), is("TxAbortedEvent"));
-    assertThat(txAbortedEventEnvelope.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txAbortedEventEnvelope.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txAbortedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(txAbortedEventEnvelope.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
-    TxEventEnvelope txCompensatedEventEnvelope1 = envelopes.get(5);
-    assertThat(txCompensatedEventEnvelope1.type(), is("TxCompensatedEvent"));
-    assertThat(txCompensatedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txCompensatedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txCompensatedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txCompensatedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
-    TxEventEnvelope txCompensatedEventEnvelope2 = envelopes.get(6);
-    assertThat(txCompensatedEventEnvelope2.type(), is("TxCompensatedEvent"));
-    assertThat(txCompensatedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txCompensatedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txCompensatedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txCompensatedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
+    TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+
+    TxEventEnvelope txAbortedEvent = envelopes.get(4);
+    assertThat(txAbortedEvent.type(), is("TxAbortedEvent"));
+    assertThat(txAbortedEvent.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txAbortedEvent.parentTxId(), is(globalTxId));
+    assertThat(txAbortedEvent.serviceName(), is(serviceName));
+    assertThat(txAbortedEvent.instanceId(), is(txStartedEvent2.instanceId()));
+
+    // TODO: 2018/1/9 compensation shall be done in reverse order
+    TxEventEnvelope txCompensatedEvent1 = envelopes.get(5);
+    assertThat(txCompensatedEvent1.type(), is("TxCompensatedEvent"));
+    assertThat(txCompensatedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txCompensatedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txCompensatedEvent1.serviceName(), is(serviceName));
+    assertThat(txCompensatedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+    TxEventEnvelope txCompensatedEvent2 = envelopes.get(6);
+    assertThat(txCompensatedEvent2.type(), is("TxCompensatedEvent"));
+    assertThat(txCompensatedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txCompensatedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txCompensatedEvent2.serviceName(), is(serviceName));
+    assertThat(txCompensatedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
 
     assertThat(envelopes.get(7).type(), is("SagaEndedEvent"));
 
@@ -183,4 +183,50 @@ public class PackIT {
         "Goodbye, " + TRESPASSER,
         "My bad, please take the window instead, " + TRESPASSER));
   }
+
+  @Test(timeout = 5000)
+  public void updatesEmbeddedTxStateToAlpha() throws Exception {
+    ResponseEntity<String> entity = restTemplate.getForEntity("/goodMorning?name={name}",
+        String.class,
+        "mike");
+
+    assertThat(entity.getStatusCode(), is(OK));
+    assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
+
+    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+
+    assertThat(envelopes.size(), is(6));
+
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+
+    TxEventEnvelope txStartedEvent2 = envelopes.get(2);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+    TxEventEnvelope txEndedEvent2 = envelopes.get(3);
+    assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txEndedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+    TxEventEnvelope txEndedEvent1 = envelopes.get(4);
+    assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+
+    TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+    assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
 }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index dc88bbd..5358db5 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -22,10 +22,12 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartAspect;
 import org.apache.servicecomb.saga.omega.transaction.TransactionAspect;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
+import org.springframework.core.annotation.Order;
 
 @Configuration
 @EnableAspectJAutoProxy
@@ -36,6 +38,13 @@ public class TransactionAspectConfig {
     return new CompensationMessageHandler(sender, context);
   }
 
+  @Order(0)
+  @Bean
+  SagaStartAspect sagaStartAspect(MessageSender sender, OmegaContext context) {
+    return new SagaStartAspect(sender, context);
+  }
+
+  @Order(1)
   @Bean
   TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
     return new TransactionAspect(sender, context);
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 3dc8c73..c30953f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,8 +36,9 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
@@ -50,6 +52,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -68,9 +71,11 @@ import akka.actor.Props;
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
+  @SuppressWarnings("unchecked")
+  private static final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
   private static final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
+  private final String newLocalTxId = UUID.randomUUID().toString();
+  private final String anotherLocalTxId = UUID.randomUUID().toString();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
@@ -102,9 +107,9 @@ public class TransactionInterceptionTest {
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
-    omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
+    omegaContext.setLocalTxId(globalTxId);
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
@@ -125,8 +130,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
 
@@ -146,8 +151,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
-            new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(),
+            new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
   }
@@ -160,20 +165,20 @@ public class TransactionInterceptionTest {
     String localTxId = omegaContext.newLocalTxId();
     User anotherUser = userService.add(jack);
 
-    messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
-    messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
+    messageHandler.onReceive(globalTxId, newLocalTxId, globalTxId, compensationMethod, user);
+    messageHandler.onReceive(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser);
 
     assertThat(userRepository.findOne(user.id()), is(nullValue()));
     assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
-            new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
         },
         toArray(messages)
     );
@@ -184,16 +189,16 @@ public class TransactionInterceptionTest {
     new Thread(() -> userService.add(user)).start();
     waitTillSavedUser(username);
 
-    String newLocalTxId = omegaContext.newLocalTxId();
+    String localTxId = omegaContext.newLocalTxId();
     new Thread(() -> userService.add(jack)).start();
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
-            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -203,16 +208,16 @@ public class TransactionInterceptionTest {
     executor.schedule(() -> userService.add(user), 0, MILLISECONDS);
     waitTillSavedUser(username);
 
-    String newLocalTxId = omegaContext.newLocalTxId();
+    String localTxId = omegaContext.newLocalTxId();
     executor.invokeAll(singletonList(() -> userService.add(jack)));
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
-            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -231,8 +236,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -249,8 +254,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
 
@@ -289,8 +294,13 @@ public class TransactionInterceptionTest {
     private final List<String> messages = new ArrayList<>();
 
     @Bean
+    CompensationContext compensationContext() {
+      return new CompensationContext();
+    }
+
+    @Bean
     OmegaContext omegaContext() {
-      return new OmegaContext(new UniqueIdGenerator());
+      return new OmegaContext(idGenerator);
     }
 
     @Bean
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index e92ebb1..64f47ce 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,10 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
-public class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor {
 
   private final OmegaContext omegaContext;
-
   private final MessageSender sender;
 
   SagaStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
@@ -31,7 +30,7 @@ public class SagaStartAnnotationProcessor {
   }
 
   void preIntercept() {
-    String globalTxId = omegaContext.newGlobalTxId();
+    String globalTxId = globalTxId();
     // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
     sender.send(new SagaStartedEvent(globalTxId, globalTxId));
   }
@@ -39,5 +38,12 @@ public class SagaStartAnnotationProcessor {
   void postIntercept() {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new SagaEndedEvent(globalTxId, globalTxId));
+    omegaContext.clear();
+  }
+
+  private String globalTxId() {
+    String globalTxId = omegaContext.newGlobalTxId();
+    omegaContext.setLocalTxId(globalTxId);
+    return globalTxId;
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
new file mode 100644
index 0000000..307af3c
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class SagaStartAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+  private final OmegaContext context;
+
+  public SagaStartAspect(MessageSender sender, OmegaContext context) {
+    this.context = context;
+    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+  }
+
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
+  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+    sagaStartAnnotationProcessor.preIntercept();
+    LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+
+    try {
+      return joinPoint.proceed();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to process SagaStart method: {}", method.toString());
+      throw throwable;
+    } finally {
+      LOG.debug("Transaction with context {} has finished.", context);
+      sagaStartAnnotationProcessor.postIntercept();
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index d141d0c..f62b92e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -36,7 +36,6 @@ public class TransactionAspect {
   private final PostTransactionInterceptor postTransactionInterceptor;
   private final FailedTransactionInterceptor failedTransactionInterceptor;
 
-  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
   private final OmegaContext context;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -44,7 +43,6 @@ public class TransactionAspect {
     this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
     this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
     this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
-    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -54,7 +52,12 @@ public class TransactionAspect {
 
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
+    String localTxId = context.localTxId();
+    String parentTxId = context.parentTxId();
+    context.setParentTxId(localTxId);
+
     preIntercept(joinPoint, signature);
+    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
@@ -64,24 +67,10 @@ public class TransactionAspect {
     } catch (Throwable throwable) {
       interceptException(signature, throwable);
       throw throwable;
-    }
-  }
-
-  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
-  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
-    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
-    LOG.debug("Initializing global tx id before execution of method {}", method.toString());
-    sagaStartAnnotationProcessor.preIntercept();
-
-    try {
-      return joinPoint.proceed();
-    } catch (Throwable throwable) {
-      LOG.error("Failed to process SagaStart method: {}", method.toString());
-      throw throwable;
     } finally {
-      LOG.debug("Transaction {} has finished.", context.globalTxId());
-      sagaStartAnnotationProcessor.postIntercept();
+      context.setLocalTxId(localTxId);
+      context.setParentTxId(parentTxId);
+      LOG.debug("Restored context back to {}", context);
     }
   }
 
@@ -95,12 +84,9 @@ public class TransactionAspect {
   }
 
   private void preIntercept(ProceedingJoinPoint joinPoint, String signature) {
-    // context without a parent should be the first TxStartedEvent
-    initFirstOmegaContext();
-
     preTransactionInterceptor.intercept(
         context.globalTxId(),
-        context.localTxId(),
+        context.newLocalTxId(),
         context.parentTxId(),
         signature,
         joinPoint.getArgs());
@@ -122,14 +108,4 @@ public class TransactionAspect {
         signature,
         throwable);
   }
-
-  private void initFirstOmegaContext() {
-    if (context.parentTxId() != null) {
-      return;
-    }
-    if (context.localTxId() == null) {
-      context.newLocalTxId();
-    }
-    context.setParentTxId(context.globalTxId());
-  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 8913b7f..fba7826 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -41,9 +41,9 @@ public class SagaStartAnnotationProcessorTest {
 
   private final String localTxId = UUID.randomUUID().toString();
 
-  private final IdGenerator generator = mock(IdGenerator.class);
-
   @SuppressWarnings("unchecked")
+  private final IdGenerator<String> generator = mock(IdGenerator.class);
+
   private final OmegaContext context = new OmegaContext(generator);
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
@@ -56,7 +56,7 @@ public class SagaStartAnnotationProcessorTest {
     sagaStartAnnotationProcessor.preIntercept();
 
     assertThat(context.globalTxId(), is(globalTxId));
-    assertThat(context.localTxId(), is(nullValue()));
+    assertThat(context.localTxId(), is(globalTxId));
     assertThat(context.parentTxId(), is(nullValue()));
 
     TxEvent event = messages.get(0);
@@ -73,7 +73,6 @@ public class SagaStartAnnotationProcessorTest {
   public void sendsSagaEndedEvent() {
     context.clear();
     context.setGlobalTxId(globalTxId);
-    context.setLocalTxId(localTxId);
 
     sagaStartAnnotationProcessor.postIntercept();
 
@@ -85,5 +84,9 @@ public class SagaStartAnnotationProcessorTest {
     assertThat(event.compensationMethod().isEmpty(), is(true));
     assertThat(event.type(), is("SagaEndedEvent"));
     assertThat(event.payloads().length, is(0));
+
+    assertThat(context.globalTxId(), is(nullValue()));
+    assertThat(context.localTxId(), is(nullValue()));
+    assertThat(context.parentTxId(), is(nullValue()));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
new file mode 100644
index 0000000..cfaa7b6
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SagaStartAspectTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final MessageSender sender = messages::add;
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final Compensable compensable = Mockito.mock(Compensable.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+  private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
+
+  @Before
+  public void setUp() throws Exception {
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+    omegaContext.setParentTxId(parentTxId);
+  }
+
+  @Test
+  public void newGlobalTxIdInSagaStart() throws Throwable {
+    omegaContext.clear();
+    when(idGenerator.nextId()).thenReturn(globalTxId);
+
+    aspect.advise(joinPoint);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(globalTxId));
+    assertThat(startedEvent.parentTxId(), is(nullValue()));
+    assertThat(startedEvent.type(), is("SagaStartedEvent"));
+
+    TxEvent endedEvent = messages.get(1);
+
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(globalTxId));
+    assertThat(endedEvent.parentTxId(), is(nullValue()));
+    assertThat(endedEvent.type(), is("SagaEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(nullValue()));
+    assertThat(omegaContext.localTxId(), is(nullValue()));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
+  }
+
+  @Test
+  public void clearContextOnSagaStartError() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(globalTxId);
+    RuntimeException oops = new RuntimeException("oops");
+
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e, is(oops));
+    }
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is("SagaEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(nullValue()));
+    assertThat(omegaContext.localTxId(), is(nullValue()));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
new file mode 100644
index 0000000..bd8829c
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TransactionAspectTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final String newLocalTxId = UUID.randomUUID().toString();
+
+  private final MessageSender sender = messages::add;
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final Compensable compensable = Mockito.mock(Compensable.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+
+  @Before
+  public void setUp() throws Exception {
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+    omegaContext.setParentTxId(parentTxId);
+  }
+
+  @Test
+  public void newLocalTxIdInCompensable() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+
+    aspect.advise(joinPoint, compensable);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent.parentTxId(), is(localTxId));
+    assertThat(startedEvent.type(), is("TxStartedEvent"));
+
+    TxEvent endedEvent = messages.get(1);
+
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(newLocalTxId));
+    assertThat(endedEvent.parentTxId(), is(localTxId));
+    assertThat(endedEvent.type(), is("TxEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(parentTxId));
+  }
+
+  @Test
+  public void restoreContextOnCompensableError() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+    RuntimeException oops = new RuntimeException("oops");
+
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint, compensable);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e, is(oops));
+    }
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(newLocalTxId));
+    assertThat(event.parentTxId(), is(localTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(parentTxId));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index 3e5d620..9b003f8 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -20,6 +20,9 @@
 
 package org.apache.servicecomb.saga.omega.transport.resttemplate;
 
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.LOCAL_TX_ID_KEY;
+
 import java.lang.invoke.MethodHandles;
 
 import javax.servlet.http.HttpServletRequest;
@@ -43,24 +46,21 @@ class TransactionHandlerInterceptor implements HandlerInterceptor {
 
   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
-    String globalTxId = request.getHeader(OmegaContext.GLOBAL_TX_ID_KEY);
+    String globalTxId = request.getHeader(GLOBAL_TX_ID_KEY);
     if (globalTxId == null) {
-      LOG.debug("no such header: {}", OmegaContext.GLOBAL_TX_ID_KEY);
+      LOG.debug("no such header: {}", GLOBAL_TX_ID_KEY);
     } else {
       omegaContext.setGlobalTxId(globalTxId);
-      omegaContext.newLocalTxId();
-      omegaContext.setParentTxId(request.getHeader(OmegaContext.LOCAL_TX_ID_KEY));
+      omegaContext.setLocalTxId(request.getHeader(LOCAL_TX_ID_KEY));
     }
     return true;
   }
 
   @Override
-  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o,
-      ModelAndView modelAndView) {
+  public void postHandle(HttpServletRequest request, HttpServletResponse response, Object o, ModelAndView mv) {
   }
 
   @Override
-  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o,
-      Exception e) {
+  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o, Exception e) {
   }
 }
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
index 6ee816b..03f278f 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
@@ -51,9 +51,7 @@ public class TransactionHandlerInterceptorTest {
 
   @Before
   public void setUp() {
-    omegaContext.setGlobalTxId(null);
-    omegaContext.setLocalTxId(null);
-    omegaContext.setParentTxId(null);
+    omegaContext.clear();
   }
 
   @Test
@@ -64,8 +62,8 @@ public class TransactionHandlerInterceptorTest {
     requestInterceptor.preHandle(request, response, null);
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(notNullValue()));
-    assertThat(omegaContext.parentTxId(), is(localTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 07/09: SCB-212 added saga start timeout

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit edf12b5d4459834f60b9978ba81a2e853a5cb43b
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:32:23 2018 +0800

    SCB-212 added saga start timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-transaction/pom.xml                    | 16 +++++-
 .../transaction/SagaStartAnnotationProcessor.java  | 15 ++++--
 .../saga/omega/transaction/SagaStartAspect.java    | 32 ++++++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 16 +++---
 .../SagaStartAnnotationProcessorTest.java          |  4 +-
 .../omega/transaction/SagaStartAspectTest.java     | 59 +++++++++++++++++-----
 .../src/test/resources/log4j2-test.xml             | 30 +++++++++++
 7 files changed, 141 insertions(+), 31 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 1829650..1aa77fb 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -63,7 +63,21 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 64f47ce..6e8556b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
-class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   private final OmegaContext omegaContext;
   private final MessageSender sender;
@@ -29,18 +29,27 @@ class SagaStartAnnotationProcessor {
     this.sender = sender;
   }
 
-  void preIntercept() {
+  @Override
+  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
     String globalTxId = globalTxId();
     // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
     sender.send(new SagaStartedEvent(globalTxId, globalTxId));
   }
 
-  void postIntercept() {
+  @Override
+  public void postIntercept(String parentTxId, String compensationMethod) {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new SagaEndedEvent(globalTxId, globalTxId));
     omegaContext.clear();
   }
 
+  @Override
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    String globalTxId = omegaContext.globalTxId();
+    sender.send(new TxAbortedEvent(globalTxId, globalTxId, null, compensationMethod, throwable));
+    omegaContext.clear();
+  }
+
   private String globalTxId() {
     String globalTxId = omegaContext.newGlobalTxId();
     omegaContext.setLocalTxId(globalTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 307af3c..3fa6322 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,10 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
@@ -33,6 +38,7 @@ public class SagaStartAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final OmegaContext context;
 
   public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -40,13 +46,15 @@ public class SagaStartAspect {
     this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
-  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
-  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept();
+    TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
+    interceptor.preIntercept(context.globalTxId(), method.toString());
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
+    scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
       return joinPoint.proceed();
     } catch (Throwable throwable) {
@@ -54,7 +62,23 @@ public class SagaStartAspect {
       throw throwable;
     } finally {
       LOG.debug("Transaction with context {} has finished.", context);
-      sagaStartAnnotationProcessor.postIntercept();
+      interceptor.postIntercept(context.globalTxId(), method.toString());
+    }
+  }
+
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              context.globalTxId(),
+              method.toString(),
+              new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
     }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 317f802..80ad03f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,27 +29,27 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String localTxId, String signature, Object... args) {
-    interceptor.preIntercept(localTxId, signature, args);
+  public void preIntercept(String parentTxId, String signature, Object... args) {
+    interceptor.preIntercept(parentTxId, signature, args);
   }
 
   @Override
-  public void postIntercept(String localTxId, String signature) {
+  public void postIntercept(String parentTxId, String signature) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(parentTxId, signature);
     }
   }
 
   @Override
-  public void onError(String localTxId, String signature, Throwable throwable) {
+  public void onError(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 
-  void onTimeout(String localTxId, String signature, Throwable throwable) {
+  void onTimeout(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 0dadebe..8fa3568 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -53,7 +53,7 @@ public class SagaStartAnnotationProcessorTest {
   public void sendsSagaStartedEvent() {
     when(generator.nextId()).thenReturn(globalTxId, localTxId);
 
-    sagaStartAnnotationProcessor.preIntercept();
+    sagaStartAnnotationProcessor.preIntercept(null, null);
 
     assertThat(context.globalTxId(), is(globalTxId));
     assertThat(context.localTxId(), is(globalTxId));
@@ -73,7 +73,7 @@ public class SagaStartAnnotationProcessorTest {
     context.clear();
     context.setGlobalTxId(globalTxId);
 
-    sagaStartAnnotationProcessor.postIntercept();
+    sagaStartAnnotationProcessor.postIntercept(null, null);
 
     TxEvent event = messages.get(0);
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 432b3ad..c282ecc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,18 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
@@ -40,7 +45,6 @@ public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -48,29 +52,24 @@ public class SagaStartAspectTest {
 
   @SuppressWarnings("unchecked")
   private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
-  private final Compensable compensable = Mockito.mock(Compensable.class);
+  private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
 
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
-    when(compensable.compensationMethod()).thenReturn("doNothing");
-
-    omegaContext.setGlobalTxId(globalTxId);
-    omegaContext.setLocalTxId(localTxId);
+    omegaContext.clear();
   }
 
   @Test
   public void newGlobalTxIdInSagaStart() throws Throwable {
-    omegaContext.clear();
-    when(idGenerator.nextId()).thenReturn(globalTxId);
-
-    aspect.advise(joinPoint);
+    aspect.advise(joinPoint, sagaStart);
 
     TxEvent startedEvent = messages.get(0);
 
@@ -92,13 +91,12 @@ public class SagaStartAspectTest {
 
   @Test
   public void clearContextOnSagaStartError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(globalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint);
+      aspect.advise(joinPoint, sagaStart);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
       assertThat(e, is(oops));
@@ -115,6 +113,41 @@ public class SagaStartAspectTest {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(sagaStart.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(globalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, sagaStart);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }
diff --git a/omega/omega-transaction/src/test/resources/log4j2-test.xml b/omega/omega-transaction/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-transaction/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/09: SCB-212 delegated compensation context a dedicated class

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b85b4e95098550fe7cefe39d8aaad11d03d56d85
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 10:19:00 2018 +0800

    SCB-212 delegated compensation context a dedicated class
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/CompensationContext.java    | 61 ++++++++++++++++++++++
 .../saga/omega/context/OmegaContext.java           | 40 --------------
 .../saga/omega/spring/OmegaSpringConfig.java       |  6 +++
 .../spring/CompensableAnnotationProcessor.java     |  9 +++-
 .../spring/CompensableMethodCheckingCallback.java  | 10 ++--
 .../spring/TransactionAspectConfig.java            |  7 +--
 .../transaction/CompensationMessageHandler.java    | 10 ++--
 .../saga/omega/transaction/TransactionAspect.java  |  2 +-
 .../CompensationMessageHandlerTest.java            |  8 +--
 .../TransactionClientHttpRequestInterceptor.java   | 18 +++++--
 10 files changed, 108 insertions(+), 63 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
new file mode 100644
index 0000000..118b033
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.context;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompensationContext {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Map<String, CompensationContextInternal> contexts = new HashMap<>();
+
+  public void addCompensationContext(Method compensationMethod, Object target) {
+    compensationMethod.setAccessible(true);
+    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
+  }
+
+  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
+    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
+
+    try {
+      contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
+      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      LOG.error(
+          "Pre-checking for compensate method " + contextInternal.compensationMethod.toString()
+              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+          e);
+    }
+  }
+
+  private static final class CompensationContextInternal {
+    private final Object target;
+    private final Method compensationMethod;
+
+    private CompensationContextInternal(Object target, Method compensationMethod) {
+      this.target = target;
+      this.compensationMethod = compensationMethod;
+    }
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 94de6ef..43bf0b4 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,17 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class OmegaContext {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
 
@@ -35,7 +25,6 @@ public class OmegaContext {
   private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
   private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
-  private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
 
   public OmegaContext(IdGenerator<String> idGenerator) {
     this.idGenerator = idGenerator;
@@ -83,25 +72,6 @@ public class OmegaContext {
     parentTxId.remove();
   }
 
-  public void addCompensationContext(Method compensationMethod, Object target) {
-    compensationMethod.setAccessible(true);
-    compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
-  }
-
-  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    CompensationContext compensationContext = compensationContexts.get(compensationMethod);
-
-    try {
-      compensationContext.compensationMethod.invoke(compensationContext.target, payloads);
-      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.error(
-          "Pre-checking for compensate method " + compensationContext.compensationMethod.toString()
-              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
-          e);
-    }
-  }
-
   @Override
   public String toString() {
     return "OmegaContext{" +
@@ -110,14 +80,4 @@ public class OmegaContext {
         ", parentTxId=" + parentTxId.get() +
         '}';
   }
-
-  private static final class CompensationContext {
-    private final Object target;
-    private final Method compensationMethod;
-
-    private CompensationContext(Object target, Method compensationMethod) {
-      this.target = target;
-      this.compensationMethod = compensationMethod;
-    }
-  }
 }
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index e5806ad..fa4027b 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.spring;
 
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -45,6 +46,11 @@ class OmegaSpringConfig {
   }
 
   @Bean
+  CompensationContext compensationContext() {
+    return new CompensationContext();
+  }
+
+  @Bean
   ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) {
     return new ServiceConfig(serviceName);
   }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 87f9049..338751c 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -25,9 +26,11 @@ import org.springframework.util.ReflectionUtils;
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
   private final OmegaContext omegaContext;
+  private final CompensationContext compensationContext;
 
-  CompensableAnnotationProcessor(OmegaContext omegaContext) {
+  CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
     this.omegaContext = omegaContext;
+    this.compensationContext = compensationContext;
   }
 
   @Override
@@ -43,7 +46,9 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
   }
 
   private void checkMethod(Object bean) {
-    ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
+    ReflectionUtils.doWithMethods(
+        bean.getClass(),
+        new CompensableMethodCheckingCallback(bean, compensationContext));
   }
 
   private void checkFields(Object bean) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index ac6615c..6c0c333 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.slf4j.Logger;
@@ -31,11 +31,11 @@ class CompensableMethodCheckingCallback implements MethodCallback {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
-  private final OmegaContext omegaContext;
+  private final CompensationContext compensationContext;
 
-  CompensableMethodCheckingCallback(Object bean, OmegaContext omegaContext) {
+  CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
     this.bean = bean;
-    this.omegaContext = omegaContext;
+    this.compensationContext = compensationContext;
   }
 
   @Override
@@ -48,7 +48,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
 
     try {
       Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
-      omegaContext.addCompensationContext(signature, bean);
+      compensationContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 1d46378..dc88bbd 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
@@ -31,7 +32,7 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, OmegaContext context) {
+  MessageHandler messageHandler(MessageSender sender, CompensationContext context) {
     return new CompensationMessageHandler(sender, context);
   }
 
@@ -41,7 +42,7 @@ public class TransactionAspectConfig {
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext) {
-    return new CompensableAnnotationProcessor(omegaContext);
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+    return new CompensableAnnotationProcessor(omegaContext, compensationContext);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 2e0836e..46c1e9b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -17,20 +17,20 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
-  private final OmegaContext omegaContext;
+  private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) {
+  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
     this.sender = sender;
-    this.omegaContext = omegaContext;
+    this.context = context;
   }
 
   @Override
   public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
-    omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads);
+    context.compensate(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index bf13e3a..d141d0c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -44,7 +44,7 @@ public class TransactionAspect {
     this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
     this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
     this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
-    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(this.context, sender);
+    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index 2ecd82f..2585f3f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -40,8 +40,8 @@ public class CompensationMessageHandlerTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final OmegaContext omegaContext = mock(OmegaContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext);
+  private final CompensationContext context = mock(CompensationContext.class);
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
 
   @Test
   public void sendsEventOnCompensationCompleted() throws Exception {
@@ -57,6 +57,6 @@ public class CompensationMessageHandlerTest {
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
 
-    verify(omegaContext).compensate(globalTxId, localTxId, compensationMethod, payload);
+    verify(context).compensate(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
index 5e69598..88ea564 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
@@ -18,16 +18,22 @@
 
 package org.apache.servicecomb.saga.omega.transport.resttemplate;
 
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.LOCAL_TX_ID_KEY;
+
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpRequest;
 import org.springframework.http.client.ClientHttpRequestExecution;
 import org.springframework.http.client.ClientHttpRequestInterceptor;
 import org.springframework.http.client.ClientHttpResponse;
 
 class TransactionClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
-
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OmegaContext omegaContext;
 
   TransactionClientHttpRequestInterceptor(OmegaContext omegaContext) {
@@ -39,8 +45,14 @@ class TransactionClientHttpRequestInterceptor implements ClientHttpRequestInterc
       ClientHttpRequestExecution execution) throws IOException {
 
     if (omegaContext.globalTxId() != null) {
-      request.getHeaders().add(OmegaContext.GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
-      request.getHeaders().add(OmegaContext.LOCAL_TX_ID_KEY, omegaContext.localTxId());
+      request.getHeaders().add(GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
+      request.getHeaders().add(LOCAL_TX_ID_KEY, omegaContext.localTxId());
+
+      LOG.debug("Added {} {} and {} {} to request header",
+          GLOBAL_TX_ID_KEY,
+          omegaContext.globalTxId(),
+          LOCAL_TX_ID_KEY,
+          omegaContext.localTxId());
     }
     return execution.execute(request, body);
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.