You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:19 UTC
[incubator-servicecomb-saga] 09/09: SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit b878c80ac09942f8242139c431cd616e80c04ffa
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800
SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 26 +++++++++++++---
.../saga/alpha/core/TxEventRepository.java | 2 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 35 +++++++++++++++++-----
.../saga/alpha/server/SpringTxEventRepository.java | 2 +-
4 files changed, 51 insertions(+), 14 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839f..662fb54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,10 @@
package org.apache.servicecomb.saga.alpha.core;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,8 +33,7 @@ public class TxConsistentService {
private final TxEventRepository eventRepository;
private final OmegaCallback omegaCallback;
private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
- put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
- put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+ put(TxAbortedEvent.name(), (event) -> compensate(event));
}};
public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -40,11 +43,26 @@ public class TxConsistentService {
public void handle(TxEvent event) {
eventRepository.save(event);
- CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+ CompletableFuture.runAsync(() -> {
+ if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+ omegaCallback.compensate(event);
+ }
+
+ eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+ });
}
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+ List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
events.forEach(omegaCallback::compensate);
}
+
+ private boolean isGlobalTxAborted(TxEvent event) {
+ return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+ }
+
+ private boolean isTxEndedEvent(TxEvent event) {
+ return TxEndedEvent.name().equals(event.type());
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
public interface TxEventRepository {
void save(TxEvent event);
- List<TxEvent> findStartedTransactions(String globalTxId, String type);
+ List<TxEvent> findTransactions(String globalTxId, String type);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 6443997..30ec2a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,9 @@ package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -44,7 +47,7 @@ public class TxConsistentServiceTest {
}
@Override
- public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+ public List<TxEvent> findTransactions(String globalTxId, String type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
.collect(Collectors.toList());
@@ -69,8 +72,8 @@ public class TxConsistentServiceTest {
public void persistEventOnArrival() throws Exception {
TxEvent[] events = {
newEvent(EventType.SagaStartedEvent),
- newEvent(EventType.TxStartedEvent),
- newEvent(EventType.TxEndedEvent),
+ newEvent(TxStartedEvent),
+ newEvent(TxEndedEvent),
newEvent(EventType.TxCompensatedEvent),
newEvent(EventType.SagaEndedEvent)};
@@ -85,14 +88,14 @@ public class TxConsistentServiceTest {
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
String localTxId1 = UUID.randomUUID().toString();
- events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
- events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+ events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
String localTxId2 = UUID.randomUUID().toString();
- events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
- events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+ events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
- TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+ TxEvent abortEvent = newEvent(TxAbortedEvent);
consistentService.handle(abortEvent);
@@ -103,6 +106,22 @@ public class TxConsistentServiceTest {
));
}
+ @Test
+ public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+ String localTxId1 = UUID.randomUUID().toString();
+ events.add(newEvent(TxStartedEvent));
+ events.add(newEvent(TxAbortedEvent));
+
+ TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+ consistentService.handle(event);
+
+ await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+ assertThat(compensationContexts, containsInAnyOrder(
+ new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+ ));
+ }
+
private TxEvent newEvent(EventType eventType) {
return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+ public List<TxEvent> findTransactions(String globalTxId, String type) {
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
.stream()
.map(TxEventEnvelope::event)
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.