You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 07:20:15 UTC
[incubator-servicecomb-saga] 02/13: SCB-212 made each @Compensable as a new sub-transaction
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit ef9b3f2612713d866d859926a8278e7dc43c51e3
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 15:39:20 2018 +0800
SCB-212 made each @Compensable as a new sub-transaction
Signed-off-by: seanyinx <se...@huawei.com>
---
.../integration/pack/tests/GreetingController.java | 13 ++
.../saga/integration/pack/tests/PackIT.java | 184 +++++++++++++--------
.../spring/TransactionAspectConfig.java | 9 +
.../spring/TransactionInterceptionTest.java | 74 +++++----
.../transaction/SagaStartAnnotationProcessor.java | 12 +-
.../saga/omega/transaction/SagaStartAspect.java | 60 +++++++
.../saga/omega/transaction/TransactionAspect.java | 46 ++----
.../SagaStartAnnotationProcessorTest.java | 11 +-
.../omega/transaction/SagaStartAspectTest.java | 124 ++++++++++++++
.../omega/transaction/TransactionAspectTest.java | 124 ++++++++++++++
.../TransactionHandlerInterceptor.java | 16 +-
.../TransactionHandlerInterceptorTest.java | 8 +-
12 files changed, 524 insertions(+), 157 deletions(-)
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 3c7c095..2bdd587 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.saga.integration.pack.tests;
import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
@@ -64,4 +65,16 @@ public class GreetingController {
ResponseEntity<String> rude(@RequestParam String name) {
return ResponseEntity.ok(greetingService.beingRude(name));
}
+
+ @SagaStart
+ @Compensable(compensationMethod = "goodNight")
+ @GetMapping("/goodMorning")
+ ResponseEntity<String> goodMorning(@RequestParam String name) {
+ String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
+ return ResponseEntity.ok("Good morning, " + bonjour);
+ }
+
+ ResponseEntity<String> goodNight(@RequestParam String name) {
+ return ResponseEntity.ok("Good night, " + name);
+ }
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 56092be..7ed1b88 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -48,7 +48,6 @@ import org.springframework.test.context.junit4.SpringRunner;
properties = {"server.port=8080", "spring.application.name=greeting-service"})
public class PackIT {
private static final String serviceName = "greeting-service";
- private final String globalTxId = UUID.randomUUID().toString();
@Autowired
private TestRestTemplate restTemplate;
@@ -84,47 +83,47 @@ public class PackIT {
assertThat(envelopes.size(), is(6));
- TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
- assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
- assertThat(sagaStartedEventEnvelope.localTxId(), is(notNullValue()));
- assertThat(sagaStartedEventEnvelope.parentTxId(), is(nullValue()));
- assertThat(sagaStartedEventEnvelope.serviceName(), is(serviceName));
- assertThat(sagaStartedEventEnvelope.instanceId(), is(notNullValue()));
-
- TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
- assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
- assertThat(txStartedEventEnvelope1.localTxId(), is(notNullValue()));
- assertThat(txStartedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
- assertThat(txStartedEventEnvelope1.serviceName(), is(serviceName));
- assertThat(txStartedEventEnvelope1.instanceId(), is(sagaStartedEventEnvelope.instanceId()));
-
- TxEventEnvelope txEndedEventEnvelope1 = envelopes.get(2);
- assertThat(txEndedEventEnvelope1.type(), is("TxEndedEvent"));
- assertThat(txEndedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txEndedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
- assertThat(txEndedEventEnvelope1.serviceName(), is(serviceName));
- assertThat(txEndedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
- TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
- assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
- assertThat(txStartedEventEnvelope2.localTxId(), is(notNullValue()));
- assertThat(txStartedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txStartedEventEnvelope2.serviceName(), is(serviceName));
- assertThat(txStartedEventEnvelope2.instanceId(), is(notNullValue()));
-
- TxEventEnvelope txEndedEventEnvelope2 = envelopes.get(4);
- assertThat(txEndedEventEnvelope2.type(), is("TxEndedEvent"));
- assertThat(txEndedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
- assertThat(txEndedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txEndedEventEnvelope2.serviceName(), is(serviceName));
- assertThat(txEndedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
- TxEventEnvelope sagaEndedEventEnvelope = envelopes.get(5);
- assertThat(sagaEndedEventEnvelope.type(), is("SagaEndedEvent"));
- assertThat(sagaEndedEventEnvelope.localTxId(), is(sagaStartedEventEnvelope.localTxId()));
- assertThat(sagaEndedEventEnvelope.parentTxId(), is(nullValue()));
- assertThat(sagaEndedEventEnvelope.serviceName(), is(serviceName));
- assertThat(sagaEndedEventEnvelope.instanceId(), is(notNullValue()));
+ TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+ assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+ assertThat(sagaStartedEvent.localTxId(), is(globalTxId));
+ assertThat(sagaStartedEvent.parentTxId(), is(nullValue()));
+ assertThat(sagaStartedEvent.serviceName(), is(serviceName));
+ assertThat(sagaStartedEvent.instanceId(), is(notNullValue()));
+
+ TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+ assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+ assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+ assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+ assertThat(txStartedEvent1.serviceName(), is(serviceName));
+ assertThat(txStartedEvent1.instanceId(), is(sagaStartedEvent.instanceId()));
+
+ TxEventEnvelope txEndedEvent1 = envelopes.get(2);
+ assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+ assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+ assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+ assertThat(txEndedEvent1.serviceName(), is(serviceName));
+ assertThat(txEndedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+ TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+ assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+ assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+ assertThat(txStartedEvent2.parentTxId(), is(globalTxId));
+ assertThat(txStartedEvent2.serviceName(), is(serviceName));
+ assertThat(txStartedEvent2.instanceId(), is(notNullValue()));
+
+ TxEventEnvelope txEndedEvent2 = envelopes.get(4);
+ assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+ assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+ assertThat(txEndedEvent2.parentTxId(), is(globalTxId));
+ assertThat(txEndedEvent2.serviceName(), is(serviceName));
+ assertThat(txEndedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
+
+ TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+ assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+ assertThat(sagaEndedEvent.localTxId(), is(globalTxId));
+ assertThat(sagaEndedEvent.parentTxId(), is(nullValue()));
+ assertThat(sagaEndedEvent.serviceName(), is(serviceName));
+ assertThat(sagaEndedEvent.instanceId(), is(notNullValue()));
assertThat(compensatedMessages.isEmpty(), is(true));
}
@@ -146,36 +145,37 @@ public class PackIT {
List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(envelopes.size(), is(8));
- TxEventEnvelope sagaStartedEventEnvelope = envelopes.get(0);
- assertThat(sagaStartedEventEnvelope.type(), is("SagaStartedEvent"));
+ TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+ assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
- TxEventEnvelope txStartedEventEnvelope1 = envelopes.get(1);
- assertThat(txStartedEventEnvelope1.type(), is("TxStartedEvent"));
+ TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+ assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
assertThat(envelopes.get(2).type(), is("TxEndedEvent"));
- TxEventEnvelope txStartedEventEnvelope2 = envelopes.get(3);
- assertThat(txStartedEventEnvelope2.type(), is("TxStartedEvent"));
-
- TxEventEnvelope txAbortedEventEnvelope = envelopes.get(4);
- assertThat(txAbortedEventEnvelope.type(), is("TxAbortedEvent"));
- assertThat(txAbortedEventEnvelope.localTxId(), is(txStartedEventEnvelope2.localTxId()));
- assertThat(txAbortedEventEnvelope.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txAbortedEventEnvelope.serviceName(), is(serviceName));
- assertThat(txAbortedEventEnvelope.instanceId(), is(txStartedEventEnvelope2.instanceId()));
-
- TxEventEnvelope txCompensatedEventEnvelope1 = envelopes.get(5);
- assertThat(txCompensatedEventEnvelope1.type(), is("TxCompensatedEvent"));
- assertThat(txCompensatedEventEnvelope1.localTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txCompensatedEventEnvelope1.parentTxId(), is(sagaStartedEventEnvelope.localTxId()));
- assertThat(txCompensatedEventEnvelope1.serviceName(), is(serviceName));
- assertThat(txCompensatedEventEnvelope1.instanceId(), is(txStartedEventEnvelope1.instanceId()));
-
- TxEventEnvelope txCompensatedEventEnvelope2 = envelopes.get(6);
- assertThat(txCompensatedEventEnvelope2.type(), is("TxCompensatedEvent"));
- assertThat(txCompensatedEventEnvelope2.localTxId(), is(txStartedEventEnvelope2.localTxId()));
- assertThat(txCompensatedEventEnvelope2.parentTxId(), is(txStartedEventEnvelope1.localTxId()));
- assertThat(txCompensatedEventEnvelope2.serviceName(), is(serviceName));
- assertThat(txCompensatedEventEnvelope2.instanceId(), is(txStartedEventEnvelope2.instanceId()));
+ TxEventEnvelope txStartedEvent2 = envelopes.get(3);
+ assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+
+ TxEventEnvelope txAbortedEvent = envelopes.get(4);
+ assertThat(txAbortedEvent.type(), is("TxAbortedEvent"));
+ assertThat(txAbortedEvent.localTxId(), is(txStartedEvent2.localTxId()));
+ assertThat(txAbortedEvent.parentTxId(), is(globalTxId));
+ assertThat(txAbortedEvent.serviceName(), is(serviceName));
+ assertThat(txAbortedEvent.instanceId(), is(txStartedEvent2.instanceId()));
+
+ // TODO: 2018/1/9 compensation shall be done in reverse order
+ TxEventEnvelope txCompensatedEvent1 = envelopes.get(5);
+ assertThat(txCompensatedEvent1.type(), is("TxCompensatedEvent"));
+ assertThat(txCompensatedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+ assertThat(txCompensatedEvent1.parentTxId(), is(globalTxId));
+ assertThat(txCompensatedEvent1.serviceName(), is(serviceName));
+ assertThat(txCompensatedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
+
+ TxEventEnvelope txCompensatedEvent2 = envelopes.get(6);
+ assertThat(txCompensatedEvent2.type(), is("TxCompensatedEvent"));
+ assertThat(txCompensatedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+ assertThat(txCompensatedEvent2.parentTxId(), is(globalTxId));
+ assertThat(txCompensatedEvent2.serviceName(), is(serviceName));
+ assertThat(txCompensatedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
assertThat(envelopes.get(7).type(), is("SagaEndedEvent"));
@@ -183,4 +183,50 @@ public class PackIT {
"Goodbye, " + TRESPASSER,
"My bad, please take the window instead, " + TRESPASSER));
}
+
+ @Test(timeout = 5000)
+ public void updatesEmbeddedTxStateToAlpha() throws Exception {
+ ResponseEntity<String> entity = restTemplate.getForEntity("/goodMorning?name={name}",
+ String.class,
+ "mike");
+
+ assertThat(entity.getStatusCode(), is(OK));
+ assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
+
+ List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+ assertThat(distinctGlobalTxIds.size(), is(1));
+
+ String globalTxId = distinctGlobalTxIds.get(0);
+ List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+
+ assertThat(envelopes.size(), is(6));
+
+ TxEventEnvelope sagaStartedEvent = envelopes.get(0);
+ assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
+
+ TxEventEnvelope txStartedEvent1 = envelopes.get(1);
+ assertThat(txStartedEvent1.type(), is("TxStartedEvent"));
+ assertThat(txStartedEvent1.localTxId(), is(notNullValue()));
+ assertThat(txStartedEvent1.parentTxId(), is(globalTxId));
+
+ TxEventEnvelope txStartedEvent2 = envelopes.get(2);
+ assertThat(txStartedEvent2.type(), is("TxStartedEvent"));
+ assertThat(txStartedEvent2.localTxId(), is(notNullValue()));
+ assertThat(txStartedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+ TxEventEnvelope txEndedEvent2 = envelopes.get(3);
+ assertThat(txEndedEvent2.type(), is("TxEndedEvent"));
+ assertThat(txEndedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
+ assertThat(txEndedEvent2.parentTxId(), is(txStartedEvent1.localTxId()));
+
+ TxEventEnvelope txEndedEvent1 = envelopes.get(4);
+ assertThat(txEndedEvent1.type(), is("TxEndedEvent"));
+ assertThat(txEndedEvent1.localTxId(), is(txStartedEvent1.localTxId()));
+ assertThat(txEndedEvent1.parentTxId(), is(globalTxId));
+
+ TxEventEnvelope sagaEndedEvent = envelopes.get(5);
+ assertThat(sagaEndedEvent.type(), is("SagaEndedEvent"));
+
+ assertThat(compensatedMessages.isEmpty(), is(true));
+ }
}
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index dc88bbd..5358db5 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -22,10 +22,12 @@ import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.CompensationMessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartAspect;
import org.apache.servicecomb.saga.omega.transaction.TransactionAspect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
+import org.springframework.core.annotation.Order;
@Configuration
@EnableAspectJAutoProxy
@@ -36,6 +38,13 @@ public class TransactionAspectConfig {
return new CompensationMessageHandler(sender, context);
}
+ @Order(0)
+ @Bean
+ SagaStartAspect sagaStartAspect(MessageSender sender, OmegaContext context) {
+ return new SagaStartAspect(sender, context);
+ }
+
+ @Order(1)
@Bean
TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
return new TransactionAspect(sender, context);
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 3dc8c73..c30953f 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
@@ -35,8 +36,9 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.servicecomb.saga.omega.context.CompensationContext;
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
@@ -50,6 +52,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
@@ -68,9 +71,11 @@ import akka.actor.Props;
@SpringBootTest(classes = {TransactionTestMain.class, MessageConfig.class})
@AutoConfigureMockMvc
public class TransactionInterceptionTest {
+ @SuppressWarnings("unchecked")
+ private static final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
private static final String globalTxId = UUID.randomUUID().toString();
- private final String localTxId = UUID.randomUUID().toString();
- private final String parentTxId = UUID.randomUUID().toString();
+ private final String newLocalTxId = UUID.randomUUID().toString();
+ private final String anotherLocalTxId = UUID.randomUUID().toString();
private final String username = uniquify("username");
private final String email = uniquify("email");
@@ -102,9 +107,9 @@ public class TransactionInterceptionTest {
@Before
public void setUp() throws Exception {
+ when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
omegaContext.setGlobalTxId(globalTxId);
- omegaContext.setLocalTxId(localTxId);
- omegaContext.setParentTxId(parentTxId);
+ omegaContext.setLocalTxId(globalTxId);
compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
}
@@ -125,8 +130,8 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -146,8 +151,8 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, illegalUser).toString(),
- new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(),
+ new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
}
@@ -160,20 +165,20 @@ public class TransactionInterceptionTest {
String localTxId = omegaContext.newLocalTxId();
User anotherUser = userService.add(jack);
- messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
- messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
+ messageHandler.onReceive(globalTxId, newLocalTxId, globalTxId, compensationMethod, user);
+ messageHandler.onReceive(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser);
assertThat(userRepository.findOne(user.id()), is(nullValue()));
assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
- new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
- new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(),
+ new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
+ new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+ new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
},
toArray(messages)
);
@@ -184,16 +189,16 @@ public class TransactionInterceptionTest {
new Thread(() -> userService.add(user)).start();
waitTillSavedUser(username);
- String newLocalTxId = omegaContext.newLocalTxId();
+ String localTxId = omegaContext.newLocalTxId();
new Thread(() -> userService.add(jack)).start();
waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
- new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
}
@@ -203,16 +208,16 @@ public class TransactionInterceptionTest {
executor.schedule(() -> userService.add(user), 0, MILLISECONDS);
waitTillSavedUser(username);
- String newLocalTxId = omegaContext.newLocalTxId();
+ String localTxId = omegaContext.newLocalTxId();
executor.invokeAll(singletonList(() -> userService.add(jack)));
waitTillSavedUser(usernameJack);
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod, jack).toString(),
- new TxEndedEvent(globalTxId, newLocalTxId, parentTxId, compensationMethod).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+ new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
}
@@ -231,8 +236,8 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
}
@@ -249,8 +254,8 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
- new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -289,8 +294,13 @@ public class TransactionInterceptionTest {
private final List<String> messages = new ArrayList<>();
@Bean
+ CompensationContext compensationContext() {
+ return new CompensationContext();
+ }
+
+ @Bean
OmegaContext omegaContext() {
- return new OmegaContext(new UniqueIdGenerator());
+ return new OmegaContext(idGenerator);
}
@Bean
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index e92ebb1..64f47ce 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -19,10 +19,9 @@ package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
-public class SagaStartAnnotationProcessor {
+class SagaStartAnnotationProcessor {
private final OmegaContext omegaContext;
-
private final MessageSender sender;
SagaStartAnnotationProcessor(OmegaContext omegaContext, MessageSender sender) {
@@ -31,7 +30,7 @@ public class SagaStartAnnotationProcessor {
}
void preIntercept() {
- String globalTxId = omegaContext.newGlobalTxId();
+ String globalTxId = globalTxId();
// reuse the globalTxId as localTxId to differ localTxId in SagaStartedEvent and the first TxStartedEvent
sender.send(new SagaStartedEvent(globalTxId, globalTxId));
}
@@ -39,5 +38,12 @@ public class SagaStartAnnotationProcessor {
void postIntercept() {
String globalTxId = omegaContext.globalTxId();
sender.send(new SagaEndedEvent(globalTxId, globalTxId));
+ omegaContext.clear();
+ }
+
+ private String globalTxId() {
+ String globalTxId = omegaContext.newGlobalTxId();
+ omegaContext.setLocalTxId(globalTxId);
+ return globalTxId;
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
new file mode 100644
index 0000000..307af3c
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class SagaStartAspect {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+ private final OmegaContext context;
+
+ public SagaStartAspect(MessageSender sender, OmegaContext context) {
+ this.context = context;
+ this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+ }
+
+ @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
+ Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+ sagaStartAnnotationProcessor.preIntercept();
+ LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
+
+ try {
+ return joinPoint.proceed();
+ } catch (Throwable throwable) {
+ LOG.error("Failed to process SagaStart method: {}", method.toString());
+ throw throwable;
+ } finally {
+ LOG.debug("Transaction with context {} has finished.", context);
+ sagaStartAnnotationProcessor.postIntercept();
+ }
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a39c0d..f62b92e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -36,7 +36,6 @@ public class TransactionAspect {
private final PostTransactionInterceptor postTransactionInterceptor;
private final FailedTransactionInterceptor failedTransactionInterceptor;
- private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
private final OmegaContext context;
public TransactionAspect(MessageSender sender, OmegaContext context) {
@@ -44,7 +43,6 @@ public class TransactionAspect {
this.preTransactionInterceptor = new PreTransactionInterceptor(sender);
this.postTransactionInterceptor = new PostTransactionInterceptor(sender);
this.failedTransactionInterceptor = new FailedTransactionInterceptor(sender);
- this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
@@ -54,7 +52,12 @@ public class TransactionAspect {
String signature = compensationMethodSignature(joinPoint, compensable, method);
+ String localTxId = context.localTxId();
+ String parentTxId = context.parentTxId();
+ context.setParentTxId(localTxId);
+
preIntercept(joinPoint, signature);
+ LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
try {
Object result = joinPoint.proceed();
@@ -64,26 +67,10 @@ public class TransactionAspect {
} catch (Throwable throwable) {
interceptException(signature, throwable);
throw throwable;
- }
- }
-
- @Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.SagaStart * *(..))")
- Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
- Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
- LOG.debug("Initializing global tx id before execution of method {}", method.toString());
- sagaStartAnnotationProcessor.preIntercept();
-
- try {
- Object result = joinPoint.proceed();
-
- LOG.info("Transaction {} succeeded.", context.globalTxId());
- sagaStartAnnotationProcessor.postIntercept();
-
- return result;
- } catch (Throwable throwable) {
- LOG.error("Transaction {} failed.", context.globalTxId());
- throw throwable;
+ } finally {
+ context.setLocalTxId(localTxId);
+ context.setParentTxId(parentTxId);
+ LOG.debug("Restored context back to {}", context);
}
}
@@ -97,12 +84,9 @@ public class TransactionAspect {
}
private void preIntercept(ProceedingJoinPoint joinPoint, String signature) {
- // context without a parent should be the first TxStartedEvent
- initFirstOmegaContext();
-
preTransactionInterceptor.intercept(
context.globalTxId(),
- context.localTxId(),
+ context.newLocalTxId(),
context.parentTxId(),
signature,
joinPoint.getArgs());
@@ -124,14 +108,4 @@ public class TransactionAspect {
signature,
throwable);
}
-
- private void initFirstOmegaContext() {
- if (context.parentTxId() != null) {
- return;
- }
- if (context.localTxId() == null) {
- context.newLocalTxId();
- }
- context.setParentTxId(context.globalTxId());
- }
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 8913b7f..fba7826 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -41,9 +41,9 @@ public class SagaStartAnnotationProcessorTest {
private final String localTxId = UUID.randomUUID().toString();
- private final IdGenerator generator = mock(IdGenerator.class);
-
@SuppressWarnings("unchecked")
+ private final IdGenerator<String> generator = mock(IdGenerator.class);
+
private final OmegaContext context = new OmegaContext(generator);
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
@@ -56,7 +56,7 @@ public class SagaStartAnnotationProcessorTest {
sagaStartAnnotationProcessor.preIntercept();
assertThat(context.globalTxId(), is(globalTxId));
- assertThat(context.localTxId(), is(nullValue()));
+ assertThat(context.localTxId(), is(globalTxId));
assertThat(context.parentTxId(), is(nullValue()));
TxEvent event = messages.get(0);
@@ -73,7 +73,6 @@ public class SagaStartAnnotationProcessorTest {
public void sendsSagaEndedEvent() {
context.clear();
context.setGlobalTxId(globalTxId);
- context.setLocalTxId(localTxId);
sagaStartAnnotationProcessor.postIntercept();
@@ -85,5 +84,9 @@ public class SagaStartAnnotationProcessorTest {
assertThat(event.compensationMethod().isEmpty(), is(true));
assertThat(event.type(), is("SagaEndedEvent"));
assertThat(event.payloads().length, is(0));
+
+ assertThat(context.globalTxId(), is(nullValue()));
+ assertThat(context.localTxId(), is(nullValue()));
+ assertThat(context.parentTxId(), is(nullValue()));
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
new file mode 100644
index 0000000..cfaa7b6
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SagaStartAspectTest {
+ private final List<TxEvent> messages = new ArrayList<>();
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+
+ private final MessageSender sender = messages::add;
+ private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+ private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+ @SuppressWarnings("unchecked")
+ private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+ private final Compensable compensable = Mockito.mock(Compensable.class);
+
+ private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+ private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
+
+ @Before
+ public void setUp() throws Exception {
+ when(joinPoint.getSignature()).thenReturn(methodSignature);
+ when(joinPoint.getTarget()).thenReturn(this);
+
+ when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+ when(compensable.compensationMethod()).thenReturn("doNothing");
+
+ omegaContext.setGlobalTxId(globalTxId);
+ omegaContext.setLocalTxId(localTxId);
+ omegaContext.setParentTxId(parentTxId);
+ }
+
+ @Test
+ public void newGlobalTxIdInSagaStart() throws Throwable {
+ omegaContext.clear();
+ when(idGenerator.nextId()).thenReturn(globalTxId);
+
+ aspect.advise(joinPoint);
+
+ TxEvent startedEvent = messages.get(0);
+
+ assertThat(startedEvent.globalTxId(), is(globalTxId));
+ assertThat(startedEvent.localTxId(), is(globalTxId));
+ assertThat(startedEvent.parentTxId(), is(nullValue()));
+ assertThat(startedEvent.type(), is("SagaStartedEvent"));
+
+ TxEvent endedEvent = messages.get(1);
+
+ assertThat(endedEvent.globalTxId(), is(globalTxId));
+ assertThat(endedEvent.localTxId(), is(globalTxId));
+ assertThat(endedEvent.parentTxId(), is(nullValue()));
+ assertThat(endedEvent.type(), is("SagaEndedEvent"));
+
+ assertThat(omegaContext.globalTxId(), is(nullValue()));
+ assertThat(omegaContext.localTxId(), is(nullValue()));
+ assertThat(omegaContext.parentTxId(), is(nullValue()));
+ }
+
+ @Test
+ public void clearContextOnSagaStartError() throws Throwable {
+ when(idGenerator.nextId()).thenReturn(globalTxId);
+ RuntimeException oops = new RuntimeException("oops");
+
+ when(joinPoint.proceed()).thenThrow(oops);
+
+ try {
+ aspect.advise(joinPoint);
+ expectFailing(RuntimeException.class);
+ } catch (RuntimeException e) {
+ assertThat(e, is(oops));
+ }
+
+ TxEvent event = messages.get(1);
+
+ assertThat(event.globalTxId(), is(globalTxId));
+ assertThat(event.localTxId(), is(globalTxId));
+ assertThat(event.parentTxId(), is(nullValue()));
+ assertThat(event.type(), is("SagaEndedEvent"));
+
+ assertThat(omegaContext.globalTxId(), is(nullValue()));
+ assertThat(omegaContext.localTxId(), is(nullValue()));
+ assertThat(omegaContext.parentTxId(), is(nullValue()));
+ }
+
+ private String doNothing() {
+ return "doNothing";
+ }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
new file mode 100644
index 0000000..bd8829c
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.saga.omega.context.IdGenerator;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TransactionAspectTest {
+ private final List<TxEvent> messages = new ArrayList<>();
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+
+ private final String newLocalTxId = UUID.randomUUID().toString();
+
+ private final MessageSender sender = messages::add;
+ private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+ private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+ @SuppressWarnings("unchecked")
+ private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+ private final Compensable compensable = Mockito.mock(Compensable.class);
+
+ private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+ private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+
+ @Before
+ public void setUp() throws Exception {
+ when(joinPoint.getSignature()).thenReturn(methodSignature);
+ when(joinPoint.getTarget()).thenReturn(this);
+
+ when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+ when(compensable.compensationMethod()).thenReturn("doNothing");
+
+ omegaContext.setGlobalTxId(globalTxId);
+ omegaContext.setLocalTxId(localTxId);
+ omegaContext.setParentTxId(parentTxId);
+ }
+
+ @Test
+ public void newLocalTxIdInCompensable() throws Throwable {
+ when(idGenerator.nextId()).thenReturn(newLocalTxId);
+
+ aspect.advise(joinPoint, compensable);
+
+ TxEvent startedEvent = messages.get(0);
+
+ assertThat(startedEvent.globalTxId(), is(globalTxId));
+ assertThat(startedEvent.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent.parentTxId(), is(localTxId));
+ assertThat(startedEvent.type(), is("TxStartedEvent"));
+
+ TxEvent endedEvent = messages.get(1);
+
+ assertThat(endedEvent.globalTxId(), is(globalTxId));
+ assertThat(endedEvent.localTxId(), is(newLocalTxId));
+ assertThat(endedEvent.parentTxId(), is(localTxId));
+ assertThat(endedEvent.type(), is("TxEndedEvent"));
+
+ assertThat(omegaContext.globalTxId(), is(globalTxId));
+ assertThat(omegaContext.localTxId(), is(localTxId));
+ assertThat(omegaContext.parentTxId(), is(parentTxId));
+ }
+
+ @Test
+ public void restoreContextOnCompensableError() throws Throwable {
+ when(idGenerator.nextId()).thenReturn(newLocalTxId);
+ RuntimeException oops = new RuntimeException("oops");
+
+ when(joinPoint.proceed()).thenThrow(oops);
+
+ try {
+ aspect.advise(joinPoint, compensable);
+ expectFailing(RuntimeException.class);
+ } catch (RuntimeException e) {
+ assertThat(e, is(oops));
+ }
+
+ TxEvent event = messages.get(1);
+
+ assertThat(event.globalTxId(), is(globalTxId));
+ assertThat(event.localTxId(), is(newLocalTxId));
+ assertThat(event.parentTxId(), is(localTxId));
+ assertThat(event.type(), is("TxAbortedEvent"));
+
+ assertThat(omegaContext.globalTxId(), is(globalTxId));
+ assertThat(omegaContext.localTxId(), is(localTxId));
+ assertThat(omegaContext.parentTxId(), is(parentTxId));
+ }
+
+ private String doNothing() {
+ return "doNothing";
+ }
+}
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index 3e5d620..9b003f8 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -20,6 +20,9 @@
package org.apache.servicecomb.saga.omega.transport.resttemplate;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY;
+import static org.apache.servicecomb.saga.omega.context.OmegaContext.LOCAL_TX_ID_KEY;
+
import java.lang.invoke.MethodHandles;
import javax.servlet.http.HttpServletRequest;
@@ -43,24 +46,21 @@ class TransactionHandlerInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
- String globalTxId = request.getHeader(OmegaContext.GLOBAL_TX_ID_KEY);
+ String globalTxId = request.getHeader(GLOBAL_TX_ID_KEY);
if (globalTxId == null) {
- LOG.debug("no such header: {}", OmegaContext.GLOBAL_TX_ID_KEY);
+ LOG.debug("no such header: {}", GLOBAL_TX_ID_KEY);
} else {
omegaContext.setGlobalTxId(globalTxId);
- omegaContext.newLocalTxId();
- omegaContext.setParentTxId(request.getHeader(OmegaContext.LOCAL_TX_ID_KEY));
+ omegaContext.setLocalTxId(request.getHeader(LOCAL_TX_ID_KEY));
}
return true;
}
@Override
- public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o,
- ModelAndView modelAndView) {
+ public void postHandle(HttpServletRequest request, HttpServletResponse response, Object o, ModelAndView mv) {
}
@Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o,
- Exception e) {
+ public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o, Exception e) {
}
}
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
index 6ee816b..03f278f 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/test/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptorTest.java
@@ -51,9 +51,7 @@ public class TransactionHandlerInterceptorTest {
@Before
public void setUp() {
- omegaContext.setGlobalTxId(null);
- omegaContext.setLocalTxId(null);
- omegaContext.setParentTxId(null);
+ omegaContext.clear();
}
@Test
@@ -64,8 +62,8 @@ public class TransactionHandlerInterceptorTest {
requestInterceptor.preHandle(request, response, null);
assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(notNullValue()));
- assertThat(omegaContext.parentTxId(), is(localTxId));
+ assertThat(omegaContext.localTxId(), is(localTxId));
+ assertThat(omegaContext.parentTxId(), is(nullValue()));
}
@Test
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.