You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/25 10:36:30 UTC
[incubator-servicecomb-saga] 06/14: SCB-96 added local and parent
tx id to keep track of entire tx graph
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 70bafcf53fd739467c0be6d7a57319f378c16566
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Dec 23 10:11:27 2017 +0800
SCB-96 added local and parent tx id to keep track of entire tx graph
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/omega/context/OmegaContext.java | 37 +++++++++++++---
.../saga/omega/context/OmegaContextTest.java | 49 +++++++++++++++++++---
.../spring/TransactionInterceptionTest.java | 28 +++++++++----
.../transaction/PreTransactionInterceptor.java | 4 +-
.../saga/omega/transaction/TransactionAspect.java | 9 +++-
.../saga/omega/transaction/TxEvent.java | 22 +++++++---
.../saga/omega/transaction/TxStartedEvent.java | 4 +-
.../transaction/PreTransactionInterceptorTest.java | 16 +++----
8 files changed, 133 insertions(+), 36 deletions(-)
diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index 2c33e41..9884349 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -18,14 +18,41 @@
package io.servicecomb.saga.omega.context;
public class OmegaContext {
- private final ThreadLocal<Long> transactionId = new ThreadLocal<>();
+ private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
+ private final ThreadLocal<String> localTxId = new ThreadLocal<>();
+ private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
- public void setTxId(long txId) {
- transactionId.set(txId);
+ public void setGlobalTxId(String txId) {
+ globalTxId.set(txId);
}
- public long txId() {
- return transactionId.get();
+ public String globalTxId() {
+ return globalTxId.get();
+ }
+
+ public void setLocalTxId(String localTxId) {
+ this.localTxId.set(localTxId);
+ }
+
+ public String localTxId() {
+ return localTxId.get();
+ }
+
+ public String parentTxId() {
+ return parentTxId.get();
+ }
+
+ public void setParentTxId(String parentTxId) {
+ this.parentTxId.set(parentTxId);
+ }
+
+ @Override
+ public String toString() {
+ return "OmegaContext{" +
+ "globalTxId=" + globalTxId.get() +
+ ", localTxId=" + localTxId.get() +
+ ", parentTxId=" + parentTxId.get() +
+ '}';
}
}
diff --git a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
index 82837d1..e890c18 100644
--- a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
+++ b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
@@ -17,10 +17,11 @@
package io.servicecomb.saga.omega.context;
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
@@ -31,15 +32,51 @@ public class OmegaContextTest {
private final OmegaContext omegaContext = new OmegaContext();
@Test
- public void eachThreadGetsDifferentId() throws Exception {
+ public void eachThreadGetsDifferentGlobalTxId() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable runnable = exceptionalRunnable(() -> {
- long txId = nextId();
- omegaContext.setTxId(txId);
+ String txId = UUID.randomUUID().toString();
+ omegaContext.setGlobalTxId(txId);
barrier.await();
- assertThat(omegaContext.txId(), is(txId));
+ assertThat(omegaContext.globalTxId(), is(txId));
+ });
+
+ CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
+ CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
+
+ CompletableFuture.allOf(future1, future2).join();
+ }
+
+ @Test
+ public void eachThreadGetsDifferentLocalTxId() throws Exception {
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Runnable runnable = exceptionalRunnable(() -> {
+ String spanId = UUID.randomUUID().toString();
+ omegaContext.setLocalTxId(spanId);
+ barrier.await();
+
+ assertThat(omegaContext.localTxId(), is(spanId));
+ });
+
+ CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
+ CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
+
+ CompletableFuture.allOf(future1, future2).join();
+ }
+
+ @Test
+ public void eachThreadGetsDifferentParentTxId() throws Exception {
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Runnable runnable = exceptionalRunnable(() -> {
+ String parentId = UUID.randomUUID().toString();
+ omegaContext.setParentTxId(parentId);
+ barrier.await();
+
+ assertThat(omegaContext.parentTxId(), is(parentId));
});
CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index e0afebb..c94a4cb 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -17,13 +17,13 @@
package io.servicecomb.saga.omega.transaction.spring;
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +42,9 @@ import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
public class TransactionInterceptionTest {
- private final long txId = nextId();
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
private final String username = uniquify("username");
private final String email = uniquify("email");
@@ -57,14 +59,16 @@ public class TransactionInterceptionTest {
@Before
public void setUp() throws Exception {
- omegaContext.setTxId(txId);
+ omegaContext.setGlobalTxId(globalTxId);
+ omegaContext.setLocalTxId(localTxId);
+ omegaContext.setParentTxId(parentTxId);
}
@Test
public void sendsUserToRemote_BeforeTransaction() throws Exception {
userService.add(new User(username, email));
- assertThat(messages, contains(serialize(txId, "TxStartedEvent", username, email)));
+ assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, "TxStartedEvent", username, email)));
}
@Configuration
@@ -86,14 +90,24 @@ public class TransactionInterceptionTest {
return event -> {
if (event.payloads()[0] instanceof User) {
User user = ((User) event.payloads()[0]);
- return serialize(event.txId(), event.type(), user.username(), user.email());
+ return serialize(event.globalTxId(),
+ event.localTxId(),
+ event.parentTxId(),
+ event.type(),
+ user.username(),
+ user.email());
}
throw new IllegalArgumentException("Expected instance of User, but was " + event.getClass());
};
}
}
- private static byte[] serialize(long txId, String eventType, String username, String email) {
- return (txId + ":" + eventType + ":" + username + ":" + email).getBytes();
+ private static byte[] serialize(String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String eventType,
+ String username,
+ String email) {
+ return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + eventType + ":" + username + ":" + email).getBytes();
}
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
index 1934280..951d21f 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
@@ -26,7 +26,7 @@ class PreTransactionInterceptor {
this.serializer = serializer;
}
- void intercept(long txId, Object... message) {
- sender.send(serializer.serialize(new TxStartedEvent(txId, message)));
+ void intercept(String globalTxId, String localTxId, String parentTxId, Object... message) {
+ sender.send(serializer.serialize(new TxStartedEvent(globalTxId, localTxId, parentTxId, message)));
}
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index 1701b98..cafc61a 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -43,9 +43,14 @@ public class TransactionAspect {
@Around("execution(@javax.transaction.Transactional * *(..))")
Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- LOG.debug("Intercepting transactional method {}", method.toString());
+ LOG.debug("Intercepting transactional method {} with context {}", method.toString(), context);
+
+ preTransactionInterceptor.intercept(
+ context.globalTxId(),
+ context.localTxId(),
+ context.parentTxId(),
+ joinPoint.getArgs());
- preTransactionInterceptor.intercept(context.txId(), joinPoint.getArgs());
return joinPoint.proceed();
}
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
index 5347c0d..e011aeb 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
@@ -18,16 +18,28 @@
package io.servicecomb.saga.omega.transaction;
public abstract class TxEvent {
- private final long txId;
+ private final String globalTxId;
+ private final String localTxId;
+ private final String parentTxId;
private final Object[] payloads;
- TxEvent(Object[] payloads, long txId) {
+ TxEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) {
+ this.localTxId = localTxId;
+ this.parentTxId = parentTxId;
this.payloads = payloads;
- this.txId = txId;
+ this.globalTxId = globalTxId;
}
- public long txId() {
- return txId;
+ public String globalTxId() {
+ return globalTxId;
+ }
+
+ public String localTxId() {
+ return localTxId;
+ }
+
+ public String parentTxId() {
+ return parentTxId;
}
public Object[] payloads() {
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 2839afa..525da10 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -19,8 +19,8 @@ package io.servicecomb.saga.omega.transaction;
class TxStartedEvent extends TxEvent {
- TxStartedEvent(long txId, Object[] payloads) {
- super(payloads, txId);
+ TxStartedEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) {
+ super(globalTxId, localTxId, parentTxId, payloads);
}
@Override
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
index b20cd3c..7d335ad 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
@@ -17,25 +17,27 @@
package io.servicecomb.saga.omega.transaction;
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.junit.Test;
public class PreTransactionInterceptorTest {
private final List<byte[]> messages = new ArrayList<>();
- private final long txId = nextId();
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
private final MessageSender sender = messages::add;
private final MessageSerializer serializer = event -> {
if (event.payloads()[0] instanceof String) {
String message = (String) event.payloads()[0];
- return serialize(txId, message);
+ return serialize(globalTxId, localTxId, parentTxId, message);
}
throw new IllegalArgumentException("Expected instance of String, but was " + event.getClass());
};
@@ -43,14 +45,14 @@ public class PreTransactionInterceptorTest {
private final String message = uniquify("message");
private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender, serializer);
- private byte[] serialize(long txId, String message) {
- return (txId + ":" + message).getBytes();
+ private byte[] serialize(String globalTxId, String localTxId, String parentTxId, String message) {
+ return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + message).getBytes();
}
@Test
public void sendsSerializedMessage() throws Exception {
- interceptor.intercept(txId, message);
+ interceptor.intercept(globalTxId, localTxId, parentTxId, message);
- assertThat(messages, contains(serialize(txId, message)));
+ assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, message)));
}
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.