You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/13 14:58:00 UTC

[incubator-servicecomb-saga] 09/15: SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 50444100ddc4d906b010eefa42d974bb219592ff
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 11 10:07:47 2018 +0800

    SCB-212 compensated on TxEndedEvent immediately if global TX already failed, in case of timeout
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java         | 20 +++++++++++++++++---
 .../saga/alpha/core/TxEventRepository.java           |  2 +-
 .../saga/alpha/core/TxConsistentServiceTest.java     | 18 +++++++++++++++++-
 .../saga/alpha/server/SpringTxEventRepository.java   |  2 +-
 4 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5bab1d8..079e559 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -40,7 +40,6 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
     put(TxAbortedEvent.name(), (event) -> compensate(event));
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
@@ -54,11 +53,18 @@ public class TxConsistentService {
 
   public void handle(TxEvent event) {
     eventRepository.save(event);
-    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+
+    CompletableFuture.runAsync(() -> {
+      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
+        omegaCallback.compensate(event);
+      }
+
+      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
+    });
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
     eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
       Set<String> eventSet = new HashSet<>(events.size());
@@ -83,4 +89,12 @@ public class TxConsistentService {
         event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
         null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
   }
+
+  private boolean isGlobalTxAborted(TxEvent event) {
+    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  }
+
+  private boolean isTxEndedEvent(TxEvent event) {
+    return TxEndedEvent.name().equals(event.type());
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a90e02..3a8387b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -22,5 +22,5 @@ import java.util.List;
 public interface TxEventRepository {
   void save(TxEvent event);
 
-  List<TxEvent> findStartedTransactions(String globalTxId, String type);
+  List<TxEvent> findTransactions(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index cc5c520..febbfaf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -50,7 +50,7 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+    public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
@@ -118,6 +118,22 @@ public class TxConsistentServiceTest {
     assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
   }
 
+  @Test
+  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+    ));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index a8058e9..3bf6e03 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -36,7 +36,7 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public List<TxEvent> findStartedTransactions(String globalTxId, String type) {
+  public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
         .stream()
         .map(TxEventEnvelope::event)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.