You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 07:20:22 UTC
[incubator-servicecomb-saga] 09/13: SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit fdfbc6c089a3b3064bc3bc07d62d4058b248eacd
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800
SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 20 +++++++++++++++++---
.../saga/alpha/core/TxEventRepository.java | 2 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 18 +++++++++++++++++-
.../saga/alpha/server/SpringTxEventRepository.java | 2 +-
4 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5bab1d8..079e559 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -40,7 +40,6 @@ public class TxConsistentService {
private final TxEventRepository eventRepository;
private final OmegaCallback omegaCallback;
private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
- put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
put(TxAbortedEvent.name(), (event) -> compensate(event));
put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
}};
@@ -54,11 +53,18 @@ public class TxConsistentService {
public void handle(TxEvent event) {
eventRepository.save(event);
- CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+ CompletableFuture.runAsync(() -> {
+ if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+ omegaCallback.compensate(event);
+ }
+
+ eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+ });
}
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
+ List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
events.forEach(omegaCallback::compensate);
eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
Set<String> eventSet = new HashSet<>(events.size());
@@ -83,4 +89,12 @@ public class TxConsistentService {
event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
}
+
+ private boolean isGlobalTxAborted(TxEvent event) {
+ return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+ }
+
+ private boolean isTxEndedEvent(TxEvent event) {
+ return TxEndedEvent.name().equals(event.type());
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
public interface TxEventRepository {
void save(TxEvent event);
- List<TxEvent> findStartedTransactions(String globalTxId, String type);
+ List<TxEvent> findTransactions(String globalTxId, String type);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index cc5c520..febbfaf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
}
@Override
- public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+ public List<TxEvent> findTransactions(String globalTxId, String type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
.collect(Collectors.toList());
@@ -118,6 +118,22 @@ public class TxConsistentServiceTest {
assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
}
+ @Test
+ public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+ String localTxId1 = UUID.randomUUID().toString();
+ events.add(newEvent(TxStartedEvent));
+ events.add(newEvent(TxAbortedEvent));
+
+ TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+ consistentService.handle(event);
+
+ await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+ assertThat(compensationContexts, containsInAnyOrder(
+ new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+ ));
+ }
+
private TxEvent newEvent(EventType eventType) {
return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+ public List<TxEvent> findTransactions(String globalTxId, String type) {
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
.stream()
.map(TxEventEnvelope::event)
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.