You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/13 14:57:51 UTC

[incubator-servicecomb-saga] branch master updated (2b62bdb -> 160bade)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from 2b62bdb  SCB-213 remove ended event from map
     new 968247f  SCB-212 delegated compensation context a dedicated class
     new 5b0be4b  SCB-212 made each @Compensable as a new sub-transaction
     new 137a1f4  SCB-212 removed parent tx id from omega context, since it's not necessary
     new 5212383  SCB-212 united interceptors into one
     new 0aaa3fd  SCB-212 tx timeout impl
     new 5751a11  SCB-212 replaced timeout impl with atomic to avoid locking
     new 51b1322  SCB-212 added saga start timeout
     new 0a18ed8  SCB-212 removed unnecessary lines
     new 5044410  SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
     new 602bff5  SCB-212 attempted to fix random test failure by resetting latency before test
     new 20a5886  SCB-212 better readability
     new e5e65b9  SCB-212 closed sender after each test to avoid test interference
     new 05113e5  SCB-212 fixed rebase conflict
     new 8c455ad  SCB-212 attempted to fix test failure
     new 160bade  SCB-212 replaced completable future with executor to avoid test failures in maven

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../saga/alpha/core/TxConsistentService.java       |  31 +++-
 .../saga/alpha/core/TxEventRepository.java         |   2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   |  18 +-
 .../saga/alpha/server/SpringTxEventRepository.java |   2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  19 ++-
 .../integration/pack/tests/GreetingController.java |  13 ++
 .../saga/integration/pack/tests/PackIT.java        | 184 +++++++++++++--------
 .../grpc/LoadBalancedClusterMessageSender.java     |   1 -
 .../grpc/LoadBalancedClusterMessageSenderTest.java |   4 +-
 .../saga/omega/context/CompensationContext.java    |  61 +++++++
 .../saga/omega/context/OmegaContext.java           |  51 ------
 .../saga/omega/context/annotations/SagaStart.java  |   1 +
 .../saga/omega/context/OmegaContextTest.java       |  18 --
 .../saga/omega/spring/OmegaSpringConfig.java       |   6 +
 .../spring/CompensableAnnotationProcessor.java     |   9 +-
 .../spring/CompensableMethodCheckingCallback.java  |  10 +-
 .../transaction/spring/ExecutorFieldCallback.java  |  13 +-
 .../spring/TransactionAspectConfig.java            |  16 +-
 .../spring/TransactionInterceptionTest.java        |  74 +++++----
 omega/omega-transaction/pom.xml                    |  20 ++-
 ...nterceptor.java => CompensableInterceptor.java} |  25 ++-
 .../transaction/CompensationMessageHandler.java    |  10 +-
 ...Interceptor.java => EventAwareInterceptor.java} |  26 ++-
 .../transaction/FailedTransactionInterceptor.java  |  30 ----
 ...erializer.java => OmegaTxTimeoutException.java} |   6 +-
 .../transaction/SagaStartAnnotationProcessor.java  |  20 ++-
 .../saga/omega/transaction/SagaStartAspect.java    |  93 +++++++++++
 .../omega/transaction/TimeAwareInterceptor.java    |  55 ++++++
 .../saga/omega/transaction/TransactionAspect.java  | 102 ++++--------
 .../omega/transaction/annotations/Compensable.java |   2 +
 ...orTest.java => CompensableInterceptorTest.java} |  53 +++++-
 .../CompensationMessageHandlerTest.java            |   8 +-
 .../PostTransactionInterceptorTest.java            |  53 ------
 .../SagaStartAnnotationProcessorTest.java          |  26 ++-
 .../omega/transaction/SagaStartAspectTest.java     | 153 +++++++++++++++++
 .../transaction/TimeAwareInterceptorTest.java      | 135 +++++++++++++++
 .../omega/transaction/TransactionAspectTest.java   | 158 ++++++++++++++++++
 .../src/test/resources/log4j2-test.xml             |   0
 .../TransactionClientHttpRequestInterceptor.java   |  18 +-
 .../TransactionHandlerInterceptor.java             |  16 +-
 .../TransactionHandlerInterceptorTest.java         |   8 +-
 41 files changed, 1121 insertions(+), 429 deletions(-)
 create mode 100644 omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
 rename omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{PostTransactionInterceptor.java => CompensableInterceptor.java} (50%)
 rename omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{PreTransactionInterceptor.java => EventAwareInterceptor.java} (55%)
 delete mode 100644 omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{MessageSerializer.java => OmegaTxTimeoutException.java} (86%)
 create mode 100644 omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
 create mode 100644 omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
 rename omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/{PreTransactionInterceptorTest.java => CompensableInterceptorTest.java} (50%)
 delete mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
 create mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
 create mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
 create mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
 copy {saga-spring => omega/omega-transaction}/src/test/resources/log4j2-test.xml (100%)

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 07/15: SCB-212 added saga start timeout

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 51b1322e8a7705e9b649fe8c1ba9b94d9e58580d
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:32:23 2018 +0800

    SCB-212 added saga start timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-transaction/pom.xml                    | 16 +++++-
 .../transaction/SagaStartAnnotationProcessor.java  | 15 ++++--
 .../saga/omega/transaction/SagaStartAspect.java    | 32 ++++++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 16 +++---
 .../SagaStartAnnotationProcessorTest.java          |  4 +-
 .../omega/transaction/SagaStartAspectTest.java     | 59 +++++++++++++++++-----
 .../src/test/resources/log4j2-test.xml             | 30 +++++++++++
 7 files changed, 141 insertions(+), 31 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 1829650..1aa77fb 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -63,7 +63,21 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 64f47ce..6e8556b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
-class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   private final OmegaContext omegaContext;
   private final MessageSender sender;
@@ -29,18 +29,27 @@ class SagaStartAnnotationProcessor {
     this.sender = sender;
   }
 
-  void preIntercept() {
+  @Override
+  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
     String globalTxId = globalTxId();
     // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
     sender.send(new SagaStartedEvent(globalTxId, globalTxId));
   }
 
-  void postIntercept() {
+  @Override
+  public void postIntercept(String parentTxId, String compensationMethod) {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new SagaEndedEvent(globalTxId, globalTxId));
     omegaContext.clear();
   }
 
