You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/28 07:46:25 UTC
[incubator-servicecomb-saga] branch master updated: SCB-140
compensated only distinct events; SCB-141 supported multiple sub tx within
the same global tx on a single service
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new 1f79ce0 SCB-140 compensated only distinct events; SCB-141 supported multiple sub tx within the same global tx on a single service
1f79ce0 is described below
commit 1f79ce08db38f496ee06da5e17f1e0504c4c0ee3
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Dec 28 10:28:16 2017 +0800
SCB-140 compensated only distinct events
SCB-141 supported multiple sub tx within the same global tx on a single service
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 3 +-
.../saga/alpha/core/TxEventRepository.java | 2 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 6 ++--
.../saga/alpha/server/SpringTxEventRepository.java | 2 +-
.../saga/alpha/server/TxEventEnvelope.java | 6 ++++
.../alpha/server/TxEventEnvelopeRepository.java | 5 ++++
.../saga/alpha/server/AlphaIntegrationTest.java | 26 ++++++++++++++---
omega/omega-context/pom.xml | 4 +++
.../saga/omega/context/OmegaContext.java | 34 +++++++++++++---------
.../spring/TransactionInterceptionTest.java | 8 +++--
.../saga/omega/transaction/TransactionAspect.java | 1 +
11 files changed, 70 insertions(+), 27 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index af02d74..22605f8 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -18,7 +18,6 @@
package io.servicecomb.saga.alpha.core;
import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
-import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.util.HashMap;
@@ -49,7 +48,7 @@ public class TxConsistentService {
// TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name());
+ List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads()));
}
}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index cb44f77..9eed4ea 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
public interface TxEventRepository {
void save(TxEvent event);
- List<TxEvent> findCompletedEvents(String globalTxId, String type);
+ List<TxEvent> findStartedTransactions(String globalTxId, String type);
}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 64ed62c..d7e66c3 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
}
@Override
- public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+ public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
.collect(Collectors.toList());
@@ -85,9 +85,9 @@ public class TxConsistentServiceTest {
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
events.add(eventOf(TxStartedEvent, "service a".getBytes()));
- events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+ events.add(eventOf(TxEndedEvent, new byte[0]));
events.add(eventOf(TxStartedEvent, "service b".getBytes()));
- events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+ events.add(eventOf(TxEndedEvent, new byte[0]));
TxEvent abortEvent = newEvent(TxAbortedEvent);
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7b59d29..e8c8058 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+ public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
.stream()
.map(TxEventEnvelope::event)
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index 152edfb..b027754 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -17,6 +17,8 @@
package io.servicecomb.saga.alpha.server;
+import java.util.Date;
+
import javax.persistence.Embedded;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
@@ -40,6 +42,10 @@ class TxEventEnvelope {
this.event = event;
}
+ public TxEventEnvelope(String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) {
+ this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, payloads);
+ }
+
public long creationTime() {
return event.creationTime().getTime();
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index cd3cbc7..04ff836 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,10 +19,15 @@ package io.servicecomb.saga.alpha.server;
import java.util.List;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+ @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
+ + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.payloads"
+ + ") FROM TxEventEnvelope t "
+ + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
}
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index e9c9a98..d932072 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -109,9 +109,23 @@ public class AlphaIntegrationTest {
@Test
public void compensateOnFailure() throws Exception {
eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+
+ endpoint.handle(someEvent(TxAbortedEvent));
+
+ await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+ assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
+ }
+
+ @Test
+ public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
endpoint.handle(someEvent(TxAbortedEvent));
@@ -136,10 +150,14 @@ public class AlphaIntegrationTest {
}
private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) {
+ return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads);
+ }
+
+ private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) {
return new TxEventEnvelope(new TxEvent(new Date(),
globalTxId,
- UUID.randomUUID().toString(),
- UUID.randomUUID().toString(),
+ localTxId,
+ parentTxId,
eventType.name(),
payloads));
}
diff --git a/omega/omega-context/pom.xml b/omega/omega-context/pom.xml
index 5bba67c..f6cd0c6 100644
--- a/omega/omega-context/pom.xml
+++ b/omega/omega-context/pom.xml
@@ -29,6 +29,10 @@
<artifactId>omega-context</artifactId>
<dependencies>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index d8cca65..1fe8661 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -17,19 +17,24 @@
package io.servicecomb.saga.omega.context;
+import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class OmegaContext {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
private final ThreadLocal<String> globalTxId = new ThreadLocal<>();
private final ThreadLocal<String> localTxId = new ThreadLocal<>();
private final ThreadLocal<String> parentTxId = new ThreadLocal<>();
- private final Map<String, CompensationContext> compensationContexts = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, CompensationContext>> compensationContexts = new ConcurrentHashMap<>();
private final IdGenerator<String> idGenerator;
public OmegaContext(IdGenerator<String> idGenerator) {
@@ -73,8 +78,9 @@ public class OmegaContext {
}
// TODO: 2017/12/23 remove this context entry by the end of its corresponding global tx
- public void addContext(String id, Object target, String compensationMethod, Object... args) {
- compensationContexts.put(id, new CompensationContext(target, compensationMethod, args));
+ public void addContext(String globalTxId, String localTxId, Object target, String compensationMethod, Object... args) {
+ compensationContexts.computeIfAbsent(globalTxId, k -> new ConcurrentHashMap<>())
+ .put(localTxId, new CompensationContext(target, compensationMethod, args));
}
public boolean containsContext(String globalTxId) {
@@ -82,17 +88,17 @@ public class OmegaContext {
}
public void compensate(String globalTxId) {
- CompensationContext compensationContext = compensationContexts.get(globalTxId);
-
- try {
- invokeMethod(compensationContext);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- throw new IllegalStateException(
- "Pre-checking for compensate method " + compensationContext.compensationMethod
- + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
- e);
- } finally {
- compensationContexts.remove(globalTxId);
+ Map<String, CompensationContext> contexts = compensationContexts.remove(globalTxId);
+
+ for (CompensationContext compensationContext : contexts.values()) {
+ try {
+ invokeMethod(compensationContext);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ LOG.error(
+ "Pre-checking for compensate method " + compensationContext.compensationMethod
+ + " was somehow skipped, did you forget to configure compensable method checking on service startup?",
+ e);
+ }
}
}
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 67fdff9..83e57b0 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -105,10 +105,14 @@ public class TransactionInterceptionTest {
public void compensateOnTransactionException() throws Exception {
User user = userService.add(new User(username, email));
+ // another sub transaction to the same service within the same global transaction
+ omegaContext.newLocalTxId();
+ User anotherUser = userService.add(new User(uniquify("Jack"), uniquify("jack@gmail.com")));
+
messageHandler.onReceive("to be compensated".getBytes());
- User actual = userRepository.findOne(user.id());
- assertThat(actual, is(nullValue()));
+ assertThat(userRepository.findOne(user.id()), is(nullValue()));
+ assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
assertThat(omegaContext.containsContext(globalTxId), is(false));
}
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
index f78ed31..ecce0ee 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,6 +49,7 @@ public class TransactionAspect {
LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
context.addContext(context.globalTxId(),
+ context.localTxId(),
joinPoint.getTarget(),
compensable.compensationMethod(),
joinPoint.getArgs());
--
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].