You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:49:19 UTC

[incubator-servicecomb-saga] 09/09: SCB-212 compensate immediately on TxEndedEvent if the global TX has already failed (e.g. after a timeout)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-212_tx_timeout
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b878c80ac09942f8242139c431cd616e80c04ffa
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800

    SCB-212 compensate immediately on TxEndedEvent if the global TX has already failed (e.g. after a timeout)
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 26 +++++++++++++---
 .../saga/alpha/core/TxEventRepository.java         |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 35 +++++++++++++++++-----
 .../saga/alpha/server/SpringTxEventRepository.java |  2 +-
 4 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839f..662fb54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,10 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,8 +33,7 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
-    put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
   }};
 
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -40,11 +43,26 @@ public class TxConsistentService {
 
   public void handle(TxEvent event) {
     eventRepository.save(event);
-    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+    CompletableFuture.runAsync(() -> {
+      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+        omegaCallback.compensate(event);
+      }
+
+      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+    });
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
   }
+
+  private boolean isGlobalTxAborted(TxEvent event) {
+    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  }
+
+  private boolean isTxEndedEvent(TxEvent event) {
+    return TxEndedEvent.name().equals(event.type());
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
 public interface TxEventRepository {
   void save(TxEvent event);
 
-  List<TxEvent> findStartedTransactions(String globalTxId, String type);
+  List<TxEvent> findTransactions(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 6443997..30ec2a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,9 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -44,7 +47,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+    public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
@@ -69,8 +72,8 @@ public class TxConsistentServiceTest {
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
         newEvent(EventType.SagaStartedEvent),
-        newEvent(EventType.TxStartedEvent),
-        newEvent(EventType.TxEndedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
         newEvent(EventType.TxCompensatedEvent),
         newEvent(EventType.SagaEndedEvent)};
 
@@ -85,14 +88,14 @@ public class TxConsistentServiceTest {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
 
     String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
 
-    TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
@@ -103,6 +106,22 @@ public class TxConsistentServiceTest {
     ));
   }
 
+  @Test
+  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+    ));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+  public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
         .stream()
         .map(TxEventEnvelope::event)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.