You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/25 10:36:30 UTC

[incubator-servicecomb-saga] 06/14: SCB-96 added local and parent tx id to keep track of entire tx graph

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 70bafcf53fd739467c0be6d7a57319f378c16566
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Dec 23 10:11:27 2017 +0800

    SCB-96 added local and parent tx id to keep track of entire tx graph
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java           | 37 +++++++++++++---
 .../saga/omega/context/OmegaContextTest.java       | 49 +++++++++++++++++++---
 .../spring/TransactionInterceptionTest.java        | 28 +++++++++----
 .../transaction/PreTransactionInterceptor.java     |  4 +-
 .../saga/omega/transaction/TransactionAspect.java  |  9 +++-
 .../saga/omega/transaction/TxEvent.java            | 22 +++++++---
 .../saga/omega/transaction/TxStartedEvent.java     |  4 +-
 .../transaction/PreTransactionInterceptorTest.java | 16 +++----
 8 files changed, 133 insertions(+), 36 deletions(-)

diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index 2c33e41..9884349 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -18,14 +18,41 @@
 package io.servicecomb.saga.omega.context;
 
 public class OmegaContext {
-  private final ThreadLocal<Long> transactionId = new ThreadLocal<>();
+  private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
+  private final ThreadLocal<String> localTxId = new ThreadLocal<>();
+  private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
 
 
-  public void setTxId(long txId) {
-    transactionId.set(txId);
+  public void setGlobalTxId(String txId) {
+    globalTxId.set(txId);
   }
 
-  public long txId() {
-    return transactionId.get();
+  public String globalTxId() {
+    return globalTxId.get();
+  }
+
+  public void setLocalTxId(String localTxId) {
+    this.localTxId.set(localTxId);
+  }
+
+  public String localTxId() {
+    return localTxId.get();
+  }
+
+  public String parentTxId() {
+    return parentTxId.get();
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId.set(parentTxId);
+  }
+
+  @Override
+  public String toString() {
+    return "OmegaContext{" +
+        "globalTxId=" + globalTxId.get() +
+        ", localTxId=" + localTxId.get() +
+        ", parentTxId=" + parentTxId.get() +
+        '}';
   }
 }
diff --git a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
index 82837d1..e890c18 100644
--- a/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
+++ b/omega/omega-context/src/test/java/io/servicecomb/saga/omega/context/OmegaContextTest.java
@@ -17,10 +17,11 @@
 
 package io.servicecomb.saga.omega.context;
 
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CyclicBarrier;
 
@@ -31,15 +32,51 @@ public class OmegaContextTest {
   private final OmegaContext omegaContext = new OmegaContext();
 
   @Test
-  public void eachThreadGetsDifferentId() throws Exception {
+  public void eachThreadGetsDifferentGlobalTxId() throws Exception {
     CyclicBarrier barrier = new CyclicBarrier(2);
 
     Runnable runnable = exceptionalRunnable(() -> {
-      long txId = nextId();
-      omegaContext.setTxId(txId);
+      String txId = UUID.randomUUID().toString();
+      omegaContext.setGlobalTxId(txId);
       barrier.await();
 
-      assertThat(omegaContext.txId(), is(txId));
+      assertThat(omegaContext.globalTxId(), is(txId));
+    });
+
+    CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
+    CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
+
+    CompletableFuture.allOf(future1, future2).join();
+  }
+
+  @Test
+  public void eachThreadGetsDifferentLocalTxId() throws Exception {
+    CyclicBarrier barrier = new CyclicBarrier(2);
+
+    Runnable runnable = exceptionalRunnable(() -> {
+      String spanId = UUID.randomUUID().toString();
+      omegaContext.setLocalTxId(spanId);
+      barrier.await();
+
+      assertThat(omegaContext.localTxId(), is(spanId));
+    });
+
+    CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
+    CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
+
+    CompletableFuture.allOf(future1, future2).join();
+  }
+
+  @Test
+  public void eachThreadGetsDifferentParentTxId() throws Exception {
+    CyclicBarrier barrier = new CyclicBarrier(2);
+
+    Runnable runnable = exceptionalRunnable(() -> {
+      String parentId = UUID.randomUUID().toString();
+      omegaContext.setParentTxId(parentId);
+      barrier.await();
+
+      assertThat(omegaContext.parentTxId(), is(parentId));
     });
 
     CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index e0afebb..c94a4cb 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -17,13 +17,13 @@
 
 package io.servicecomb.saga.omega.transaction.spring;
 
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -42,7 +42,9 @@ import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
 public class TransactionInterceptionTest {
-  private final long txId = nextId();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
   private final String username = uniquify("username");
   private final String email = uniquify("email");
 
@@ -57,14 +59,16 @@ public class TransactionInterceptionTest {
 
   @Before
   public void setUp() throws Exception {
-    omegaContext.setTxId(txId);
+    omegaContext.setGlobalTxId(globalTxId);
+    omegaContext.setLocalTxId(localTxId);
+    omegaContext.setParentTxId(parentTxId);
   }
 
   @Test
   public void sendsUserToRemote_BeforeTransaction() throws Exception {
     userService.add(new User(username, email));
 
-    assertThat(messages, contains(serialize(txId, "TxStartedEvent", username, email)));
+    assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, "TxStartedEvent", username, email)));
   }
 
   @Configuration
@@ -86,14 +90,24 @@ public class TransactionInterceptionTest {
       return event -> {
         if (event.payloads()[0] instanceof User) {
           User user = ((User) event.payloads()[0]);
-          return serialize(event.txId(), event.type(), user.username(), user.email());
+          return serialize(event.globalTxId(),
+              event.localTxId(),
+              event.parentTxId(),
+              event.type(),
+              user.username(),
+              user.email());
         }
         throw new IllegalArgumentException("Expected instance of User, but was " + event.getClass());
       };
     }
   }
 
-  private static byte[] serialize(long txId, String eventType, String username, String email) {
-    return (txId + ":" + eventType + ":" + username + ":" + email).getBytes();
+  private static byte[] serialize(String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String eventType,
+      String username,
+      String email) {
+    return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + eventType + ":" + username + ":" + email).getBytes();
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
index 1934280..951d21f 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptor.java
@@ -26,7 +26,7 @@ class PreTransactionInterceptor {
     this.serializer = serializer;
   }
 
-  void intercept(long txId, Object... message) {
-    sender.send(serializer.serialize(new TxStartedEvent(txId, message)));
+  void intercept(String globalTxId, String localTxId, String parentTxId, Object... message) {
+    sender.send(serializer.serialize(new TxStartedEvent(globalTxId, localTxId, parentTxId, message)));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index 1701b98..cafc61a 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -43,9 +43,14 @@ public class TransactionAspect {
   @Around("execution(@javax.transaction.Transactional * *(..))")
   Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-    LOG.debug("Intercepting transactional method {}", method.toString());
+    LOG.debug("Intercepting transactional method {} with context {}", method.toString(), context);
+
+    preTransactionInterceptor.intercept(
+        context.globalTxId(),
+        context.localTxId(),
+        context.parentTxId(),
+        joinPoint.getArgs());
 
-    preTransactionInterceptor.intercept(context.txId(), joinPoint.getArgs());
     return joinPoint.proceed();
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
index 5347c0d..e011aeb 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
@@ -18,16 +18,28 @@
 package io.servicecomb.saga.omega.transaction;
 
 public abstract class TxEvent {
-  private final long txId;
+  private final String globalTxId;
+  private final String localTxId;
+  private final String parentTxId;
   private final Object[] payloads;
 
-  TxEvent(Object[] payloads, long txId) {
+  TxEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) {
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
     this.payloads = payloads;
-    this.txId = txId;
+    this.globalTxId = globalTxId;
   }
 
-  public long txId() {
-    return txId;
+  public String globalTxId() {
+    return globalTxId;
+  }
+
+  public String localTxId() {
+    return localTxId;
+  }
+
+  public String parentTxId() {
+    return parentTxId;
   }
 
   public Object[] payloads() {
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 2839afa..525da10 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -19,8 +19,8 @@ package io.servicecomb.saga.omega.transaction;
 
 class TxStartedEvent extends TxEvent {
 
-  TxStartedEvent(long txId, Object[] payloads) {
-    super(payloads, txId);
+  TxStartedEvent(String globalTxId, String localTxId, String parentTxId, Object[] payloads) {
+    super(globalTxId, localTxId, parentTxId, payloads);
   }
 
   @Override
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
index b20cd3c..7d335ad 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
@@ -17,25 +17,27 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-import static com.seanyinx.github.unit.scaffolding.Randomness.nextId;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.junit.Test;
 
 public class PreTransactionInterceptorTest {
   private final List<byte[]> messages = new ArrayList<>();
-  private final long txId = nextId();
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
   private final MessageSerializer serializer = event -> {
     if (event.payloads()[0] instanceof String) {
       String message = (String) event.payloads()[0];
-      return serialize(txId, message);
+      return serialize(globalTxId, localTxId, parentTxId, message);
     }
     throw new IllegalArgumentException("Expected instance of String, but was " + event.getClass());
   };
@@ -43,14 +45,14 @@ public class PreTransactionInterceptorTest {
   private final String message = uniquify("message");
   private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender, serializer);
 
-  private byte[] serialize(long txId, String message) {
-    return (txId + ":" + message).getBytes();
+  private byte[] serialize(String globalTxId, String localTxId, String parentTxId, String message) {
+    return (globalTxId + ":" + localTxId + ":" + parentTxId + ":" + message).getBytes();
   }
 
   @Test
   public void sendsSerializedMessage() throws Exception {
-    interceptor.intercept(txId, message);
+    interceptor.intercept(globalTxId, localTxId, parentTxId, message);
 
-    assertThat(messages, contains(serialize(txId, message)));
+    assertThat(messages, contains(serialize(globalTxId, localTxId, parentTxId, message)));
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.