You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/05 01:15:51 UTC
[incubator-servicecomb-saga] 01/08: SCB-100 async support for omega
context
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e2e63f3cafe25a4ce4da26fe2a8e8dd6d94bc5da
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 3 11:25:45 2018 +0800
SCB-100 async support for omega context
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/context/OmegaContext.java | 6 +-
omega/omega-spring-tx/pom.xml | 4 ++
.../spring/TransactionInterceptionTest.java | 71 ++++++++++++++++------
.../omega/transaction/spring/UserRepository.java | 1 +
4 files changed, 61 insertions(+), 21 deletions(-)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
index 6016b53..f336c4c 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/OmegaContext.java
@@ -31,9 +31,9 @@ public class OmegaContext {
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
- private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
- private final ThreadLocal<String> localTxId = new ThreadLocal<>();
- private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
+ private final ThreadLocal<String> globalTxId = new InheritableThreadLocal<>();
+ private final ThreadLocal<String> localTxId = new InheritableThreadLocal<>();
+ private final ThreadLocal<String> parentTxId = new InheritableThreadLocal<>();
private final IdGenerator<String> idGenerator;
private final Map<String, CompensationContext> compensationContexts = new HashMap<>();
diff --git a/omega/omega-spring-tx/pom.xml b/omega/omega-spring-tx/pom.xml
index e5fc8b3..21661bb 100644
--- a/omega/omega-spring-tx/pom.xml
+++ b/omega/omega-spring-tx/pom.xml
@@ -72,6 +72,10 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index dc23612..9ace53f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -19,7 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
@@ -28,8 +30,20 @@ import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,22 +54,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxCompensatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
-import org.apache.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
-
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
public class TransactionInterceptionTest {
- private static final String TX_STARTED_EVENT = "TxStartedEvent";
- private static final String TX_ENDED_EVENT = "TxEndedEvent";
+ private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private static final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
@@ -77,11 +80,14 @@ public class TransactionInterceptionTest {
@Autowired
private MessageHandler messageHandler;
+ private String compensationMethod;
+
@Before
public void setUp() throws Exception {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
omegaContext.setParentTxId(parentTxId);
+ compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
}
@After
@@ -89,12 +95,15 @@ public class TransactionInterceptionTest {
messages.clear();
}
+ @AfterClass
+ public static void afterClass() throws Exception {
+ executor.shutdown();
+ }
+
@Test
public void sendsUserToRemote_AroundTransaction() throws Exception {
User user = userService.add(new User(username, email));
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -117,8 +126,6 @@ public class TransactionInterceptionTest {
throwable = ignored;
}
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
@@ -135,8 +142,6 @@ public class TransactionInterceptionTest {
String localTxId = omegaContext.newLocalTxId();
User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
- String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
-
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
@@ -156,6 +161,36 @@ public class TransactionInterceptionTest {
);
}
+ @Test
+ public void passesOmegaContextThroughDifferentThreads() throws Exception {
+ User user = new User(username, email);
+ new Thread(() -> userService.add(user)).start();
+
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[]{
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+ }
+
+ @Test
+ public void passesOmegaContextInThreadPool() throws Exception {
+ User user = new User(username, email);
+ executor.execute(() -> userService.add(user));
+
+ await().atMost(500, MILLISECONDS).until(() -> userRepository.findByUsername(username) != null);
+
+ assertArrayEquals(
+ new String[]{
+ new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ toArray(messages)
+ );
+ }
+
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
index bcf3e14..729b7ab 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/UserRepository.java
@@ -20,4 +20,5 @@ package org.apache.servicecomb.saga.omega.transaction.spring;
import org.springframework.data.repository.CrudRepository;
interface UserRepository extends CrudRepository<User, Long> {
+ User findByUsername(String username);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.