You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/05 01:15:50 UTC

[incubator-servicecomb-saga] branch master updated (b862131 -> d38641a)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from b862131  SCB-138 observer maintains its own services instead of using an extra map
     new e2e63f3  SCB-100 async support for omega context
     new 22f2150  SCB-100 added context checking on reactive
     new 13bbe7b  SCB-100 ensured context passing through akka actors
     new 8428f35  SCB-100 cleaned up context after each test
     new 887b6d8  SCB-100 delegated executors to proxy
     new ece5ed6  SCB-100 more test coverage on executor proxy
     new c74d8e7  SCB-100 extended timeout to avoid test failure
     new d38641a  SCB-100 minor refactoring

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../saga/omega/context/OmegaContext.java           |  12 +-
 omega/omega-spring-tx/pom.xml                      |  30 ++++
 .../spring/CompensableAnnotationProcessor.java     |   5 +
 .../transaction/spring/ExecutorFieldCallback.java  | 167 ++++++++++++++++++++
 .../spring/annotations/OmegaContextAware.java}     |  15 +-
 .../spring/CompensableAnnotationCheckingTest.java  |  15 ++
 ...edService.java => MisconfiguredAnnotation.java} |  16 +-
 .../spring/TransactionInterceptionTest.java        | 170 ++++++++++++++++++---
 .../omega/transaction/spring/UserRepository.java   |   1 +
 9 files changed, 389 insertions(+), 42 deletions(-)
 create mode 100644 omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
 copy omega/{omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java => omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java} (72%)
 copy omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/{MisconfiguredService.java => MisconfiguredAnnotation.java} (75%)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/08: SCB-100 added context checking on reactive

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 22f2150c0ca759fa44e73db57c6ee42faf7b5db9
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:38:21 2018 +0800

    SCB-100 added context checking on reactive
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-spring-tx/pom.xml                      |  6 ++++++
 .../spring/TransactionInterceptionTest.java        | 24 ++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index 21661bb..815e080 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -76,6 +76,12 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.reactivex.rxjava2</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>2.1.8</version>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 9ace53f..6dee1d8 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -54,6 +54,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
@@ -191,6 +194,27 @@ public class TransactionInterceptionTest {
     );
   }
 
+  @Test
+  public void passesOmegaContextThroughReactiveX() throws Exception {
+    User user = new User(username, email);
+
+    Flowable.just(user)
+        .parallel()
+        .runOn(Schedulers.io())
+        .doOnNext(userService::add)
+        .sequential()
+        .subscribe();
+
+    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+        toArray(messages)
+    );
+  }
+
   private String[] toArray(List<String> messages) {
     return messages.toArray(new String[messages.size()]);
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/08: SCB-100 delegated executors to proxy

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 887b6d8f6901deeaba6577a166f30a83d949f4a2
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 17:47:05 2018 +0800

    SCB-100 delegated executors to proxy
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../spring/CompensableAnnotationProcessor.java     |   5 +
 .../transaction/spring/ExecutorFieldCallback.java  | 163 +++++++++++++++++++++
 .../spring/annotations/OmegaContextAware.java      |  30 ++++
 .../spring/CompensableAnnotationCheckingTest.java  |  15 ++
 .../spring/MisconfiguredAnnotation.java            |  29 ++++
 .../spring/TransactionInterceptionTest.java        |  57 ++++---
 6 files changed, 278 insertions(+), 21 deletions(-)

diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 9cb5b27..87f9049 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -33,6 +33,7 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
   @Override
   public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
     checkMethod(bean);
+    checkFields(bean);
     return bean;
   }
 
