You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/27 11:30:39 UTC

[servicecomb-pack] 01/05: SCB-1391 Add support for explicit tx context passing for TCC

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 6d1fe2217644994eca1febf4d1bf5a01e40fb1dd
Author: Daniel Qian <ch...@gmail.com>
AuthorDate: Wed Jul 24 10:27:09 2019 +0800

    SCB-1391 Add support for explicit tx context passing for TCC
---
 .../pack/omega/transaction/TransactionAspect.java  | 32 ++-------
 .../omega/transaction/TransactionContextUtils.java | 40 +++++++++++
 .../transaction/tcc/TccParticipatorAspect.java     | 11 +++
 .../omega/transaction/TransactionAspectTest.java   | 72 ++++++-------------
 .../transaction/TransactionContextUtilsTest.java   | 83 ++++++++++++++++++++++
 .../transaction/tcc/TccParticipatorAspectTest.java | 30 +++++++-
 6 files changed, 191 insertions(+), 77 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
index 26e2993..34275a6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Method;
 
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 import org.apache.servicecomb.pack.omega.context.TransactionContext;
-import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
 import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
@@ -31,6 +30,9 @@ import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+
 @Aspect
 public class TransactionAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -49,17 +51,9 @@ public class TransactionAspect {
   Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
     // just check if we need to setup the transaction context information first
-    TransactionContext transactionContext = getTransactionContextFromArgs(joinPoint.getArgs());
+    TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
     if (transactionContext != null) {
-      if (context.globalTxId() != null) {
-        LOG.warn("The context {}'s globalTxId is not empty. Update it for globalTxId:{} and localTxId:{}", context,
-            transactionContext.globalTxId(), transactionContext.localTxId());
-      } else {
-        LOG.debug("Updated context {} for globalTxId:{} and localTxId:{}", context,
-            transactionContext.globalTxId(), transactionContext.localTxId());
-      }
-      context.setGlobalTxId(transactionContext.globalTxId());
-      context.setLocalTxId(transactionContext.localTxId());
+      populateOmegaContext(context, transactionContext, LOG);
     }
 
     String localTxId = context.localTxId();
@@ -76,19 +70,5 @@ public class TransactionAspect {
     }
   }
 
