You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/27 11:30:39 UTC
[servicecomb-pack] 01/05: SCB-1391 Add support for explicit tx context passing for TCC
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 6d1fe2217644994eca1febf4d1bf5a01e40fb1dd
Author: Daniel Qian <ch...@gmail.com>
AuthorDate: Wed Jul 24 10:27:09 2019 +0800
SCB-1391 Add support for explicit tx context passing for TCC
---
.../pack/omega/transaction/TransactionAspect.java | 32 ++-------
.../omega/transaction/TransactionContextUtils.java | 40 +++++++++++
.../transaction/tcc/TccParticipatorAspect.java | 11 +++
.../omega/transaction/TransactionAspectTest.java | 72 ++++++-------------
.../transaction/TransactionContextUtilsTest.java | 83 ++++++++++++++++++++++
.../transaction/tcc/TccParticipatorAspectTest.java | 30 +++++++-
6 files changed, 191 insertions(+), 77 deletions(-)
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
index 26e2993..34275a6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Method;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
import org.apache.servicecomb.pack.omega.context.TransactionContext;
-import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@@ -31,6 +30,9 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+
@Aspect
public class TransactionAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -49,17 +51,9 @@ public class TransactionAspect {
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// just check if we need to setup the transaction context information first
- TransactionContext transactionContext = getTransactionContextFromArgs(joinPoint.getArgs());
+ TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
if (transactionContext != null) {
- if (context.globalTxId() != null) {
- LOG.warn("The context {}'s globalTxId is not empty. Update it for globalTxId:{} and localTxId:{}", context,
- transactionContext.globalTxId(), transactionContext.localTxId());
- } else {
- LOG.debug("Updated context {} for globalTxId:{} and localTxId:{}", context,
- transactionContext.globalTxId(), transactionContext.localTxId());
- }
- context.setGlobalTxId(transactionContext.globalTxId());
- context.setLocalTxId(transactionContext.localTxId());
+ populateOmegaContext(context, transactionContext, LOG);
}
String localTxId = context.localTxId();
@@ -76,19 +70,5 @@ public class TransactionAspect {
}
}
- TransactionContext getTransactionContextFromArgs(Object[] args) {
- if (args != null) {
- for (Object arg : args) {
- // check the TransactionContext first
- if (arg instanceof TransactionContext) {
- return (TransactionContext) arg;
- }
- if (arg instanceof TransactionContextProperties) {
- TransactionContextProperties transactionContextProperties = (TransactionContextProperties) arg;
- return new TransactionContext(transactionContextProperties.getGlobalTxId(), transactionContextProperties.getLocalTxId());
- }
- }
- }
- return null;
- }
+
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java
new file mode 100644
index 0000000..b7c74f6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtils.java
@@ -0,0 +1,40 @@
+package org.apache.servicecomb.pack.omega.transaction;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
+import org.slf4j.Logger;
+
+public abstract class TransactionContextUtils {
+ private TransactionContextUtils() {
+ // utility class with static methods only; not meant to be instantiated
+ }
+ public static TransactionContext extractTransactionContext(Object[] args) {
+ if (args != null && args.length > 0) {
+ for (Object arg : args) {
+ // check the TransactionContext first
+ if (arg instanceof TransactionContext) {
+ return (TransactionContext) arg;
+ }
+ if (arg instanceof TransactionContextProperties) {
+ TransactionContextProperties transactionContextProperties = (TransactionContextProperties) arg;
+ return new TransactionContext(transactionContextProperties.getGlobalTxId(), transactionContextProperties.getLocalTxId());
+ }
+ }
+ }
+ return null;
+ }
+
+ public static void populateOmegaContext(OmegaContext context, TransactionContext transactionContext, Logger logger) {
+ if (context.globalTxId() != null) {
+ logger.warn("The context {}'s globalTxId is not empty. Update it for globalTxId:{} and localTxId:{}", context,
+ transactionContext.globalTxId(), transactionContext.localTxId());
+ } else {
+ logger.debug("Updated context {} for globalTxId:{} and localTxId:{}", context,
+ transactionContext.globalTxId(), transactionContext.localTxId());
+ }
+ context.setGlobalTxId(transactionContext.globalTxId());
+ context.setLocalTxId(transactionContext.localTxId());
+ }
+
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
index de05465..c9573c2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspect.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import org.apache.servicecomb.pack.common.TransactionStatus;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
import org.apache.servicecomb.pack.omega.transaction.annotations.Participate;
import org.apache.servicecomb.pack.omega.transaction.tcc.events.ParticipationEndedEvent;
import org.apache.servicecomb.pack.omega.transaction.tcc.events.ParticipationStartedEvent;
@@ -31,6 +32,9 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+
@Aspect
public class TccParticipatorAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,6 +56,11 @@ public class TccParticipatorAspect {
@Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)")
Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+ TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
+ if (transactionContext != null) {
+ populateOmegaContext(context, transactionContext, LOG);
+ }
+
String localTxId = context.localTxId();
String cancelMethod = callbackMethodSignature(joinPoint, participate.cancelMethod(), method);
String confirmMethod = callbackMethodSignature(joinPoint, participate.confirmMethod(), method);
@@ -88,4 +97,6 @@ public class TccParticipatorAspect {
.getDeclaredMethod(callbackMethod, tryMethod.getParameterTypes())
.toString();
}
+
+
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index fa23fea..39cc931 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -17,39 +17,35 @@
package org.apache.servicecomb.pack.omega.transaction;
-import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static org.hamcrest.core.AnyOf.anyOf;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
-import java.security.Permission;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.omega.context.IdGenerator;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
-import org.apache.servicecomb.pack.omega.context.TransactionContext;
import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.security.Permission;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TransactionAspectTest {
private final List<TxEvent> messages = new ArrayList<>();
@@ -57,7 +53,7 @@ public class TransactionAspectTest {
private final String localTxId = UUID.randomUUID().toString();
private final String newLocalTxId = UUID.randomUUID().toString();
- private final String transactionGloablTxId = UUID.randomUUID().toString();
+ private final String transactionGlobalTxId = UUID.randomUUID().toString();
private final String transactionLocalTxId = UUID.randomUUID().toString();
private final SagaMessageSender sender = new SagaMessageSender() {
@@ -94,7 +90,6 @@ public class TransactionAspectTest {
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
- private final TransactionContext tx = mock(TransactionContext.class);
private final TransactionContextProperties transactionContextProperties = mock(TransactionContextProperties.class);
@Before
@@ -110,31 +105,8 @@ public class TransactionAspectTest {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
- when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGloablTxId);
+ when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
when(transactionContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
- when(tx.globalTxId()).thenReturn(transactionGloablTxId);
- when(tx.localTxId()).thenReturn(transactionLocalTxId);
- }
-
- @Test
- public void testGetTransactionContextFromArgs() throws Throwable {
-
- TransactionContext result = aspect.getTransactionContextFromArgs(new Object[]{transactionContextProperties});
- assertThat(result.globalTxId(), is(transactionGloablTxId));
- assertThat(result.localTxId(), is(transactionLocalTxId));
-
- result = aspect.getTransactionContextFromArgs(new Object[]{});
- assertNull(result);
-
- result = aspect.getTransactionContextFromArgs(null);
- assertNull(result);
-
- result = aspect.getTransactionContextFromArgs(new Object[]{tx});
- assertThat(result, is(tx));
-
- TransactionContext otherTx = Mockito.mock(TransactionContext.class);
- result = aspect.getTransactionContextFromArgs(new Object[]{otherTx, transactionContextProperties});
- assertThat(result, is(otherTx));
}
@Test
@@ -144,7 +116,7 @@ public class TransactionAspectTest {
aspect.advise(joinPoint, compensable);
TxEvent startedEvent = messages.get(0);
- assertThat(startedEvent.globalTxId(), is(transactionGloablTxId));
+ assertThat(startedEvent.globalTxId(), is(transactionGlobalTxId));
assertThat(startedEvent.localTxId(), is(newLocalTxId));
assertThat(startedEvent.parentTxId(), is(transactionLocalTxId));
assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
@@ -153,12 +125,12 @@ public class TransactionAspectTest {
TxEvent endedEvent = messages.get(1);
- assertThat(endedEvent.globalTxId(), is(transactionGloablTxId));
+ assertThat(endedEvent.globalTxId(), is(transactionGlobalTxId));
assertThat(endedEvent.localTxId(), is(newLocalTxId));
assertThat(endedEvent.parentTxId(), is(transactionLocalTxId));
assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
- assertThat(omegaContext.globalTxId(), is(transactionGloablTxId));
+ assertThat(omegaContext.globalTxId(), is(transactionGlobalTxId));
assertThat(omegaContext.localTxId(), is(transactionLocalTxId));
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java
new file mode 100644
index 0000000..2a27dd1
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionContextUtilsTest.java
@@ -0,0 +1,83 @@
+package org.apache.servicecomb.pack.omega.transaction;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+
+import java.util.UUID;
+
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.extractTransactionContext;
+import static org.apache.servicecomb.pack.omega.transaction.TransactionContextUtils.populateOmegaContext;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TransactionContextUtilsTest {
+
+ private final Logger logger = mock(Logger.class);
+
+ private final String transactionGlobalTxId = UUID.randomUUID().toString();
+ private final String transactionLocalTxId = UUID.randomUUID().toString();
+
+ private final TransactionContext txContext = new TransactionContext(transactionGlobalTxId, transactionLocalTxId);
+ private final TransactionContextProperties txContextProperties = mock(TransactionContextProperties.class);
+
+ @Before
+ public void setUp() {
+ when(txContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
+ when(txContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
+ }
+
+ @Test
+ public void testExtractTransactionContext() {
+
+ TransactionContext result = extractTransactionContext(new Object[] { txContextProperties });
+ assertThat(result.globalTxId(), is(transactionGlobalTxId));
+ assertThat(result.localTxId(), is(transactionLocalTxId));
+
+ result = extractTransactionContext(new Object[] {});
+ assertNull(result);
+
+ result = extractTransactionContext(null);
+ assertNull(result);
+
+ result = extractTransactionContext(new Object[] { txContext });
+ assertThat(result, is(txContext));
+
+ TransactionContext otherTx = Mockito.mock(TransactionContext.class);
+ result = extractTransactionContext(new Object[] { otherTx, txContextProperties });
+ assertThat(result, is(otherTx));
+ }
+
+ @Test
+ public void testPopulateOmegaContextWhenItsEmpty() {
+
+ OmegaContext omegaContext = new OmegaContext(null);
+
+ populateOmegaContext(omegaContext, txContext, logger);
+
+ assertEquals(transactionGlobalTxId, omegaContext.globalTxId());
+ assertEquals(transactionLocalTxId, omegaContext.localTxId());
+
+ }
+
+ @Test
+ public void testPopulateOmegaContextWhenItsNotEmpty() {
+ OmegaContext omegaContext = new OmegaContext(null);
+
+ omegaContext.setGlobalTxId("global-tx-id");
+ omegaContext.setLocalTxId("local-tx-id");
+
+ populateOmegaContext(omegaContext, txContext, logger);
+
+ assertEquals(transactionGlobalTxId, omegaContext.globalTxId());
+ assertEquals(transactionLocalTxId, omegaContext.localTxId());
+
+ }
+
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
index 3c510a0..fc7cb20 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -26,9 +26,10 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.servicecomb.pack.common.TransactionStatus;
+
import org.apache.servicecomb.pack.omega.context.IdGenerator;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.TransactionContextProperties;
import org.apache.servicecomb.pack.omega.transaction.AlphaResponse;
import org.apache.servicecomb.pack.omega.transaction.annotations.Participate;
import org.apache.servicecomb.pack.omega.transaction.tcc.events.*;
@@ -50,6 +51,11 @@ public class TccParticipatorAspectTest {
private final AlphaResponse response = new AlphaResponse(false);
private String confirmMethod;
private String cancelMethod;
+
+ private final String transactionGlobalTxId = UUID.randomUUID().toString();
+ private final String transactionLocalTxId = UUID.randomUUID().toString();
+ private final TransactionContextProperties transactionContextProperties = mock(TransactionContextProperties.class);
+
private final TccMessageSender tccMessageSender = new TccMessageSender() {
@Override
public void onConnected() {
@@ -125,6 +131,28 @@ public class TccParticipatorAspectTest {
confirmMethod = TccParticipatorAspectTest.class.getDeclaredMethod("confirmMethod").toString();
cancelMethod = TccParticipatorAspectTest.class.getDeclaredMethod("cancelMethod").toString();
+
+ when(transactionContextProperties.getGlobalTxId()).thenReturn(transactionGlobalTxId);
+ when(transactionContextProperties.getLocalTxId()).thenReturn(transactionLocalTxId);
+ }
+
+ @Test
+ public void injectTransactionContextExplicitly() throws Throwable {
+
+ when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});
+
+ aspect.advise(joinPoint, participate);
+
+ assertThat(participationStartedEvents.size(), is(1));
+ ParticipationStartedEvent participationStartedEvent = participationStartedEvents.get(0);
+
+ assertThat(participationStartedEvent.getGlobalTxId(), is(transactionGlobalTxId));
+ assertThat(participationStartedEvent.getParentTxId(), is(transactionLocalTxId));
+ assertThat(participationStartedEvent.getLocalTxId(), is(newLocalTxId));
+
+ assertThat(omegaContext.globalTxId(), is(transactionGlobalTxId));
+ assertThat(omegaContext.localTxId(), is(transactionLocalTxId));
+
}
@Test