@@ -44,4 +45,8 @@ class CompensableAnnotationProcessor implements BeanPostProcessor {
   private void checkMethod(Object bean) {
     ReflectionUtils.doWithMethods(bean.getClass(), new CompensableMethodCheckingCallback(bean, omegaContext));
   }
+
+  private void checkFields(Object bean) {
+    ReflectionUtils.doWithFields(bean.getClass(), new ExecutorFieldCallback(bean, omegaContext));
+  }
 }
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
new file mode 100644
index 0000000..3030fca
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.ReflectionUtils.FieldCallback;
+
+class ExecutorFieldCallback implements FieldCallback {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OmegaContext omegaContext;
+  private final Object bean;
+
+  ExecutorFieldCallback(Object bean, OmegaContext omegaContext) {
+    this.omegaContext = omegaContext;
+    this.bean = bean;
+  }
+
+  @Override
+  public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
+    if (!field.isAnnotationPresent(OmegaContextAware.class)) {
+      return;
+    }
+
+    ReflectionUtils.makeAccessible(field);
+
+    Class<?> generic = field.getType();
+
+    if (!Executor.class.isAssignableFrom(generic)) {
+      throw new IllegalArgumentException(
+          "Only Executor, ExecutorService, and ScheduledExecutorService are supported for @"
+              + OmegaContextAware.class.getSimpleName());
+    }
+
+    field.set(bean, ExecutorProxy.newInstance(field.get(bean), field.getType(), omegaContext));
+  }
+
+  private static class RunnableProxy implements InvocationHandler {
+
+    private final String globalTxId;
+    private final String localTxId;
+    private final String parentTxId;
+    private final Object runnable;
+    private final OmegaContext omegaContext;
+
+    private static Object newInstance(Object runnable, OmegaContext omegaContext) {
+      RunnableProxy runnableProxy = new RunnableProxy(omegaContext, runnable);
+      return Proxy.newProxyInstance(
+          runnable.getClass().getClassLoader(),
+          runnable.getClass().getInterfaces(),
+          runnableProxy);
+    }
+
+    private RunnableProxy(OmegaContext omegaContext, Object runnable) {
+      this.omegaContext = omegaContext;
+      this.globalTxId = omegaContext.globalTxId();
+      this.localTxId = omegaContext.localTxId();
+      this.parentTxId = omegaContext.parentTxId();
+      this.runnable = runnable;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      try {
+        LOG.debug("Setting OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+            globalTxId,
+            localTxId,
+            parentTxId);
+
+        omegaContext.setGlobalTxId(globalTxId);
+        omegaContext.setLocalTxId(localTxId);
+        omegaContext.setParentTxId(parentTxId);
+
+        return method.invoke(runnable, args);
+      } finally {
+        omegaContext.clear();
+        LOG.debug("Cleared OmegaContext with globalTxId [{}], localTxId [{}], & parentTxId [{}]",
+            globalTxId,
+            localTxId,
+            parentTxId);
+      }
+    }
+  }
+
+  private static class ExecutorProxy implements InvocationHandler {
+    private final Object target;
+    private final OmegaContext omegaContext;
+
+    private ExecutorProxy(Object target, OmegaContext omegaContext) {
+      this.target = target;
+      this.omegaContext = omegaContext;
+    }
+
+    private static Object newInstance(Object target, Class<?> targetClass, OmegaContext omegaContext) {
+      Class<?>[] interfaces = targetClass.isInterface() ? new Class<?>[] {targetClass} : targetClass.getInterfaces();
+
+      return Proxy.newProxyInstance(
+          targetClass.getClassLoader(),
+          interfaces,
+          new ExecutorProxy(target, omegaContext));
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Object[] augmentedArgs = new Object[args.length];
+
+      for (int i = 0; i < args.length; i++) {
+        Object arg = args[i];
+        if (isExecutable(arg)) {
+          augmentedArgs[i] = RunnableProxy.newInstance(arg, omegaContext);
+        } else if (isCollectionOfExecutables(arg)) {
+          augmentedArgs[i] = ((Collection<?>) arg)
+              .stream()
+              .map(a -> RunnableProxy.newInstance(a, omegaContext))
+              .collect(Collectors.toList());
+        } else {
+          augmentedArgs[i] = arg;
+        }
+      }
+
+      return method.invoke(target, augmentedArgs);
+    }
+
+    private boolean isExecutable(Object arg) {
+      return arg instanceof Runnable || arg instanceof Callable;
+    }
+
+    private boolean isCollectionOfExecutables(Object arg) {
+      return arg instanceof Collection
+          && !((Collection<?>) arg).isEmpty()
+          && isExecutable(((Collection<?>) arg).iterator().next());
+    }
+  }
+}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
new file mode 100644
index 0000000..5a4e7e4
--- /dev/null
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/annotations/OmegaContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring.annotations;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+@Retention(RUNTIME)
+@Target({FIELD, METHOD})
+public @interface OmegaContextAware {
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
index 3b399b9..d2abfc9 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationCheckingTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import org.junit.Test;
@@ -39,4 +40,18 @@ public class CompensableAnnotationCheckingTest {
       assertThat(e.getCause().getMessage(), startsWith("No such compensation method [none]"));
     }
   }