+  @Override
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    String globalTxId = omegaContext.globalTxId();
+    sender.send(new TxAbortedEvent(globalTxId, globalTxId, null, compensationMethod, throwable));
+    omegaContext.clear();
+  }
+
   private String globalTxId() {
     String globalTxId = omegaContext.newGlobalTxId();
     omegaContext.setLocalTxId(globalTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 307af3c..3fa6322 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -17,10 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
@@ -33,6 +38,7 @@ public class SagaStartAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final OmegaContext context;
 
   public SagaStartAspect(MessageSender sender, OmegaContext context) {
@@ -40,13 +46,15 @@ public class SagaStartAspect {
     this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
-  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
-  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
-    sagaStartAnnotationProcessor.preIntercept();
+    TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
+    interceptor.preIntercept(context.globalTxId(), method.toString());
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
 
+    scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
       return joinPoint.proceed();
     } catch (Throwable throwable) {
@@ -54,7 +62,23 @@ public class SagaStartAspect {
       throw throwable;
     } finally {
       LOG.debug("Transaction with context {} has finished.", context);
-      sagaStartAnnotationProcessor.postIntercept();
+      interceptor.postIntercept(context.globalTxId(), method.toString());
+    }
+  }
+
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              context.globalTxId(),
+              method.toString(),
+              new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
     }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 317f802..80ad03f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,27 +29,27 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String localTxId, String signature, Object... args) {
-    interceptor.preIntercept(localTxId, signature, args);
+  public void preIntercept(String parentTxId, String signature, Object... args) {
+    interceptor.preIntercept(parentTxId, signature, args);
   }
 
   @Override
-  public void postIntercept(String localTxId, String signature) {
+  public void postIntercept(String parentTxId, String signature) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.postIntercept(localTxId, signature);
+      interceptor.postIntercept(parentTxId, signature);
     }
   }
 
   @Override
-  public void onError(String localTxId, String signature, Throwable throwable) {
+  public void onError(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 
-  void onTimeout(String localTxId, String signature, Throwable throwable) {
+  void onTimeout(String parentTxId, String signature, Throwable throwable) {
     if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(localTxId, signature, throwable);
+      interceptor.onError(parentTxId, signature, throwable);
     }
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 0dadebe..8fa3568 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -53,7 +53,7 @@ public class SagaStartAnnotationProcessorTest {
   public void sendsSagaStartedEvent() {
     when(generator.nextId()).thenReturn(globalTxId, localTxId);
 
-    sagaStartAnnotationProcessor.preIntercept();
+    sagaStartAnnotationProcessor.preIntercept(null, null);
 
     assertThat(context.globalTxId(), is(globalTxId));
     assertThat(context.localTxId(), is(globalTxId));
@@ -73,7 +73,7 @@ public class SagaStartAnnotationProcessorTest {
     context.clear();
     context.setGlobalTxId(globalTxId);
 
-    sagaStartAnnotationProcessor.postIntercept();
+    sagaStartAnnotationProcessor.postIntercept(null, null);
 
     TxEvent event = messages.get(0);
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 432b3ad..c282ecc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -18,18 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
@@ -40,7 +45,6 @@ public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -48,29 +52,24 @@ public class SagaStartAspectTest {
 
   @SuppressWarnings("unchecked")
   private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
-  private final Compensable compensable = Mockito.mock(Compensable.class);
+  private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
 
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
-    when(compensable.compensationMethod()).thenReturn("doNothing");
-
-    omegaContext.setGlobalTxId(globalTxId);
-    omegaContext.setLocalTxId(localTxId);
+    omegaContext.clear();
   }
 
   @Test
   public void newGlobalTxIdInSagaStart() throws Throwable {
-    omegaContext.clear();
-    when(idGenerator.nextId()).thenReturn(globalTxId);
-
-    aspect.advise(joinPoint);
+    aspect.advise(joinPoint, sagaStart);
 
     TxEvent startedEvent = messages.get(0);
 
@@ -92,13 +91,12 @@ public class SagaStartAspectTest {
 
   @Test
   public void clearContextOnSagaStartError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(globalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
 
     try {
-      aspect.advise(joinPoint);
+      aspect.advise(joinPoint, sagaStart);
       expectFailing(RuntimeException.class);
     } catch (RuntimeException e) {
       assertThat(e, is(oops));
@@ -115,6 +113,41 @@ public class SagaStartAspectTest {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(sagaStart.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(globalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, sagaStart);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null);
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }
diff --git a/omega/omega-transaction/src/test/resources/log4j2-test.xml b/omega/omega-transaction/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-transaction/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/15: SCB-212 made each @Compensable as a new sub-transaction

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 5b0be4b28b0e26a56271bfc7b36c4d40b5c65985
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 15:39:20 2018 +0800

    SCB-212 made each @Compensable as a new sub-transaction
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../integration/pack/tests/GreetingController.java |  13 ++
 .../saga/integration/pack/tests/PackIT.java        | 184 +++++++++++++--------
 .../spring/TransactionAspectConfig.java            |   9 +
 .../spring/TransactionInterceptionTest.java        |  74 +++++----
 .../transaction/SagaStartAnnotationProcessor.java  |  12 +-
 .../saga/omega/transaction/SagaStartAspect.java    |  60 +++++++
 .../saga/omega/transaction/TransactionAspect.java  |  46 ++----
 .../SagaStartAnnotationProcessorTest.java          |  11 +-
 .../omega/transaction/SagaStartAspectTest.java     | 124 ++++++++++++++
 .../omega/transaction/TransactionAspectTest.java   | 124 ++++++++++++++
 .../TransactionHandlerInterceptor.java             |  16 +-
 .../TransactionHandlerInterceptorTest.java         |   8 +-
 12 files changed, 524 insertions(+), 157 deletions(-)

diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 3c7c095..2bdd587 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.integration.pack.tests;
 
 import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Controller;
@@ -64,4 +65,16 @@ public class GreetingController {
   ResponseEntity<String> rude(@RequestParam String name) {
     return ResponseEntity.ok(greetingService.beingRude(name));
   }
+
+  @SagaStart
+  @Compensable(compensationMethod = "goodNight")
+  @GetMapping("/goodMorning")
+  ResponseEntity<String> goodMorning(@RequestParam String name) {
+    String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
+    return ResponseEntity.ok("Good morning, " + bonjour);
+  }
+
+  ResponseEntity<String> goodNight(@RequestParam String name) {
+    return ResponseEntity.ok("Good night, " + name);
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 56092be..7ed1b88 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -48,7 +48,6 @@ import org.springframework.test.context.junit4.SpringRunner;
     properties = {"server.port=8080", "spring.application.name=greeting-service"})
 public class PackIT {
   private static final String serviceName = "greeting-service";
-  private final String globalTxId = UUID.randomUUID().toString();
 
   @Autowired
   private TestRestTemplate restTemplate;
@@ -84,47 +83,47 @@ public class PackIT {
 
     assertThat(envelopes.size(), is(6));
 
-    TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
-    assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
-    assertThat(sagaStartedEventEnvelope.localTxId(), is(notNullValue()));
-    assertThat(sagaStartedEventEnvelope.parentTxId(), is(nullValue()));
-    assertThat(sagaStartedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(sagaStartedEventEnvelope.instanceId(), is(notNullValue()));
-
-    TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
-    assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
-    assertThat(txStartedEventEnvelope1.localTxId(), is(notNullValue()));
-    assertThat(txStartedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txStartedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txStartedEventEnvelope1.instanceId(), is(sagaStartedEventEnvelope.instanceId()));
-
-    TxEventEnvelope txEndedEventEnvelope1 = envelopes.get(2);
-    assertThat(txEndedEventEnvelope1.type(), is("TxEndedEvent"));
-    assertThat(txEndedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txEndedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txEndedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txEndedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
-    TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
-    assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
-    assertThat(txStartedEventEnvelope2.localTxId(), is(notNullValue()));
-    assertThat(txStartedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txStartedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txStartedEventEnvelope2.instanceId(), is(notNullValue()));
-
-    TxEventEnvelope txEndedEventEnvelope2 = envelopes.get(4);
-    assertThat(txEndedEventEnvelope2.type(), is("TxEndedEvent"));
-    assertThat(txEndedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txEndedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txEndedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txEndedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
-    TxEventEnvelope sagaEndedEventEnvelope = envelopes.get(5);
-    assertThat(sagaEndedEventEnvelope.type(), is("SagaEndedEvent"));
-    assertThat(sagaEndedEventEnvelope.localTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(sagaEndedEventEnvelope.parentTxId(), is(nullValue()));
-    assertThat(sagaEndedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(sagaEndedEventEnvelope.instanceId(), is(notNullValue()));
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+    assertThat(sagaStartedEvent.localTxId(), is(globalTxId));
+    assertThat(sagaStartedEvent.parentTxId(), is(nullValue()));
+    assertThat(sagaStartedEvent.serviceName(), is(serviceName));
+    assertThat(sagaStartedEvent.instanceId(), is(notNullValue()));
+
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txStartedEvent1.serviceName(), is(serviceName));
+    assertThat(txStartedEvent1.instanceId(), is(sagaStartedEvent.instanceId()));
+
+    TxEventEnvelope txEndedEvent1 = envelopes.get(2);
+    assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txEndedEvent1.serviceName(), is(serviceName));
+    assertThat(txEndedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+    TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txStartedEvent2.serviceName(), is(serviceName));
+    assertThat(txStartedEvent2.instanceId(), is(notNullValue()));
+
+    TxEventEnvelope txEndedEvent2 = envelopes.get(4);
+    assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txEndedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txEndedEvent2.serviceName(), is(serviceName));
+    assertThat(txEndedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
+
+    TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+    assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+    assertThat(sagaEndedEvent.localTxId(), is(globalTxId));
+    assertThat(sagaEndedEvent.parentTxId(), is(nullValue()));
+    assertThat(sagaEndedEvent.serviceName(), is(serviceName));
+    assertThat(sagaEndedEvent.instanceId(), is(notNullValue()));
 
     assertThat(compensatedMessages.isEmpty(), is(true));
   }
@@ -146,36 +145,37 @@ public class PackIT {
     List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
     assertThat(envelopes.size(), is(8));
 
-    TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
-    assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
 
-    TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
-    assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
     assertThat(envelopes.get(2).type(), is("TxEndedEvent"));
 
-    TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
-    assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
-
-    TxEventEnvelope txAbortedEventEnvelope = envelopes.get(4);
-    assertThat(txAbortedEventEnvelope.type(), is("TxAbortedEvent"));
-    assertThat(txAbortedEventEnvelope.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txAbortedEventEnvelope.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txAbortedEventEnvelope.serviceName(), is(serviceName));
-    assertThat(txAbortedEventEnvelope.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
-    TxEventEnvelope txCompensatedEventEnvelope1 = envelopes.get(5);
-    assertThat(txCompensatedEventEnvelope1.type(), is("TxCompensatedEvent"));
-    assertThat(txCompensatedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txCompensatedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
-    assertThat(txCompensatedEventEnvelope1.serviceName(), is(serviceName));
-    assertThat(txCompensatedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
-    TxEventEnvelope txCompensatedEventEnvelope2 = envelopes.get(6);
-    assertThat(txCompensatedEventEnvelope2.type(), is("TxCompensatedEvent"));
-    assertThat(txCompensatedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
-    assertThat(txCompensatedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
-    assertThat(txCompensatedEventEnvelope2.serviceName(), is(serviceName));
-    assertThat(txCompensatedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
+    TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+
+    TxEventEnvelope txAbortedEvent = envelopes.get(4);
+    assertThat(txAbortedEvent.type(), is("TxAbortedEvent"));
+    assertThat(txAbortedEvent.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txAbortedEvent.parentTxId(), is(globalTxId));
+    assertThat(txAbortedEvent.serviceName(), is(serviceName));
+    assertThat(txAbortedEvent.instanceId(), is(txStartedEvent2.instanceId()));
+
+    // TODO: 2018/1/9 compensation shall be done in reverse order
+    TxEventEnvelope txCompensatedEvent1 = envelopes.get(5);
+    assertThat(txCompensatedEvent1.type(), is("TxCompensatedEvent"));
+    assertThat(txCompensatedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txCompensatedEvent1.parentTxId(), is(globalTxId));
+    assertThat(txCompensatedEvent1.serviceName(), is(serviceName));
+    assertThat(txCompensatedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+    TxEventEnvelope txCompensatedEvent2 = envelopes.get(6);
+    assertThat(txCompensatedEvent2.type(), is("TxCompensatedEvent"));
+    assertThat(txCompensatedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txCompensatedEvent2.parentTxId(), is(globalTxId));
+    assertThat(txCompensatedEvent2.serviceName(), is(serviceName));
+    assertThat(txCompensatedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
 
     assertThat(envelopes.get(7).type(), is("SagaEndedEvent"));
 
@@ -183,4 +183,50 @@ public class PackIT {
         "Goodbye, " + TRESPASSER,
         "My bad, please take the window instead, " + TRESPASSER));
   }
+
+  @Test(timeout = 5000)
+  public void updatesEmbeddedTxStateToAlpha() throws Exception {
+    ResponseEntity<String> entity = restTemplate.getForEntity("/goodMorning?name={name}",
+        String.class,
+        "mike");
+
+    assertThat(entity.getStatusCode(), is(OK));
+    assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
+
+    List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+    assertThat(distinctGlobalTxIds.size(), is(1));
+
+    String globalTxId = distinctGlobalTxIds.get(0);
+    List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+
+    assertThat(envelopes.size(), is(6));
+
+    TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+    assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+
+    TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+    assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+
+    TxEventEnvelope txStartedEvent2 = envelopes.get(2);
+    assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+    assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+    assertThat(txStartedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+    TxEventEnvelope txEndedEvent2 = envelopes.get(3);
+    assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+    assertThat(txEndedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+    TxEventEnvelope txEndedEvent1 = envelopes.get(4);
+    assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+    assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+    assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+
+    TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+    assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
 }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index dc88bbd..5358db5 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -22,10 +22,12 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartAspect;
 import org.apache.servicecomb.saga.omega.transaction.TransactionAspect;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
+import org.springframework.core.annotation.Order;
 
 @Configuration
 @EnableAspectJAutoProxy
@@ -36,6 +38,13 @@ public class TransactionAspectConfig {
     return new CompensationMessageHandler(sender, context);
   }
 
+  @Order(0)
+  @Bean
+  SagaStartAspect sagaStartAspect(MessageSender sender, OmegaContext context) {
+    return new SagaStartAspect(sender, context);
+  }
+
+  @Order(1)
   @Bean
   TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
     return new TransactionAspect(sender, context);
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 3dc8c73..c30953f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,8 +36,9 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
@@ -50,6 +52,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -68,9 +71,11 @@ import akka.actor.Props;
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
+  @SuppressWarnings("unchecked")
+  private static final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
   private static final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
+  private final String newLocalTxId = UUID.randomUUID().toString();
+  private final String anotherLocalTxId = UUID.randomUUID().toString();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
@@ -102,9 +107,9 @@ public class TransactionInterceptionTest {
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
     omegaContext.setGlobalTxId(globalTxId);
-    omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
+    omegaContext.setLocalTxId(globalTxId);
     compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
@@ -125,8 +130,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
 
@@ -146,8 +151,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
-            new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(),
+            new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
   }
@@ -160,20 +165,20 @@ public class TransactionInterceptionTest {
     String localTxId = omegaContext.newLocalTxId();
     User anotherUser = userService.add(jack);
 
-    messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
-    messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
+    messageHandler.onReceive(globalTxId, newLocalTxId, globalTxId, compensationMethod, user);
+    messageHandler.onReceive(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser);
 
     assertThat(userRepository.findOne(user.id()), is(nullValue()));
     assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
-            new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
         },
         toArray(messages)
     );
@@ -184,16 +189,16 @@ public class TransactionInterceptionTest {
     new Thread(() -> userService.add(user)).start();
     waitTillSavedUser(username);
 
-    String newLocalTxId = omegaContext.newLocalTxId();
+    String localTxId = omegaContext.newLocalTxId();
     new Thread(() -> userService.add(jack)).start();
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
-            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -203,16 +208,16 @@ public class TransactionInterceptionTest {
     executor.schedule(() -> userService.add(user), 0, MILLISECONDS);
     waitTillSavedUser(username);
 
-    String newLocalTxId = omegaContext.newLocalTxId();
+    String localTxId = omegaContext.newLocalTxId();
     executor.invokeAll(singletonList(() -> userService.add(jack)));
     waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
-            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
-            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -231,8 +236,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
@@ -249,8 +254,8 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[] {
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
         toArray(messages)
     );
 
@@ -289,8 +294,13 @@ public class TransactionInterceptionTest {
     private final List<String> messages = new ArrayList<>();
 
     @Bean
+    CompensationContext compensationContext() {
+      return new CompensationContext();
+    }
+
+    @Bean
     OmegaContext omegaContext() {
-      return new OmegaContext(new UniqueIdGenerator());
+      return new OmegaContext(idGenerator);
     }
 
     @Bean
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index e92ebb1..64f47ce 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,10 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
-public class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor {
 
   private final OmegaContext omegaContext;
-
   private final MessageSender sender;
 
   SagaStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
@@ -31,7 +30,7 @@ public class SagaStartAnnotationProcessor {
   }
 
   void preIntercept() {
-    String globalTxId = omegaContext.newGlobalTxId();
+    String globalTxId = globalTxId();
     // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
     sender.send(new SagaStartedEvent(globalTxId, globalTxId));
   }
@@ -39,5 +38,12 @@ public class SagaStartAnnotationProcessor {
   void postIntercept() {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new SagaEndedEvent(globalTxId, globalTxId));
+    omegaContext.clear();
+  }
+
+  private String globalTxId() {
+    String globalTxId = omegaContext.newGlobalTxId();
+    omegaContext.setLocalTxId(globalTxId);
+    return globalTxId;
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
new file mode 100644
index 0000000..307af3c
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class SagaStartAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+  private final OmegaContext context;
+
+  public SagaStartAspect(MessageSender sender, OmegaContext context) {
+    this.context = context;
+    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+  }
+
+  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
+  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+    sagaStartAnnotationProcessor.preIntercept();
+    LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+
+    try {
+      return joinPoint.proceed();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to process SagaStart method: {}", method.toString());
+      throw throwable;
+    } finally {
+      LOG.debug("Transaction with context {} has finished.", context);
+      sagaStartAnnotationProcessor.postIntercept();
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a39c0d..f62b92e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -36,7 +36,6 @@ public class TransactionAspect {
   private final PostTransactionInterceptor postTransactionInterceptor;
   private final FailedTransactionInterceptor failedTransactionInterceptor;
 
-  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
   private final OmegaContext context;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -44,7 +43,6 @@ public class TransactionAspect {
     this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
     this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
     this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
-    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -54,7 +52,12 @@ public class TransactionAspect {
 
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
+    String localTxId = context.localTxId();
+    String parentTxId = context.parentTxId();
+    context.setParentTxId(localTxId);
+
     preIntercept(joinPoint, signature);
+    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
@@ -64,26 +67,10 @@ public class TransactionAspect {
     } catch (Throwable throwable) {
       interceptException(signature, throwable);
       throw throwable;
-    }
-  }
-
-  @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
-  Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
-    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
-    LOG.debug("Initializing global tx id before execution of method {}", method.toString());
-    sagaStartAnnotationProcessor.preIntercept();
-
-    try {
-      Object result = joinPoint.proceed();
-
-      LOG.info("Transaction {} succeeded.", context.globalTxId());
-      sagaStartAnnotationProcessor.postIntercept();
-
-      return result;
-    } catch (Throwable throwable) {
-      LOG.error("Transaction {} failed.", context.globalTxId());
-      throw throwable;
+    } finally {
+      context.setLocalTxId(localTxId);
+      context.setParentTxId(parentTxId);
+      LOG.debug("Restored context back to {}", context);
     }
   }
 
@@ -97,12 +84,9 @@ public class TransactionAspect {
   }
 
   private void preIntercept(ProceedingJoinPoint joinPoint, String signature) {
-    // context without a parent should be the first TxStartedEvent
-    initFirstOmegaContext();
-
     preTransactionInterceptor.intercept(
         context.globalTxId(),
-        context.localTxId(),
+        context.newLocalTxId(),
         context.parentTxId(),
         signature,
         joinPoint.getArgs());
@@ -124,14 +108,4 @@ public class TransactionAspect {
         signature,
         throwable);
   }
-
-  private void initFirstOmegaContext() {
-    if (context.parentTxId() != null) {
-      return;
-    }
-    if (context.localTxId() == null) {
-      context.newLocalTxId();
-    }
-    context.setParentTxId(context.globalTxId());
-  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 8913b7f..fba7826 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -41,9 +41,9 @@ public class SagaStartAnnotationProcessorTest {
 
   private final String localTxId = UUID.randomUUID().toString();
 
-  private final IdGenerator generator = mock(IdGenerator.class);
-
   @SuppressWarnings("unchecked")
+  private final IdGenerator<String> generator = mock(IdGenerator.class);
+
   private final OmegaContext context = new OmegaContext(generator);
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
@@ -56,7 +56,7 @@ public class SagaStartAnnotationProcessorTest {
     sagaStartAnnotationProcessor.preIntercept();
 
     assertThat(context.globalTxId(), is(globalTxId));
-    assertThat(context.localTxId(), is(nullValue()));
+    assertThat(context.localTxId(), is(globalTxId));
     assertThat(context.parentTxId(), is(nullValue()));
 
     TxEvent event = messages.get(0);
@@ -73,7 +73,6 @@ public class SagaStartAnnotationProcessorTest {
   public void sendsSagaEndedEvent() {
     context.clear();
     context.setGlobalTxId(globalTxId);
-    context.setLocalTxId(localTxId);
 
     sagaStartAnnotationProcessor.postIntercept();
 
@@ -85,5 +84,9 @@ public class SagaStartAnnotationProcessorTest {
     assertThat(event.compensationMethod().isEmpty(), is(true));
     assertThat(event.type(), is("SagaEndedEvent"));
     assertThat(event.payloads().length, is(0));
+
+    assertThat(context.globalTxId(), is(nullValue()));
+    assertThat(context.localTxId(), is(nullValue()));
+    assertThat(context.parentTxId(), is(nullValue()));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
new file mode 100644
index 0000000..cfaa7b6
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SagaStartAspectTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final MessageSender sender = messages::add;
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final Compensable compensable = Mockito.mock(Compensable.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+  private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
+
+  @Before
+  public void setUp() throws Exception {
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+    omegaContext.setParentTxId(parentTxId);
+  }
+
+  @Test
+  public void newGlobalTxIdInSagaStart() throws Throwable {
+    omegaContext.clear();
+    when(idGenerator.nextId()).thenReturn(globalTxId);
+
+    aspect.advise(joinPoint);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(globalTxId));
+    assertThat(startedEvent.parentTxId(), is(nullValue()));
+    assertThat(startedEvent.type(), is("SagaStartedEvent"));
+
+    TxEvent endedEvent = messages.get(1);
+
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(globalTxId));
+    assertThat(endedEvent.parentTxId(), is(nullValue()));
+    assertThat(endedEvent.type(), is("SagaEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(nullValue()));
+    assertThat(omegaContext.localTxId(), is(nullValue()));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
+  }
+
+  @Test
+  public void clearContextOnSagaStartError() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(globalTxId);
+    RuntimeException oops = new RuntimeException("oops");
+
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e, is(oops));
+    }
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is("SagaEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(nullValue()));
+    assertThat(omegaContext.localTxId(), is(nullValue()));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
new file mode 100644
index 0000000..bd8829c
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TransactionAspectTest {
+  private final List<TxEvent> messages = new ArrayList<>();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final String newLocalTxId = UUID.randomUUID().toString();
+
+  private final MessageSender sender = messages::add;
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final Compensable compensable = Mockito.mock(Compensable.class);
+
+  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+  private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+
+  @Before
+  public void setUp() throws Exception {
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(joinPoint.getTarget()).thenReturn(this);
+
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(compensable.compensationMethod()).thenReturn("doNothing");
+
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+    omegaContext.setParentTxId(parentTxId);
+  }
+
+  @Test
+  public void newLocalTxIdInCompensable() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+
+    aspect.advise(joinPoint, compensable);
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent.parentTxId(), is(localTxId));
+    assertThat(startedEvent.type(), is("TxStartedEvent"));
+
+    TxEvent endedEvent = messages.get(1);
+
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(newLocalTxId));
+    assertThat(endedEvent.parentTxId(), is(localTxId));
+    assertThat(endedEvent.type(), is("TxEndedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(parentTxId));
+  }
+
+  @Test
+  public void restoreContextOnCompensableError() throws Throwable {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
+    RuntimeException oops = new RuntimeException("oops");
+
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint, compensable);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e, is(oops));
+    }
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(newLocalTxId));
+    assertThat(event.parentTxId(), is(localTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    assertThat(omegaContext.globalTxId(), is(globalTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(parentTxId));
+  }
+
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index 3e5d620..9b003f8 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -20,6 +20,9 @@
 
 package org.apache.servicecomb.saga.omega.transport.resttemplate;
 
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.LOCAL_TX_ID_KEY;
+
 import java.lang.invoke.MethodHandles;
 
 import javax.servlet.http.HttpServletRequest;
@@ -43,24 +46,21 @@ class TransactionHandlerInterceptor implements HandlerInterceptor {
 
   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
-    String globalTxId = request.getHeader(OmegaContext.GLOBAL_TX_ID_KEY);
+    String globalTxId = request.getHeader(GLOBAL_TX_ID_KEY);
     if (globalTxId == null) {
-      LOG.debug("no such header: {}", OmegaContext.GLOBAL_TX_ID_KEY);
+      LOG.debug("no such header: {}", GLOBAL_TX_ID_KEY);
     } else {
       omegaContext.setGlobalTxId(globalTxId);
-      omegaContext.newLocalTxId();
-      omegaContext.setParentTxId(request.getHeader(OmegaContext.LOCAL_TX_ID_KEY));
+      omegaContext.setLocalTxId(request.getHeader(LOCAL_TX_ID_KEY));
     }
     return true;
   }
 
   @Override
-  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o,
-      ModelAndView modelAndView) {
+  public void postHandle(HttpServletRequest request, HttpServletResponse response, Object o, ModelAndView mv) {
   }
 
   @Override
-  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o,
-      Exception e) {
+  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o, Exception e) {
   }
 }
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
index 6ee816b..03f278f 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
@@ -51,9 +51,7 @@ public class TransactionHandlerInterceptorTest {
 
   @Before
   public void setUp() {
-    omegaContext.setGlobalTxId(null);
-    omegaContext.setLocalTxId(null);
-    omegaContext.setParentTxId(null);
+    omegaContext.clear();
   }
 
   @Test
@@ -64,8 +62,8 @@ public class TransactionHandlerInterceptorTest {
     requestInterceptor.preHandle(request, response, null);
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
-    assertThat(omegaContext.localTxId(), is(notNullValue()));
-    assertThat(omegaContext.parentTxId(), is(localTxId));
+    assertThat(omegaContext.localTxId(), is(localTxId));
+    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/15: SCB-212 removed parent tx id from omega context, since it's not necessary

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 137a1f4e5a18726f70c8f421c863c3b210faf2fa
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 15:56:53 2018 +0800

    SCB-212 removed parent tx id from omega context, since it's not necessary
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java            | 11 -----------
 .../saga/omega/context/OmegaContextTest.java        | 18 ------------------
 .../transaction/spring/ExecutorFieldCallback.java   | 13 ++++---------
 .../saga/omega/transaction/TransactionAspect.java   | 21 +++++++++------------
 .../SagaStartAnnotationProcessorTest.java           |  2 --
 .../saga/omega/transaction/SagaStartAspectTest.java |  3 ---
 .../omega/transaction/TransactionAspectTest.java    |  3 ---
 .../TransactionHandlerInterceptorTest.java          |  2 --
 8 files changed, 13 insertions(+), 60 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 43bf0b4..daa8e7c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -23,7 +23,6 @@ public class OmegaContext {
 
   private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
   private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
-  private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
 
   public OmegaContext(IdGenerator<String> idGenerator) {
@@ -58,18 +57,9 @@ public class OmegaContext {
     return localTxId.get();
   }
 
-  public String parentTxId() {
-    return parentTxId.get();
-  }
-
-  public void setParentTxId(String parentTxId) {
-    this.parentTxId.set(parentTxId);
-  }
-
   public void clear() {
     globalTxId.remove();
     localTxId.remove();
-    parentTxId.remove();
   }
 
   @Override
@@ -77,7 +67,6 @@ public class OmegaContext {
     return "OmegaContext{" +
         "globalTxId=" + globalTxId.get() +
         ", localTxId=" + localTxId.get() +
-        ", parentTxId=" + parentTxId.get() +
         '}';
   }
 }
diff --git a/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java b/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
index 3f844d3..b741e88 100644
--- a/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
+++ b/omega/omega-context/src/test/java/org/apache/servicecomb/saga/omega/context/OmegaContextTest.java
@@ -67,24 +67,6 @@ public class OmegaContextTest {
     CompletableFuture.allOf(future1, future2).join();
   }
 
-  @Test
-  public void eachThreadGetsDifferentParentTxId() throws Exception {
-    CyclicBarrier barrier = new CyclicBarrier(2);
-
-    Runnable runnable = exceptionalRunnable(() -> {
-      String parentId = UUID.randomUUID().toString();
-      omegaContext.setParentTxId(parentId);
-      barrier.await();
-
-      assertThat(omegaContext.parentTxId(), is(parentId));
-    });
-
-    CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
-    CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
-
-    CompletableFuture.allOf(future1, future2).join();
-  }
-
   private Runnable exceptionalRunnable(ExceptionalRunnable runnable) {
     return () -> {
       try {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
index 5ae949d..e07356c 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -68,7 +68,6 @@ class ExecutorFieldCallback implements FieldCallback {
 
     private final String globalTxId;
     private final String localTxId;
-    private final String parentTxId;
     private final Object runnable;
     private final OmegaContext omegaContext;
 
@@ -84,29 +83,25 @@ class ExecutorFieldCallback implements FieldCallback {
       this.omegaContext = omegaContext;
       this.globalTxId = omegaContext.globalTxId();
       this.localTxId = omegaContext.localTxId();
-      this.parentTxId = omegaContext.parentTxId();
       this.runnable = runnable;
     }
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       try {
-        LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+        LOG.debug("Setting OmegaContext with globalTxId [{}] & localTxId [{}]",
             globalTxId,
-            localTxId,
-            parentTxId);
+            localTxId);
 
         omegaContext.setGlobalTxId(globalTxId);
         omegaContext.setLocalTxId(localTxId);
-        omegaContext.setParentTxId(parentTxId);
 
         return method.invoke(runnable, args);
       } finally {
         omegaContext.clear();
-        LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+        LOG.debug("Cleared OmegaContext with globalTxId [{}] & localTxId [{}]",
             globalTxId,
-            localTxId,
-            parentTxId);
+            localTxId);
       }
     }
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index f62b92e..b5c6859 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -53,23 +53,20 @@ public class TransactionAspect {
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
-    String parentTxId = context.parentTxId();
-    context.setParentTxId(localTxId);
 
-    preIntercept(joinPoint, signature);
+    preIntercept(joinPoint, signature, localTxId);
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     try {
       Object result = joinPoint.proceed();
-      postIntercept(signature);
+      postIntercept(signature, localTxId);
 
       return result;
     } catch (Throwable throwable) {
-      interceptException(signature, throwable);
+      interceptException(signature, throwable, localTxId);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
-      context.setParentTxId(parentTxId);
       LOG.debug("Restored context back to {}", context);
     }
   }
@@ -83,28 +80,28 @@ public class TransactionAspect {
         .toString();
   }
 
-  private void preIntercept(ProceedingJoinPoint joinPoint, String signature) {
+  private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
     preTransactionInterceptor.intercept(
         context.globalTxId(),
         context.newLocalTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature,
         joinPoint.getArgs());
   }
 
-  private void postIntercept(String signature) {
+  private void postIntercept(String signature, String parentTxId) {
     postTransactionInterceptor.intercept(
         context.globalTxId(),
         context.localTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature);
   }
 
-  private void interceptException(String signature, Throwable throwable) {
+  private void interceptException(String signature, Throwable throwable, String parentTxId) {
     failedTransactionInterceptor.intercept(
         context.globalTxId(),
         context.localTxId(),
-        context.parentTxId(),
+        parentTxId,
         signature,
         throwable);
   }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index fba7826..0dadebe 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -57,7 +57,6 @@ public class SagaStartAnnotationProcessorTest {
 
     assertThat(context.globalTxId(), is(globalTxId));
     assertThat(context.localTxId(), is(globalTxId));
-    assertThat(context.parentTxId(), is(nullValue()));
 
     TxEvent event = messages.get(0);
 
@@ -87,6 +86,5 @@ public class SagaStartAnnotationProcessorTest {
 
     assertThat(context.globalTxId(), is(nullValue()));
     assertThat(context.localTxId(), is(nullValue()));
-    assertThat(context.parentTxId(), is(nullValue()));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index cfaa7b6..432b3ad 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -63,7 +63,6 @@ public class SagaStartAspectTest {
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
   }
 
   @Test
@@ -89,7 +88,6 @@ public class SagaStartAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test
@@ -115,7 +113,6 @@ public class SagaStartAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   private String doNothing() {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index bd8829c..65f23a7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -64,7 +64,6 @@ public class TransactionAspectTest {
 
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
-    omegaContext.setParentTxId(parentTxId);
   }
 
   @Test
@@ -89,7 +88,6 @@ public class TransactionAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(parentTxId));
   }
 
   @Test
@@ -115,7 +113,6 @@ public class TransactionAspectTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(parentTxId));
   }
 
   private String doNothing() {
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
index 03f278f..7b41540 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
@@ -63,7 +63,6 @@ public class TransactionHandlerInterceptorTest {
 
     assertThat(omegaContext.globalTxId(), is(globalTxId));
     assertThat(omegaContext.localTxId(), is(localTxId));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
   }
 
   @Test
@@ -74,7 +73,6 @@ public class TransactionHandlerInterceptorTest {
     requestInterceptor.preHandle(request, response, null);
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
-    assertThat(omegaContext.parentTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 12/15: SCB-212 closed sender after each test to avoid test interference

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e5e65b97052730cc407fee9322835e920b247de2
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 14:51:09 2018 +0800

    SCB-212 closed sender after each test to avoid test interference
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java    | 3 ---
 .../omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java     | 3 ++-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index d441040..6eae8a3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -99,7 +99,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     senders.keySet().forEach(sender -> {
       try {
         sender.onConnected();
-        senders.put(sender, 0L);
       } catch (Exception e) {
         log.error("Failed connecting to alpha at {}", sender.target(), e);
       }
@@ -111,12 +110,10 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     senders.keySet().forEach(sender -> {
       try {
         sender.onDisconnected();
-        senders.put(sender, Long.MAX_VALUE);
       } catch (Exception e) {
         log.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
     });
-
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 24326e3..f280de0 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -133,6 +133,8 @@ public class LoadBalancedClusterMessageSenderTest {
 
   @After
   public void after() throws Exception {
+    messageSender.onDisconnected();
+    messageSender.close();
     eventsMap.values().forEach(Queue::clear);
     connected.values().forEach(Queue::clear);
   }
@@ -242,7 +244,6 @@ public class LoadBalancedClusterMessageSenderTest {
 
   @Test
   public void considerFasterServerFirst() throws Exception {
-    messageSender.onConnected();
     // we don't know which server is selected at first
     messageSender.send(event);
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/15: SCB-212 tx timeout impl

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0aaa3fde7da0b56ab109140576179cff1a70d9ea
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 21:07:30 2018 +0800

    SCB-212 tx timeout impl
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-transaction/pom.xml                    |  4 ++
 .../omega/transaction/CompensableInterceptor.java  | 19 +++++---
 .../omega/transaction/EventAwareInterceptor.java   | 20 ++++++--
 .../omega/transaction/TimeAwareInterceptor.java    | 54 ++++++++++++++++++++++
 .../saga/omega/transaction/TransactionAspect.java  | 46 ++++++------------
 .../transaction/CompensableInterceptorTest.java    | 21 +++++++--
 .../transaction/TimeAwareInterceptorTest.java}     |  8 ++--
 .../omega/transaction/TransactionAspectTest.java   | 44 ++++++++++++++++--
 8 files changed, 163 insertions(+), 53 deletions(-)

diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index 90c00e9..1829650 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -59,6 +59,10 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index b443c4d..76193cd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -17,26 +17,31 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+
 class CompensableInterceptor implements EventAwareInterceptor {
+  private final OmegaContext context;
   private final MessageSender sender;
 
-  CompensableInterceptor(MessageSender sender) {
+  CompensableInterceptor(OmegaContext context, MessageSender sender) {
+    this.context = context;
     this.sender = sender;
   }
 
   @Override
-  public void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
+  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
   }
 
   @Override
-  public void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+  public void postIntercept(String parentTxId, String compensationMethod) {
+    sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod));
 
   }
 
   @Override
-  public void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
+  public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    sender.send(
+        new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 9be92e6..291538f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -18,9 +18,23 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
+  EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    }
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+    }
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
+    @Override
+    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+    }
+  };
+
+  void preIntercept(String parentTxId, String compensationMethod, Object... message);
+
+  void postIntercept(String parentTxId, String compensationMethod);
+
+  void onError(String parentTxId, String compensationMethod, Throwable throwable);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..d633630
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeoutException;
+
+class TimeAwareInterceptor implements EventAwareInterceptor {
+  private final EventAwareInterceptor interceptor;
+  private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+
+  TimeAwareInterceptor(EventAwareInterceptor interceptor) {
+    this.interceptor = interceptor;
+    this.interceptors.offer(interceptor);
+  }
+
+  @Override
+  public void preIntercept(String localTxId, String signature, Object... args) {
+    interceptor.preIntercept(localTxId, signature, args);
+  }
+
+  @Override
+  public void postIntercept(String localTxId, String signature) {
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().postIntercept(localTxId, signature);
+  }
+
+  @Override
+  public void onError(String localTxId, String signature, Throwable throwable) {
+    interceptors.offerLast(NO_OP_INTERCEPTOR);
+    interceptors.pollFirst().onError(localTxId, signature, throwable);
+  }
+
+  void onTimeout(String signature, String localTxId) {
+    interceptors.offerFirst(NO_OP_INTERCEPTOR);
+    interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index a447489..9bee829 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -17,8 +17,12 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
@@ -34,11 +38,12 @@ public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext context;
-  private final EventAwareInterceptor interceptor;
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+  private final CompensableInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
     this.context = context;
-    this.interceptor = new CompensableInterceptor(sender);
+    this.interceptor = new CompensableInterceptor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -49,17 +54,22 @@ public class TransactionAspect {
     String signature = compensationMethodSignature(joinPoint, compensable, method);
 
     String localTxId = context.localTxId();
+    context.newLocalTxId();
 
-    preIntercept(joinPoint, signature, localTxId);
+    TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
+    interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+    if (compensable.timeout() > 0) {
+      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
+    }
 
     try {
       Object result = joinPoint.proceed();
-      postIntercept(signature, localTxId);
+      interceptor.postIntercept(localTxId, signature);
 
       return result;
     } catch (Throwable throwable) {
-      interceptException(signature, throwable, localTxId);
+      interceptor.onError(localTxId, signature, throwable);
       throw throwable;
     } finally {
       context.setLocalTxId(localTxId);
@@ -75,30 +85,4 @@ public class TransactionAspect {
         .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
         .toString();
   }
-
-  private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
-    interceptor.preIntercept(
-        context.globalTxId(),
-        context.newLocalTxId(),
-        parentTxId,
-        signature,
-        joinPoint.getArgs());
-  }
-
-  private void postIntercept(String signature, String parentTxId) {
-    interceptor.postIntercept(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature);
-  }
-
-  private void interceptException(String signature, Throwable throwable, String parentTxId) {
-    interceptor.onError(
-        context.globalTxId(),
-        context.localTxId(),
-        parentTxId,
-        signature,
-        throwable);
-  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 7505a1f..609ea6d 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -27,7 +27,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class CompensableInterceptorTest {
 
@@ -41,11 +45,20 @@ public class CompensableInterceptorTest {
   private final String message = uniquify("message");
   private final String compensationMethod = getClass().getCanonicalName();
 
-  private final CompensableInterceptor interceptor = new CompensableInterceptor(sender);
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+  private final OmegaContext context = new OmegaContext(idGenerator);
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(context, sender);
+
+  @Before
+  public void setUp() throws Exception {
+    context.setGlobalTxId(globalTxId);
+    context.setLocalTxId(localTxId);
+  }
 
   @Test
   public void sendsTxStartedEventBefore() throws Exception {
-    interceptor.preIntercept(globalTxId, localTxId, parentTxId, compensationMethod, message);
+    interceptor.preIntercept(parentTxId, compensationMethod, message);
 
     TxEvent event = messages.get(0);
 
@@ -59,7 +72,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxEndedEventAfter() throws Exception {
-    interceptor.postIntercept(globalTxId, localTxId, parentTxId, compensationMethod);
+    interceptor.postIntercept(parentTxId, compensationMethod);
 
     TxEvent event = messages.get(0);
 
@@ -73,7 +86,7 @@ public class CompensableInterceptorTest {
 
   @Test
   public void sendsTxAbortedEventOnError() throws Exception {
-    interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+    interceptor.onError(parentTxId, compensationMethod, new RuntimeException("oops"));
 
     TxEvent event = messages.get(0);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
similarity index 68%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 9be92e6..fb2ee1d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-public interface EventAwareInterceptor {
-  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
+import static org.junit.Assert.*;
 
-  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
+public class TimeAwareInterceptorTest {
 
-  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 76a0e34..2ce34b8 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -18,13 +18,18 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -55,6 +60,7 @@ public class TransactionAspectTest {
 
   @Before
   public void setUp() throws Exception {
+    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
     when(joinPoint.getTarget()).thenReturn(this);
 
@@ -67,8 +73,6 @@ public class TransactionAspectTest {
 
   @Test
   public void newLocalTxIdInCompensable() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
-
     aspect.advise(joinPoint, compensable);
 
     TxEvent startedEvent = messages.get(0);
@@ -91,7 +95,6 @@ public class TransactionAspectTest {
 
   @Test
   public void restoreContextOnCompensableError() throws Throwable {
-    when(idGenerator.nextId()).thenReturn(newLocalTxId);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
@@ -114,6 +117,41 @@ public class TransactionAspectTest {
     assertThat(omegaContext.localTxId(), is(localTxId));
   }
 
+  @Test
+  public void sendsAbortEventOnTimeout() throws Throwable {
+    CountDownLatch latch = new CountDownLatch(1);
+    when(compensable.timeout()).thenReturn(100);
+    when(joinPoint.proceed()).thenAnswer(invocationOnMock -> {
+      latch.await();
+      assertThat(omegaContext.localTxId(), is(newLocalTxId));
+      return null;
+    });
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        aspect.advise(joinPoint, compensable);
+      } catch (Throwable throwable) {
+        fail(throwable.getMessage());
+      }
+    });
+
+    await().atMost(1, SECONDS).until(() -> messages.size() == 2);
+
+    TxEvent event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(newLocalTxId));
+    assertThat(event.parentTxId(), is(localTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+
+    latch.countDown();
+
+    await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId()));
+
+    // no redundant ended message received
+    assertThat(messages.size(), is(2));
+  }
+
   private String doNothing() {
     return "doNothing";
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 10/15: SCB-212 attempted to fix random test failure by resetting latency before test

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 602bff5ee7de9aa055cf8061186f6b9993d97e25
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 14:14:59 2018 +0800

    SCB-212 attempted to fix random test failure by resetting latency before test
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java    | 2 ++
 .../omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java     | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 3cb6677..d441040 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -99,6 +99,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     senders.keySet().forEach(sender -> {
       try {
         sender.onConnected();
+        senders.put(sender, 0L);
       } catch (Exception e) {
         log.error("Failed connecting to alpha at {}", sender.target(), e);
       }
@@ -110,6 +111,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     senders.keySet().forEach(sender -> {
       try {
         sender.onDisconnected();
+        senders.put(sender, Long.MAX_VALUE);
       } catch (Exception e) {
         log.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index f995002..24326e3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -242,6 +242,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
   @Test
   public void considerFasterServerFirst() throws Exception {
+    messageSender.onConnected();
     // we don't know which server is selected at first
     messageSender.send(event);
 
@@ -250,7 +251,7 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.send(event);
     messageSender.send(event);
 
-    await().atMost(1, SECONDS).until(() -> eventsMap.get(8080).size() == 3);
+    assertThat(eventsMap.get(8080).size(), is(3));
     assertThat(eventsMap.get(8090).size(), is(1));
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/15: SCB-212 delegated compensation context a dedicated class

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 968247ff160785fe19dd170f871f9a9b6fd82cd3
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 10:19:00 2018 +0800

    SCB-212 delegated compensation context a dedicated class
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/CompensationContext.java    | 61 ++++++++++++++++++++++
 .../saga/omega/context/OmegaContext.java           | 40 --------------
 .../saga/omega/spring/OmegaSpringConfig.java       |  6 +++
 .../spring/CompensableAnnotationProcessor.java     |  9 +++-
 .../spring/CompensableMethodCheckingCallback.java  | 10 ++--
 .../spring/TransactionAspectConfig.java            |  7 +--
 .../transaction/CompensationMessageHandler.java    | 10 ++--
 .../saga/omega/transaction/TransactionAspect.java  |  2 +-
 .../CompensationMessageHandlerTest.java            |  8 +--
 .../TransactionClientHttpRequestInterceptor.java   | 18 +++++--
 10 files changed, 108 insertions(+), 63 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
new file mode 100644
index 0000000..118b033
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.context;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompensationContext {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Map<String, CompensationContextInternal> contexts = new HashMap<>();
+
+  public void addCompensationContext(Method compensationMethod, Object target) {
+    compensationMethod.setAccessible(true);
+    contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
+  }
+
+  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
+    CompensationContextInternal contextInternal = contexts.get(compensationMethod);
+
+    try {
+      contextInternal.compensationMethod.invoke(contextInternal.target, payloads);
+      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      LOG.error(
+          "Pre-checking for compensate method " + contextInternal.compensationMethod.toString()
+              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+          e);
+    }
+  }
+
+  private static final class CompensationContextInternal {
+    private final Object target;
+    private final Method compensationMethod;
+
+    private CompensationContextInternal(Object target, Method compensationMethod) {
+      this.target = target;
+      this.compensationMethod = compensationMethod;
+    }
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 94de6ef..43bf0b4 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,17 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.context;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class OmegaContext {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
 
@@ -35,7 +25,6 @@ public class OmegaContext {
   private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
   private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
-  private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
 
   public OmegaContext(IdGenerator<String> idGenerator) {
     this.idGenerator = idGenerator;
@@ -83,25 +72,6 @@ public class OmegaContext {
     parentTxId.remove();
   }
 
-  public void addCompensationContext(Method compensationMethod, Object target) {
-    compensationMethod.setAccessible(true);
-    compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
-  }
-
-  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
-    CompensationContext compensationContext = compensationContexts.get(compensationMethod);
-
-    try {
-      compensationContext.compensationMethod.invoke(compensationContext.target, payloads);
-      LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      LOG.error(
-          "Pre-checking for compensate method " + compensationContext.compensationMethod.toString()
-              + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
-          e);
-    }
-  }
-
   @Override
   public String toString() {
     return "OmegaContext{" +
@@ -110,14 +80,4 @@ public class OmegaContext {
         ", parentTxId=" + parentTxId.get() +
         '}';
   }
-
-  private static final class CompensationContext {
-    private final Object target;
-    private final Method compensationMethod;
-
-    private CompensationContext(Object target, Method compensationMethod) {
-      this.target = target;
-      this.compensationMethod = compensationMethod;
-    }
-  }
 }
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index e5806ad..fa4027b 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.spring;
 
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -45,6 +46,11 @@ class OmegaSpringConfig {
   }
 
   @Bean
+  CompensationContext compensationContext() {
+    return new CompensationContext();
+  }
+
+  @Bean
   ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) {
     return new ServiceConfig(serviceName);
   }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 87f9049..338751c 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -25,9 +26,11 @@ import org.springframework.util.ReflectionUtils;
 class CompensableAnnotationProcessor implements BeanPostProcessor {
 
   private final OmegaContext omegaContext;
+  private final CompensationContext compensationContext;
 
-  CompensableAnnotationProcessor(OmegaContext omegaContext) {
+  CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
     this.omegaContext = omegaContext;
+    this.compensationContext = compensationContext;
   }
 
   @Override
@@ -43,7 +46,9 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
   }
 
   private void checkMethod(Object bean) {
-    ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
+    ReflectionUtils.doWithMethods(
+        bean.getClass(),
+        new CompensableMethodCheckingCallback(bean, compensationContext));
   }
 
   private void checkFields(Object bean) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index ac6615c..6c0c333 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.slf4j.Logger;
@@ -31,11 +31,11 @@ class CompensableMethodCheckingCallback implements MethodCallback {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Object bean;
-  private final OmegaContext omegaContext;
+  private final CompensationContext compensationContext;
 
-  CompensableMethodCheckingCallback(Object bean, OmegaContext omegaContext) {
+  CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
     this.bean = bean;
-    this.omegaContext = omegaContext;
+    this.compensationContext = compensationContext;
   }
 
   @Override
@@ -48,7 +48,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
 
     try {
       Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
-      omegaContext.addCompensationContext(signature, bean);
+      compensationContext.addCompensationContext(signature, bean);
       LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
     } catch (NoSuchMethodException e) {
       throw new OmegaException(
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 1d46378..dc88bbd 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
@@ -31,7 +32,7 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 public class TransactionAspectConfig {
 
   @Bean
-  MessageHandler messageHandler(MessageSender sender, OmegaContext context) {
+  MessageHandler messageHandler(MessageSender sender, CompensationContext context) {
     return new CompensationMessageHandler(sender, context);
   }
 
@@ -41,7 +42,7 @@ public class TransactionAspectConfig {
   }
 
   @Bean
-  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext) {
-    return new CompensableAnnotationProcessor(omegaContext);
+  CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+    return new CompensableAnnotationProcessor(omegaContext, compensationContext);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 2e0836e..46c1e9b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -17,20 +17,20 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 
 public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
-  private final OmegaContext omegaContext;
+  private final CompensationContext context;
 
-  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) {
+  public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
     this.sender = sender;
-    this.omegaContext = omegaContext;
+    this.context = context;
   }
 
   @Override
   public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
-    omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads);
+    context.compensate(globalTxId, localTxId, compensationMethod, payloads);
     sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index abafc2f..5a39c0d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -44,7 +44,7 @@ public class TransactionAspect {
     this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
     this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
     this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
-    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(this.context, sender);
+    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index 2ecd82f..2585f3f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.verify;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.junit.Test;
 
 public class CompensationMessageHandlerTest {
@@ -40,8 +40,8 @@ public class CompensationMessageHandlerTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final String payload = uniquify("blah");
 
-  private final OmegaContext omegaContext = mock(OmegaContext.class);
-  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext);
+  private final CompensationContext context = mock(CompensationContext.class);
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
 
   @Test
   public void sendsEventOnCompensationCompleted() throws Exception {
@@ -57,6 +57,6 @@ public class CompensationMessageHandlerTest {
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
 
-    verify(omegaContext).compensate(globalTxId, localTxId, compensationMethod, payload);
+    verify(context).compensate(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
index 5e69598..88ea564 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionClientHttpRequestInterceptor.java
@@ -18,16 +18,22 @@
 
 package org.apache.servicecomb.saga.omega.transport.resttemplate;
 
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.LOCAL_TX_ID_KEY;
+
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpRequest;
 import org.springframework.http.client.ClientHttpRequestExecution;
 import org.springframework.http.client.ClientHttpRequestInterceptor;
 import org.springframework.http.client.ClientHttpResponse;
 
 class TransactionClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
-
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OmegaContext omegaContext;
 
   TransactionClientHttpRequestInterceptor(OmegaContext omegaContext) {
@@ -39,8 +45,14 @@ class TransactionClientHttpRequestInterceptor implements ClientHttpRequestInterc
       ClientHttpRequestExecution execution) throws IOException {
 
     if (omegaContext.globalTxId() != null) {
-      request.getHeaders().add(OmegaContext.GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
-      request.getHeaders().add(OmegaContext.LOCAL_TX_ID_KEY, omegaContext.localTxId());
+      request.getHeaders().add(GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
+      request.getHeaders().add(LOCAL_TX_ID_KEY, omegaContext.localTxId());
+
+      LOG.debug("Added {} {} and {} {} to request header",
+          GLOBAL_TX_ID_KEY,
+          omegaContext.globalTxId(),
+          LOCAL_TX_ID_KEY,
+          omegaContext.localTxId());
     }
     return execution.execute(request, body);
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 13/15: SCB-212 fixed rebase conflict

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 05113e5b3d98229377e54eac8c6db1e851fed500
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 15:18:57 2018 +0800

    SCB-212 fixed rebase conflict
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java          |  5 +++--
 .../transaction/SagaStartAnnotationProcessor.java     |  7 -------
 .../saga/omega/transaction/SagaStartAspect.java       | 17 +++++++++++++----
 .../transaction/SagaStartAnnotationProcessorTest.java | 19 +++++++------------
 .../saga/omega/transaction/SagaStartAspectTest.java   |  5 +++--
 5 files changed, 26 insertions(+), 27 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 079e559..87cdc60 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -20,16 +20,17 @@ package org.apache.servicecomb.saga.alpha.core;
 import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Consumer;
 
 public class TxConsistentService {
@@ -67,7 +68,7 @@ public class TxConsistentService {
     List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
     eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
-      Set<String> eventSet = new HashSet<>(events.size());
+      Set<String> eventSet = new ConcurrentSkipListSet<>();
       events.forEach(e -> eventSet.add(e.localTxId()));
       return eventSet;
     });
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 0aa8fab..2dad5ae 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -31,24 +31,17 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   @Override
   public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    initializeOmegaContext();
     sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
   }
 
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
     sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
-    omegaContext.clear();
   }
 
   @Override
   public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
     String globalTxId = omegaContext.globalTxId();
     sender.send(new TxAbortedEvent(globalTxId, globalTxId, null, compensationMethod, throwable));
-    omegaContext.clear();
-  }
-
-  private void initializeOmegaContext() {
-    omegaContext.setLocalTxId(omegaContext.newGlobalTxId());
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 3fa6322..0951752 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -48,6 +48,7 @@ public class SagaStartAspect {
 
   @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
   Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
+    initializeOmegaContext();
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
@@ -56,16 +57,24 @@ public class SagaStartAspect {
 
     scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
     try {
-      return joinPoint.proceed();
+      Object result = joinPoint.proceed();
+
+      interceptor.postIntercept(context.globalTxId(), method.toString());
+      LOG.debug("Transaction with context {} has finished.", context);
+
+      return result;
     } catch (Throwable throwable) {
-      LOG.error("Failed to process SagaStart method: {}", method.toString());
+      LOG.error("Transaction {} failed.", context.globalTxId());
       throw throwable;
     } finally {
-      LOG.debug("Transaction with context {} has finished.", context);
-      interceptor.postIntercept(context.globalTxId(), method.toString());
+      context.clear();
     }
   }
 
+  private void initializeOmegaContext() {
+    context.setLocalTxId(context.newGlobalTxId());
+  }
+
   private void scheduleTimeoutTask(
       TimeAwareInterceptor interceptor,
       Method method,
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index e857356..9b8adf5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -21,7 +21,6 @@ import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,6 +28,7 @@ import java.util.UUID;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
 import org.junit.Test;
 
 public class SagaStartAnnotationProcessorTest {
@@ -49,15 +49,16 @@ public class SagaStartAnnotationProcessorTest {
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
       sender);
 
+  @Before
+  public void setUp() throws Exception {
+    context.setGlobalTxId(globalTxId);
+    context.setLocalTxId(globalTxId);
+  }
+
   @Test
   public void sendsSagaStartedEvent() {
-    when(generator.nextId()).thenReturn(globalTxId, localTxId);
-
     sagaStartAnnotationProcessor.preIntercept(null, null);
 
-    assertThat(context.globalTxId(), is(globalTxId));
-    assertThat(context.localTxId(), is(globalTxId));
-
     TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
@@ -70,9 +71,6 @@ public class SagaStartAnnotationProcessorTest {
 
   @Test
   public void sendsSagaEndedEvent() {
-    context.setGlobalTxId(globalTxId);
-    context.setLocalTxId(globalTxId);
-
     sagaStartAnnotationProcessor.postIntercept(null, null);
 
     TxEvent event = messages.get(0);
@@ -83,8 +81,5 @@ public class SagaStartAnnotationProcessorTest {
     assertThat(event.compensationMethod().isEmpty(), is(true));
     assertThat(event.type(), is("SagaEndedEvent"));
     assertThat(event.payloads().length, is(0));
-
-    assertThat(context.globalTxId(), is(nullValue()));
-    assertThat(context.localTxId(), is(nullValue()));
   }
 }
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 5322269..b63b8be 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -100,12 +100,13 @@ public class SagaStartAspectTest {
       assertThat(e, is(oops));
     }
 
-    TxEvent event = messages.get(1);
+    assertThat(messages.size(), is(1));
+    TxEvent event = messages.get(0);
 
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(globalTxId));
     assertThat(event.parentTxId(), is(nullValue()));
-    assertThat(event.type(), is("SagaEndedEvent"));
+    assertThat(event.type(), is("SagaStartedEvent"));
 
     assertThat(omegaContext.globalTxId(), is(nullValue()));
     assertThat(omegaContext.localTxId(), is(nullValue()));

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 09/15: SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 50444100ddc4d906b010eefa42d974bb219592ff
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800

    SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java         | 20 +++++++++++++++++---
 .../saga/alpha/core/TxEventRepository.java           |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java     | 18 +++++++++++++++++-
 .../saga/alpha/server/SpringTxEventRepository.java   |  2 +-
 4 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5bab1d8..079e559 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -40,7 +40,6 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
     put(TxAbortedEvent.name(), (event) -> compensate(event));
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
@@ -54,11 +53,18 @@ public class TxConsistentService {
 
   public void handle(TxEvent event) {
     eventRepository.save(event);
-    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+    CompletableFuture.runAsync(() -> {
+      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+        omegaCallback.compensate(event);
+      }
+
+      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+    });
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
     eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
       Set<String> eventSet = new HashSet<>(events.size());
@@ -83,4 +89,12 @@ public class TxConsistentService {
         event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
         null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
   }
+
+  private boolean isGlobalTxAborted(TxEvent event) {
+    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  }
+
+  private boolean isTxEndedEvent(TxEvent event) {
+    return TxEndedEvent.name().equals(event.type());
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
 public interface TxEventRepository {
   void save(TxEvent event);
 
-  List<TxEvent> findStartedTransactions(String globalTxId, String type);
+  List<TxEvent> findTransactions(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index cc5c520..febbfaf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+    public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
@@ -118,6 +118,22 @@ public class TxConsistentServiceTest {
     assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
   }
 
+  @Test
+  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+    ));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+  public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
         .stream()
         .map(TxEventEnvelope::event)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 14/15: SCB-212 attempted to fix test failure

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 8c455ad87396052ba24d4a45c5e0581c0eef485a
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 15:58:16 2018 +0800

    SCB-212 attempted to fix test failure
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/server/AlphaIntegrationTest.java       | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 4997c00..9f13a8f 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -29,10 +29,10 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -94,7 +94,7 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxConsistentService consistentService;
 
-  private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
+  private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
   private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
 
   @AfterClass
@@ -219,11 +219,12 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
-    assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
-    assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
-    assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
-    assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
-    assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
   @Test
@@ -244,7 +245,7 @@ public class AlphaIntegrationTest {
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
     assertThat(receivedCommands.size(), is(1));
-    assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
+    assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
 
     anotherBlockingStub.onDisconnected(anotherServiceConfig);
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 08/15: SCB-212 removed unnecessary lines

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0a18ed864923126e1648c5a0e96c0d8b97ad85d6
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 09:40:58 2018 +0800

    SCB-212 removed unnecessary lines
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index c282ecc..5322269 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -44,7 +44,6 @@ import org.mockito.Mockito;
 public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
@@ -61,7 +60,6 @@ public class SagaStartAspectTest {
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
-    when(joinPoint.getTarget()).thenReturn(this);
 
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
     omegaContext.clear();

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/15: SCB-212 united interceptors into one

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 521238368fc04dc362732bbcc3d20e75ca1a7413
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 16:45:56 2018 +0800

    SCB-212 united interceptors into one
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/annotations/SagaStart.java  |  1 +
 ...nterceptor.java => CompensableInterceptor.java} | 18 ++++++--
 ...Interceptor.java => EventAwareInterceptor.java} | 12 ++---
 .../transaction/PreTransactionInterceptor.java     | 30 ------------
 .../saga/omega/transaction/TransactionAspect.java  | 14 ++----
 .../omega/transaction/annotations/Compensable.java |  2 +
 ...orTest.java => CompensableInterceptorTest.java} | 40 ++++++++++++++--
 .../PostTransactionInterceptorTest.java            | 53 ----------------------
 .../omega/transaction/TransactionAspectTest.java   |  1 -
 9 files changed, 62 insertions(+), 109 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
index 435d72f..7937061 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/SagaStart.java
@@ -26,4 +26,5 @@ import java.lang.annotation.Target;
 @Retention(RUNTIME)
 @Target(METHOD)
 public @interface SagaStart {
+  int timeout() default 0;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
similarity index 58%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 862efc9..b443c4d 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -17,14 +17,26 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-class FailedTransactionInterceptor {
+class CompensableInterceptor implements EventAwareInterceptor {
   private final MessageSender sender;
 
-  FailedTransactionInterceptor(MessageSender sender) {
+  CompensableInterceptor(MessageSender sender) {
     this.sender = sender;
   }
 
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+  @Override
+  public void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
+    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
+  }
+
+  @Override
+  public void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+
+  }
+
+  @Override
+  public void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
     sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
similarity index 68%
rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java
rename to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 4197560..9be92e6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -17,14 +17,10 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-class PostTransactionInterceptor {
-  private final MessageSender sender;
+public interface EventAwareInterceptor {
+  void preIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message);
 
-  PostTransactionInterceptor(MessageSender sender) {
-    this.sender = sender;
-  }
+  void postIntercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod);
 
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    sender.send(new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
-  }
+  void onError(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
deleted file mode 100644
index 3280d11..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-class PreTransactionInterceptor {
-  private final MessageSender sender;
-
-  PreTransactionInterceptor(MessageSender sender) {
-    this.sender = sender;
-  }
-
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, message));
-  }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index b5c6859..a447489 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -32,17 +32,13 @@ import org.slf4j.LoggerFactory;
 @Aspect
 public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final PreTransactionInterceptor preTransactionInterceptor;
-  private final PostTransactionInterceptor postTransactionInterceptor;
-  private final FailedTransactionInterceptor failedTransactionInterceptor;
 
   private final OmegaContext context;
+  private final EventAwareInterceptor interceptor;
 
   public TransactionAspect(MessageSender sender, OmegaContext context) {
     this.context = context;
-    this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
-    this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
-    this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
+    this.interceptor = new CompensableInterceptor(sender);
   }
 
   @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -81,7 +77,7 @@ public class TransactionAspect {
   }
 
   private void preIntercept(ProceedingJoinPoint joinPoint, String signature, String parentTxId) {
-    preTransactionInterceptor.intercept(
+    interceptor.preIntercept(
         context.globalTxId(),
         context.newLocalTxId(),
         parentTxId,
@@ -90,7 +86,7 @@ public class TransactionAspect {
   }
 
   private void postIntercept(String signature, String parentTxId) {
-    postTransactionInterceptor.intercept(
+    interceptor.postIntercept(
         context.globalTxId(),
         context.localTxId(),
         parentTxId,
@@ -98,7 +94,7 @@ public class TransactionAspect {
   }
 
   private void interceptException(String signature, Throwable throwable, String parentTxId) {
-    failedTransactionInterceptor.intercept(
+    interceptor.onError(
         context.globalTxId(),
         context.localTxId(),
         parentTxId,
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 0777ce4..c6bbfb6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -26,4 +26,6 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Compensable {
   String compensationMethod();
+
+  int timeout() default 0;
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
similarity index 56%
rename from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
rename to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0a54e26..7505a1f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -29,7 +29,8 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-public class PreTransactionInterceptorTest {
+public class CompensableInterceptorTest {
+
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
@@ -38,11 +39,13 @@ public class PreTransactionInterceptorTest {
   private final MessageSender sender = messages::add;
 
   private final String message = uniquify("message");
-  private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender);
+  private final String compensationMethod = getClass().getCanonicalName();
+
+  private final CompensableInterceptor interceptor = new CompensableInterceptor(sender);
 
   @Test
-  public void sendsTxStartedEvent() throws Exception {
-    interceptor.intercept(globalTxId, localTxId, parentTxId, getClass().getCanonicalName(), message);
+  public void sendsTxStartedEventBefore() throws Exception {
+    interceptor.preIntercept(globalTxId, localTxId, parentTxId, compensationMethod, message);
 
     TxEvent event = messages.get(0);
 
@@ -50,7 +53,34 @@ public class PreTransactionInterceptorTest {
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
     assertThat(event.type(), is("TxStartedEvent"));
-    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+    assertThat(event.compensationMethod(), is(compensationMethod));
     assertThat(asList(event.payloads()), contains(message));
   }
+
+  @Test
+  public void sendsTxEndedEventAfter() throws Exception {
+    interceptor.postIntercept(globalTxId, localTxId, parentTxId, compensationMethod);
+
+    TxEvent event = messages.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxEndedEvent"));
+    assertThat(event.compensationMethod(), is(compensationMethod));
+    assertThat(event.payloads().length, is(0));
+  }
+
+  @Test
+  public void sendsTxAbortedEventOnError() throws Exception {
+    interceptor.onError(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+
+    TxEvent event = messages.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(localTxId));
+    assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxAbortedEvent"));
+    assertThat(event.compensationMethod(), is(compensationMethod));
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
deleted file mode 100644
index 50a9cae..0000000
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.junit.Test;
-
-public class PostTransactionInterceptorTest {
-  private final List<TxEvent> messages = new ArrayList<>();
-  private final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
-
-  private final MessageSender sender = messages::add;
-
-  private final PostTransactionInterceptor interceptor = new PostTransactionInterceptor(sender);
-
-  @Test
-  public void sendsSerializedMessage() throws Exception {
-    interceptor.intercept(globalTxId, localTxId, parentTxId, getClass().getCanonicalName());
-
-
-    TxEvent event = messages.get(0);
-
-    assertThat(event.globalTxId(), is(globalTxId));
-    assertThat(event.localTxId(), is(localTxId));
-    assertThat(event.parentTxId(), is(parentTxId));
-    assertThat(event.type(), is("TxEndedEvent"));
-    assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
-    assertThat(event.payloads().length, is(0));
-  }
-}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 65f23a7..76a0e34 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -39,7 +39,6 @@ public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
 
   private final String newLocalTxId = UUID.randomUUID().toString();
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 11/15: SCB-212 better readability

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 20a5886646be4a819d93a7431d65eca9da9c2ecb
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 14:19:05 2018 +0800

    SCB-212 better readability
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../omega/transaction/SagaStartAnnotationProcessor.java    | 14 +++++---------
 .../transaction/SagaStartAnnotationProcessorTest.java      |  2 +-
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 6e8556b..0aa8fab 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -31,15 +31,13 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   @Override
   public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    String globalTxId = globalTxId();
-    // reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
-    sender.send(new SagaStartedEvent(globalTxId, globalTxId));
+    initializeOmegaContext();
+    sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
   }
 
   @Override
   public void postIntercept(String parentTxId, String compensationMethod) {
-    String globalTxId = omegaContext.globalTxId();
-    sender.send(new SagaEndedEvent(globalTxId, globalTxId));
+    sender.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     omegaContext.clear();
   }
 
@@ -50,9 +48,7 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
     omegaContext.clear();
   }
 
-  private String globalTxId() {
-    String globalTxId = omegaContext.newGlobalTxId();
-    omegaContext.setLocalTxId(globalTxId);
-    return globalTxId;
+  private void initializeOmegaContext() {
+    omegaContext.setLocalTxId(omegaContext.newGlobalTxId());
   }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 8fa3568..e857356 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -70,8 +70,8 @@ public class SagaStartAnnotationProcessorTest {
 
   @Test
   public void sendsSagaEndedEvent() {
-    context.clear();
     context.setGlobalTxId(globalTxId);
+    context.setLocalTxId(globalTxId);
 
     sagaStartAnnotationProcessor.postIntercept(null, null);
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/15: SCB-212 replaced timeout impl with atomic to avoid locking

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 5751a11afe8fdce73e0a337baca98a5eb9c9d34f
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 22:21:58 2018 +0800

    SCB-212 replaced timeout impl with atomic to avoid locking
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../transaction/OmegaTxTimeoutException.java}      |  10 +-
 .../omega/transaction/TimeAwareInterceptor.java    |  25 ++---
 .../saga/omega/transaction/TransactionAspect.java  |  23 ++++-
 .../transaction/TimeAwareInterceptorTest.java      | 113 ++++++++++++++++++++-
 4 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
similarity index 86%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
index fb2ee1d..eb820d6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java
@@ -17,8 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
-
-public class TimeAwareInterceptorTest {
-
-}
\ No newline at end of file
+public class OmegaTxTimeoutException extends RuntimeException {
+  public OmegaTxTimeoutException(String cause) {
+    super(cause);
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index d633630..317f802 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -17,17 +17,15 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 class TimeAwareInterceptor implements EventAwareInterceptor {
   private final EventAwareInterceptor interceptor;
-  private final BlockingDeque<EventAwareInterceptor> interceptors = new LinkedBlockingDeque<>(2);
+  private final AtomicReference<EventAwareInterceptor> interceptorRef;
 
   TimeAwareInterceptor(EventAwareInterceptor interceptor) {
     this.interceptor = interceptor;
-    this.interceptors.offer(interceptor);
+    this.interceptorRef = new AtomicReference<>(interceptor);
   }
 
   @Override
@@ -37,18 +35,21 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
 
   @Override
   public void postIntercept(String localTxId, String signature) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().postIntercept(localTxId, signature);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.postIntercept(localTxId, signature);
+    }
   }
 
   @Override
   public void onError(String localTxId, String signature, Throwable throwable) {
-    interceptors.offerLast(NO_OP_INTERCEPTOR);
-    interceptors.pollFirst().onError(localTxId, signature, throwable);
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 
-  void onTimeout(String signature, String localTxId) {
-    interceptors.offerFirst(NO_OP_INTERCEPTOR);
-    interceptors.pollLast().onError(localTxId, signature, new TimeoutException());
+  void onTimeout(String localTxId, String signature, Throwable throwable) {
+    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
+      interceptor.onError(localTxId, signature, throwable);
+    }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 9bee829..d3e091c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,9 +59,8 @@ public class TransactionAspect {
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
     interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
-    if (compensable.timeout() > 0) {
-      executor.schedule(() -> interceptor.onTimeout(signature, localTxId), compensable.timeout(), MILLISECONDS);
-    }
+
+    scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
 
     try {
       Object result = joinPoint.proceed();
@@ -77,6 +76,24 @@ public class TransactionAspect {
     }
   }
 
+  private void scheduleTimeoutTask(
+      TimeAwareInterceptor interceptor,
+      String localTxId,
+      String signature,
+      Method method,
+      int timeout) {
+
+    if (timeout > 0) {
+      executor.schedule(
+          () -> interceptor.onTimeout(
+              localTxId,
+              signature,
+              new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")),
+          timeout,
+          MILLISECONDS);
+    }
+  }
+
   private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
       throws NoSuchMethodException {
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index fb2ee1d..d26f04f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -17,8 +17,119 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-import static org.junit.Assert.*;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
 
 public class TimeAwareInterceptorTest {
+  private static final int runningCounts = 1000;
+
+  private final String localTxId = uniquify("localTxId");
+  private final String signature = uniquify("signature");
+
+  private final AtomicInteger postInterceptInvoked = new AtomicInteger();
+  private final AtomicInteger onErrorInvoked = new AtomicInteger();
+  private final AtomicInteger onTimeoutInvoked = new AtomicInteger();
+
+  private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
+    @Override
+    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    }
+
+    @Override
+    public void postIntercept(String parentTxId, String compensationMethod) {
+      postInterceptInvoked.incrementAndGet();
+    }
+
+    @Override
+    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
+      if (throwable instanceof OmegaTxTimeoutException) {
+        onTimeoutInvoked.incrementAndGet();
+      } else {
+        onErrorInvoked.incrementAndGet();
+      }
+    }
+  };
+
+  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
+  private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out");
+
+
+  @Test(timeout = 5000)
+  public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception {
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.postIntercept(localTxId, signature);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  @Test(timeout = 5000)
+  public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception {
+    RuntimeException oops = new RuntimeException("oops");
+    List<Future<?>> futures = new LinkedList<>();
+
+    for (int i = 0; i < runningCounts; i++) {
+      TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying);
+      CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onError(localTxId, signature, oops);
+      }));
+
+      futures.add(executorService.submit(() -> {
+        waitForSignal(cyclicBarrier);
+        interceptor.onTimeout(localTxId, signature, timeoutException);
+      }));
+    }
+
+    waitTillAllDone(futures);
+
+    assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts));
+  }
+
+  private void waitForSignal(CyclicBarrier cyclicBarrier) {
+    try {
+      cyclicBarrier.await();
+    } catch (InterruptedException | BrokenBarrierException e) {
+      fail(e.getMessage());
+    }
+  }
 
+  private void waitTillAllDone(List<Future<?>> futures)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 15/15: SCB-212 replaced completable future with executor to avoid test failures in maven

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 160badefe6896cb9fa3bd52c615ad3ab499ade81
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 21:46:22 2018 +0800

    SCB-212 replaced completable future with executor to avoid test failures in maven
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/alpha/core/TxConsistentService.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 87cdc60..55de6b7 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -28,9 +28,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 public class TxConsistentService {
@@ -46,6 +47,7 @@ public class TxConsistentService {
   }};
 
   private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
     this.eventRepository = eventRepository;
@@ -55,7 +57,7 @@ public class TxConsistentService {
   public void handle(TxEvent event) {
     eventRepository.save(event);
 
-    CompletableFuture.runAsync(() -> {
+    executor.execute(() -> {
       if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
         omegaCallback.compensate(event);
       }
@@ -66,12 +68,12 @@ public class TxConsistentService {
 
   private void compensate(TxEvent event) {
     List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
-    events.forEach(omegaCallback::compensate);
     eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
       Set<String> eventSet = new ConcurrentSkipListSet<>();
       events.forEach(e -> eventSet.add(e.localTxId()));
       return eventSet;
     });
+    events.forEach(omegaCallback::compensate);
   }
 
   private void updateCompensateStatus(TxEvent event) {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.