-  TransactionContext getTransactionContextFromArgs(Object[] args) {
-    if (args != null) {
-      for (Object arg : args) {
-        // check the TransactionContext first
-        if (arg instanceof TransactionContext) {
-          return (TransactionContext) arg;
-        }
-        if (arg instanceof TransactionContextProperties) {
-          TransactionContextProperties transactionContextProperties = (TransactionContextProperties) arg;
-          return new TransactionContext(transactionContextProperties.getGlobalTxId(), transactionContextProperties.getLocalTxId());
-        }
-      }
-    }
-    return null;
-  }
+
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java
new file mode 100644
index 0000000..b7c74f6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java
@@ -0,0 +1,40 @@
+package org.apache.servicecomb.pack.omega.transaction;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
+import org.slf4j.Logger;
+
+public abstract class TransactionContextUtils { // static helpers statically imported by TransactionAspect and TccParticipatorAspect
+  private TransactionContextUtils() {
+    // not a singleton: this class only holds static helpers and is never instantiated
+  }
+  public static TransactionContext extractTransactionContext(Object[] args) { // returns null when no argument carries a tx context
+    if (args != null && args.length > 0) {
+      for (Object arg : args) {
+        // the first matching argument wins; a TransactionContext is handed back as-is
+        if (arg instanceof TransactionContext) {
+          return (TransactionContext) arg;
+        }
+        if (arg instanceof TransactionContextProperties) { // otherwise build a new context from the ids the argument exposes
+          TransactionContextProperties transactionContextProperties = (TransactionContextProperties) arg;
+          return new TransactionContext(transactionContextProperties.getGlobalTxId(), transactionContextProperties.getLocalTxId());
+        }
+      }
+    }
+    return null;
+  }
+
+  public static void populateOmegaContext(OmegaContext context, TransactionContext transactionContext, Logger logger) { // copies both ids into context
+    if (context.globalTxId() != null) { // an existing globalTxId is only reported (warn), it does not prevent the update below
+      logger.warn("The context {}'s globalTxId is not empty. Update it for globalTxId:{} and localTxId:{}", context,
+        transactionContext.globalTxId(), transactionContext.localTxId());
+    } else {
+      logger.debug("Updated context {} for globalTxId:{} and localTxId:{}", context,
+        transactionContext.globalTxId(), transactionContext.localTxId());
+    }
+    context.setGlobalTxId(transactionContext.globalTxId()); // unconditional overwrite, regardless of which branch logged
+    context.setLocalTxId(transactionContext.localTxId());
+  }
+
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
index de05465..c9573c2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 import org.apache.servicecomb.pack.common.TransactionStatus;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
 import org.apache.servicecomb.pack.omega.transaction.annotations.Participate;
 import org.apache.servicecomb.pack.omega.transaction.tcc.events.ParticipationEndedEvent;
 import org.apache.servicecomb.pack.omega.transaction.tcc.events.ParticipationStartedEvent;
@@ -31,6 +32,9 @@ import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+
 @Aspect
 public class TccParticipatorAspect {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,6 +56,11 @@ public class TccParticipatorAspect {
   @Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)")
   Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
+    if (transactionContext != null) {
+      populateOmegaContext(context, transactionContext, LOG);
+    }
+
     String localTxId = context.localTxId();
     String cancelMethod = callbackMethodSignature(joinPoint, participate.cancelMethod(), method);
     String confirmMethod = callbackMethodSignature(joinPoint, participate.confirmMethod(), method);
@@ -88,4 +97,6 @@ public class TccParticipatorAspect {
         .getDeclaredMethod(callbackMethod, tryMethod.getParameterTypes())
         .toString();
   }
+
+
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index fa23fea..39cc931 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -17,39 +17,35 @@
 
 package org.apache.servicecomb.pack.omega.transaction;
 
-import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static org.hamcrest.core.AnyOf.anyOf;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
-import java.security.Permission;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.omega.context.IdGenerator;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
-import org.apache.servicecomb.pack.omega.context.TransactionContext;
 import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
 import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.security.Permission;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TransactionAspectTest {
 
   private final List<TxEvent> messages = new ArrayList<>();
@@ -57,7 +53,7 @@ public class TransactionAspectTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String newLocalTxId = UUID.randomUUID().toString();
 
-  private final String transactionGloablTxId = UUID.randomUUID().toString();
+  private final String transactionGlobalTxId = UUID.randomUUID().toString();
   private final String transactionLocalTxId = UUID.randomUUID().toString();
 
   private final SagaMessageSender sender = new SagaMessageSender() {
@@ -94,7 +90,6 @@ public class TransactionAspectTest {
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
 
-  private final TransactionContext tx = mock(TransactionContext.class);
   private final TransactionContextProperties transactionContextProperties = mock(TransactionContextProperties.class);
 
   @Before
@@ -110,31 +105,8 @@ public class TransactionAspectTest {
     omegaContext.setGlobalTxId(globalTxId);
     omegaContext.setLocalTxId(localTxId);
 
-    when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGloablTxId);
+    when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
     when(transactionContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
-    when(tx.globalTxId()).thenReturn(transactionGloablTxId);
-    when(tx.localTxId()).thenReturn(transactionLocalTxId);
-  }
-
-  @Test
-  public void testGetTransactionContextFromArgs() throws Throwable {
-
-    TransactionContext result = aspect.getTransactionContextFromArgs(new Object[]{transactionContextProperties});
-    assertThat(result.globalTxId(), is(transactionGloablTxId));
-    assertThat(result.localTxId(), is(transactionLocalTxId));
-
-    result = aspect.getTransactionContextFromArgs(new Object[]{});
-    assertNull(result);
-
-    result = aspect.getTransactionContextFromArgs(null);
-    assertNull(result);
-
-    result = aspect.getTransactionContextFromArgs(new Object[]{tx});
-    assertThat(result, is(tx));
-
-    TransactionContext otherTx = Mockito.mock(TransactionContext.class);
-    result = aspect.getTransactionContextFromArgs(new Object[]{otherTx, transactionContextProperties});
-    assertThat(result, is(otherTx));
   }
 
   @Test
@@ -144,7 +116,7 @@ public class TransactionAspectTest {
     aspect.advise(joinPoint, compensable);
     TxEvent startedEvent = messages.get(0);
 
-    assertThat(startedEvent.globalTxId(), is(transactionGloablTxId));
+    assertThat(startedEvent.globalTxId(), is(transactionGlobalTxId));
     assertThat(startedEvent.localTxId(), is(newLocalTxId));
     assertThat(startedEvent.parentTxId(), is(transactionLocalTxId));
     assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
@@ -153,12 +125,12 @@ public class TransactionAspectTest {
 
     TxEvent endedEvent = messages.get(1);
 
-    assertThat(endedEvent.globalTxId(), is(transactionGloablTxId));
+    assertThat(endedEvent.globalTxId(), is(transactionGlobalTxId));
     assertThat(endedEvent.localTxId(), is(newLocalTxId));
     assertThat(endedEvent.parentTxId(), is(transactionLocalTxId));
     assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
 
-    assertThat(omegaContext.globalTxId(), is(transactionGloablTxId));
+    assertThat(omegaContext.globalTxId(), is(transactionGlobalTxId));
     assertThat(omegaContext.localTxId(), is(transactionLocalTxId));
   }
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java
new file mode 100644
index 0000000..2a27dd1
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java
@@ -0,0 +1,83 @@
+package org.apache.servicecomb.pack.omega.transaction;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+
+import java.util.UUID;
+
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TransactionContextUtilsTest { // unit tests for the static helpers in TransactionContextUtils
+
+  private final Logger logger = mock(Logger.class); // mocked; no log output is asserted in this class
+
+  private final String transactionGlobalTxId = UUID.randomUUID().toString();
+  private final String transactionLocalTxId = UUID.randomUUID().toString();
+
+  private final TransactionContext txContext = new TransactionContext(transactionGlobalTxId, transactionLocalTxId);
+  private final TransactionContextProperties txContextProperties = mock(TransactionContextProperties.class);
+
+  @Before
+  public void setUp() { // stub the mocked properties so they report the same ids as txContext
+    when(txContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
+    when(txContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
+  }
+
+  @Test
+  public void testExtractTransactionContext() {
+
+    TransactionContext result = extractTransactionContext(new Object[] { txContextProperties }); // properties argument is converted
+    assertThat(result.globalTxId(), is(transactionGlobalTxId));
+    assertThat(result.localTxId(), is(transactionLocalTxId));
+
+    result = extractTransactionContext(new Object[] {}); // empty argument list yields null
+    assertNull(result);
+
+    result = extractTransactionContext(null); // null argument array yields null
+    assertNull(result);
+
+    result = extractTransactionContext(new Object[] { txContext }); // a TransactionContext argument is returned as-is
+    assertThat(result, is(txContext));
+
+    TransactionContext otherTx = Mockito.mock(TransactionContext.class);
+    result = extractTransactionContext(new Object[] { otherTx, txContextProperties }); // the earlier matching argument wins
+    assertThat(result, is(otherTx));
+  }
+
+  @Test
+  public void testPopulateOmegaContextWhenItsEmpty() {
+
+    OmegaContext omegaContext = new OmegaContext(null); // null constructor argument; presumably unused on this path - TODO confirm
+
+    populateOmegaContext(omegaContext, txContext, logger);
+
+    assertEquals(transactionGlobalTxId, omegaContext.globalTxId());
+    assertEquals(transactionLocalTxId, omegaContext.localTxId());
+
+  }
+
+  @Test
+  public void testPopulateOmegaContextWhenItsNotEmpty() { // ids already present in the context must be overwritten
+    OmegaContext omegaContext = new OmegaContext(null);
+
+    omegaContext.setGlobalTxId("global-tx-id");
+    omegaContext.setLocalTxId("local-tx-id");
+
+    populateOmegaContext(omegaContext, txContext, logger);
+
+    assertEquals(transactionGlobalTxId, omegaContext.globalTxId());
+    assertEquals(transactionLocalTxId, omegaContext.localTxId());
+
+  }
+
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
index 3c510a0..fc7cb20 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -26,9 +26,10 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import org.apache.servicecomb.pack.common.TransactionStatus;
+
 import org.apache.servicecomb.pack.omega.context.IdGenerator;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
 import org.apache.servicecomb.pack.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.pack.omega.transaction.annotations.Participate;
 import org.apache.servicecomb.pack.omega.transaction.tcc.events.*;
@@ -50,6 +51,11 @@ public class TccParticipatorAspectTest {
   private final AlphaResponse response = new AlphaResponse(false);
   private String confirmMethod;
   private String cancelMethod;
+
+  private final String transactionGlobalTxId = UUID.randomUUID().toString();
+  private final String transactionLocalTxId = UUID.randomUUID().toString();
+  private final TransactionContextProperties transactionContextProperties = mock(TransactionContextProperties.class);
+
   private final TccMessageSender tccMessageSender = new TccMessageSender() {
     @Override
     public void onConnected() {
@@ -125,6 +131,28 @@ public class TccParticipatorAspectTest {
 
     confirmMethod = TccParticipatorAspectTest.class.getDeclaredMethod("confirmMethod").toString();
     cancelMethod = TccParticipatorAspectTest.class.getDeclaredMethod("cancelMethod").toString();
+
+    when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
+    when(transactionContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
+  }
+
+  @Test
+  public void injectTransactionContextExplicitly() throws Throwable {
+
+    when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});
+
+    aspect.advise(joinPoint, participate);
+
+    assertThat(participationStartedEvents.size(), is(1));
+    ParticipationStartedEvent participationStartedEvent = participationStartedEvents.get(0);
+
+    assertThat(participationStartedEvent.getGlobalTxId(), is(transactionGlobalTxId));
+    assertThat(participationStartedEvent.getParentTxId(), is(transactionLocalTxId));
+    assertThat(participationStartedEvent.getLocalTxId(), is(newLocalTxId));
+
+    assertThat(omegaContext.globalTxId(), is(transactionGlobalTxId));
+    assertThat(omegaContext.localTxId(), is(transactionLocalTxId));
+
   }
 
   @Test