+
+  @Test
+  public void blowsUpWhenAnnotationOnWrongType() throws Exception {
+    try {
+      try (ConfigurableApplicationContext ignored = new SpringApplicationBuilder(TransactionTestMain.class)
+          .profiles("omega-context-aware-checking")
+          .run()) {
+        expectFailing(BeanCreationException.class);
+      }
+    } catch (BeanCreationException e) {
+      assertThat(e.getCause().getMessage(),
+          is("Only Executor, ExecutorService, and ScheduledExecutorService are supported for @OmegaContextAware"));
+    }
+  }
 }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
new file mode 100644
index 0000000..99459a5
--- /dev/null
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/MisconfiguredAnnotation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.spring;
+
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+@Configuration
+@Profile("omega-context-aware-checking")
+class MisconfiguredAnnotation {
+  @OmegaContextAware
+  private final User user = new User();
+}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6de3d18..a2b7641 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 import static akka.actor.ActorRef.noSender;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -44,6 +44,7 @@ import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
 import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
+import org.apache.servicecomb.saga.omega.transaction.spring.annotations.OmegaContextAware;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -67,13 +68,21 @@ import akka.actor.Props;
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
-  private static final ExecutorService executor = Executors.newSingleThreadExecutor();
   private static final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
+  private final User user = new User(username, email);
+  private final User illegalUser = new User(ILLEGAL_USER, email);
+
+  private final String usernameJack = uniquify("Jack");
+  private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
+
+  @OmegaContextAware
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Autowired
   private List<String> messages;
 
@@ -108,12 +117,11 @@ public class TransactionInterceptionTest {
 
   @AfterClass
   public static void afterClass() throws Exception {
-    executor.shutdown();
   }
 
   @Test
   public void sendsUserToRemote_AroundTransaction() throws Exception {
-    User user = userService.add(new User(username, email));
+    User user = userService.add(this.user);
 
     assertArrayEquals(
         new String[]{
@@ -129,9 +137,8 @@ public class TransactionInterceptionTest {
   @Test
   public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
     Throwable throwable = null;
-    User user = new User(ILLEGAL_USER, email);
     try {
-      userService.add(user);
+      userService.add(illegalUser);
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException ignored) {
       throwable = ignored;
@@ -139,7 +146,7 @@ public class TransactionInterceptionTest {
 
     assertArrayEquals(
         new String[]{
-            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
             new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
         toArray(messages)
     );
@@ -147,11 +154,11 @@ public class TransactionInterceptionTest {
 
   @Test
   public void compensateOnTransactionException() throws Exception {
-    User user = userService.add(new User(username, email));
+    User user = userService.add(this.user);
 
     // another sub transaction to the same service within the same global transaction
     String localTxId = omegaContext.newLocalTxId();
-    User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
+    User anotherUser = userService.add(jack);
 
     messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
     messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
@@ -174,38 +181,44 @@ public class TransactionInterceptionTest {
 
   @Test
   public void passesOmegaContextThroughDifferentThreads() throws Exception {
-    User user = new User(username, email);
     new Thread(() -> userService.add(user)).start();
+    waitTillSavedUser(username);
 
-    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+    String newLocalTxId = omegaContext.newLocalTxId();
+    new Thread(() -> userService.add(jack)).start();
+    waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
 
   @Test
   public void passesOmegaContextInThreadPool() throws Exception {
-    User user = new User(username, email);
     executor.execute(() -> userService.add(user));
+    waitTillSavedUser(username);
 
-    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+    String newLocalTxId = omegaContext.newLocalTxId();
+    executor.invokeAll(singletonList(() -> userService.add(jack)));
+    waitTillSavedUser(usernameJack);
 
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
-            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
+            new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
         toArray(messages)
     );
   }
 
   @Test
   public void passesOmegaContextThroughReactiveX() throws Exception {
-    User user = new User(username, email);
-
     Flowable.just(user)
         .parallel()
         .runOn(Schedulers.io())
@@ -213,7 +226,7 @@ public class TransactionInterceptionTest {
         .sequential()
         .subscribe();
 
-    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+    waitTillSavedUser(username);
 
     assertArrayEquals(
         new String[]{
@@ -228,12 +241,10 @@ public class TransactionInterceptionTest {
     // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
     ActorSystem actorSystem = ActorSystem.create();
 
-    User user = new User(username, email);
-
     ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
     actorRef.tell(user, noSender());
 
-    await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null);
+    waitTillSavedUser(username);
 
     assertArrayEquals(
         new String[] {
@@ -245,6 +256,10 @@ public class TransactionInterceptionTest {
     actorSystem.terminate();
   }
 
+  private void waitTillSavedUser(String username) {
+    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+  }
+
   private static class UserServiceActor extends AbstractLoggingActor {
     private final TransactionalUserService userService;
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 08/08: SCB-100 minor refactoring

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit d38641aa7794c0491d953c1e6a6396c02ac2fd29
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 11:51:27 2018 +0800

    SCB-100 minor refactoring
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/transaction/spring/ExecutorFieldCallback.java        | 6 +++++-
 .../saga/omega/transaction/spring/TransactionInterceptionTest.java  | 3 ++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
index 3030fca..5ae949d 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/ExecutorFieldCallback.java
@@ -131,6 +131,10 @@ class ExecutorFieldCallback implements FieldCallback {
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      return method.invoke(target, augmentRunnablesWithOmegaContext(args));
+    }
+
+    private Object[] augmentRunnablesWithOmegaContext(Object[] args) {
       Object[] augmentedArgs = new Object[args.length];
 
       for (int i = 0; i < args.length; i++) {
@@ -147,7 +151,7 @@ class ExecutorFieldCallback implements FieldCallback {
         }
       }
 
-      return method.invoke(target, augmentedArgs);
+      return augmentedArgs;
     }
 
     private boolean isExecutable(Object arg) {
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 26b2de3..3dc8c73 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -217,6 +217,7 @@ public class TransactionInterceptionTest {
     );
   }
 
+  // TODO: 2018/1/4 reactive is not supported yet, omega context won't be updated on shared threads
   @Test
   public void passesOmegaContextThroughReactiveX() throws Exception {
     Flowable.just(user)
@@ -236,9 +237,9 @@ public class TransactionInterceptionTest {
     );
   }
 
+  // TODO: 2018/1/4 actor system is not supported yet
   @Test
   public void passesOmegaContextAmongActors() throws Exception {
-    // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
     ActorSystem actorSystem = ActorSystem.create();
 
     ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/08: SCB-100 cleaned up context after each test

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 8428f35c71501cc165428a55eeb153637c65238a
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 14:22:20 2018 +0800

    SCB-100 cleaned up context after each test
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../org/apache/servicecomb/saga/omega/context/OmegaContext.java     | 6 ++++++
 .../saga/omega/transaction/spring/TransactionInterceptionTest.java  | 1 +
 2 files changed, 7 insertions(+)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index f336c4c..94de6ef 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -77,6 +77,12 @@ public class OmegaContext {
     this.parentTxId.set(parentTxId);
   }
 
+  public void clear() {
+    globalTxId.remove();
+    localTxId.remove();
+    parentTxId.remove();
+  }
+
   public void addCompensationContext(Method compensationMethod, Object target) {
     compensationMethod.setAccessible(true);
     compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 253e636..6de3d18 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -103,6 +103,7 @@ public class TransactionInterceptionTest {
   public void tearDown() throws Exception {
     messages.clear();
     userRepository.deleteAll();
+    omegaContext.clear();
   }
 
   @AfterClass

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/08: SCB-100 more test coverage on executor proxy

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ece5ed6d1677731b5ec4bb0d7026d2502df63c98
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 09:26:51 2018 +0800

    SCB-100 more test coverage on executor proxy
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/transaction/spring/TransactionInterceptionTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index a2b7641..9597c6c 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -32,8 +32,8 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
@@ -81,7 +81,7 @@ public class TransactionInterceptionTest {
   private final User jack = new User(usernameJack, uniquify("jack@gmail.com"));
 
   @OmegaContextAware
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
   @Autowired
   private List<String> messages;
@@ -200,7 +200,7 @@ public class TransactionInterceptionTest {
 
   @Test
   public void passesOmegaContextInThreadPool() throws Exception {
-    executor.execute(() -> userService.add(user));
+    executor.schedule(() -> userService.add(user), 0, MILLISECONDS);
     waitTillSavedUser(username);
 
     String newLocalTxId = omegaContext.newLocalTxId();

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/08: SCB-100 ensured context passing through akka actors

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 13bbe7bd6b991aef6de701d988d65e06760710ca
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:54:58 2018 +0800

    SCB-100 ensured context passing through akka actors
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-spring-tx/pom.xml                      | 20 +++++++++
 .../spring/TransactionInterceptionTest.java        | 48 ++++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index 815e080..022f0ba 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -82,6 +82,26 @@
       <version>2.1.8</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 6dee1d8..253e636 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.servicecomb.saga.omega.transaction.spring;
 
+import static akka.actor.ActorRef.noSender;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -56,6 +58,10 @@ import org.springframework.test.context.junit4.SpringRunner;
 
 import io.reactivex.Flowable;
 import io.reactivex.schedulers.Schedulers;
+import akka.actor.AbstractLoggingActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@@ -96,6 +102,7 @@ public class TransactionInterceptionTest {
   @After
   public void tearDown() throws Exception {
     messages.clear();
+    userRepository.deleteAll();
   }
 
   @AfterClass
@@ -215,6 +222,47 @@ public class TransactionInterceptionTest {
     );
   }
 
+  @Test
+  public void passesOmegaContextAmongActors() throws Exception {
+    // TODO: 2018/1/3 if actor system starts before omega context initialized, this test will fail
+    ActorSystem actorSystem = ActorSystem.create();
+
+    User user = new User(username, email);
+
+    ActorRef actorRef = actorSystem.actorOf(UserServiceActor.props(userService));
+    actorRef.tell(user, noSender());
+
+    await().atMost(1, SECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+    assertArrayEquals(
+        new String[] {
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+        toArray(messages)
+    );
+
+    actorSystem.terminate();
+  }
+
+  private static class UserServiceActor extends AbstractLoggingActor {
+    private final TransactionalUserService userService;
+
+    private UserServiceActor(TransactionalUserService userService) {
+      this.userService = userService;
+    }
+
+    static Props props(TransactionalUserService userService) {
+      return Props.create(UserServiceActor.class, () -> new UserServiceActor(userService));
+    }
+
+    @Override
+    public Receive createReceive() {
+      return receiveBuilder()
+          .match(User.class, userService::add)
+          .build();
+    }
+  }
+
   private String[] toArray(List<String> messages) {
     return messages.toArray(new String[messages.size()]);
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 07/08: SCB-100 extended timeout to avoid test failure

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit c74d8e7133176deaaf930a782adb63783537a3b6
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 4 09:48:02 2018 +0800

    SCB-100 extended timeout to avoid test failure
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/transaction/spring/TransactionInterceptionTest.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 9597c6c..26b2de3 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -257,7 +257,7 @@ public class TransactionInterceptionTest {
   }
 
   private void waitTillSavedUser(String username) {
-    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+    await().atMost(1000, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
   }
 
   private static class UserServiceActor extends AbstractLoggingActor {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/08: SCB-100 async support for omega context

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e2e63f3cafe25a4ce4da26fe2a8e8dd6d94bc5da
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:25:45 2018 +0800

    SCB-100 async support for omega context
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java           |  6 +-
 omega/omega-spring-tx/pom.xml                      |  4 ++
 .../spring/TransactionInterceptionTest.java        | 71 ++++++++++++++++------
 .../omega/transaction/spring/UserRepository.java   |  1 +
 4 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 6016b53..f336c4c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -31,9 +31,9 @@ public class OmegaContext {
   public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
   public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
 
-  private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
-  private final ThreadLocal<String> localTxId = new ThreadLocal<>();
-  private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
+  private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
+  private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
+  private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
   private final IdGenerator<String> idGenerator;
   private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
 
diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index e5fc8b3..21661bb 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -72,6 +72,10 @@
       <artifactId>spring-boot-starter-data-jpa</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index dc23612..9ace53f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -19,7 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
@@ -28,8 +30,20 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,22 +54,11 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
-import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 @AutoConfigureMockMvc
 public class TransactionInterceptionTest {
-  private static final String TX_STARTED_EVENT = "TxStartedEvent";
-  private static final String TX_ENDED_EVENT = "TxEndedEvent";
+  private static final ExecutorService executor = Executors.newSingleThreadExecutor();
   private static final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
@@ -77,11 +80,14 @@ public class TransactionInterceptionTest {
   @Autowired
   private MessageHandler messageHandler;
 
+  private String compensationMethod;
+
   @Before
   public void setUp() throws Exception {
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
     omegaContext.setParentTxId(parentTxId);
+    compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
   }
 
   @After
@@ -89,12 +95,15 @@ public class TransactionInterceptionTest {
     messages.clear();
   }
 
+  @AfterClass
+  public static void afterClass() throws Exception {
+    executor.shutdown();
+  }
+
   @Test
   public void sendsUserToRemote_AroundTransaction() throws Exception {
     User user = userService.add(new User(username, email));
 
-    String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -117,8 +126,6 @@ public class TransactionInterceptionTest {
       throwable = ignored;
     }
 
-    String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
     assertArrayEquals(
         new String[]{
             new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -135,8 +142,6 @@ public class TransactionInterceptionTest {
     String localTxId = omegaContext.newLocalTxId();
     User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
 
-    String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
     messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
     messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
 
@@ -156,6 +161,36 @@ public class TransactionInterceptionTest {
     );
   }
 
+  @Test
+  public void passesOmegaContextThroughDifferentThreads() throws Exception {
+    User user = new User(username, email);
+    new Thread(() -> userService.add(user)).start();
+
+    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+        toArray(messages)
+    );
+  }
+
+  @Test
+  public void passesOmegaContextInThreadPool() throws Exception {
+    User user = new User(username, email);
+    executor.execute(() -> userService.add(user));
+
+    await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+        toArray(messages)
+    );
+  }
+
   private String[] toArray(List<String> messages) {
     return messages.toArray(new String[messages.size()]);
   }
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
index bcf3e14..729b7ab 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
@@ -20,4 +20,5 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
 import org.springframework.data.repository.CrudRepository;
 
 interface UserRepository extends CrudRepository<User, Long> {
+  User findByUsername(String username);
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.