You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 06:56:05 UTC

[incubator-servicecomb-saga] 01/04: SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 1f5a128cccc13d92208be6868a379fa5b3eb3ce2
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 10 16:00:45 2018 +0800

    SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 45 ++++++++++++++++++++--
 .../saga/alpha/core/TxConsistentServiceTest.java   | 35 ++++++++++++-----
 .../saga/omega/transaction/TransactionAspect.java  | 12 +++---
 3 files changed, 74 insertions(+), 18 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839f..1805335 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,22 +17,36 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.Consumer;
 
 public class TxConsistentService {
   private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
 
+  private static final byte[] EMPTY_PAYLOAD = new byte[0];
+
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
-    put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
+  private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
     this.eventRepository = eventRepository;
     this.omegaCallback = omegaCallback;
@@ -44,7 +58,32 @@ public class TxConsistentService {
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
+    eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
+      CopyOnWriteArraySet<String> eventSet = new CopyOnWriteArraySet<>();
+      events.forEach(e -> eventSet.add(getUniqueEventId(e)));
+      return eventSet;
+    });
+  }
+
+  private void updateCompensateStatus(TxEvent event) {
+    Set<String> events = eventsToCompensate.get(event.globalTxId());
+    if (events != null) {
+      events.remove(getUniqueEventId(event));
+      if (events.isEmpty()) {
+        markGlobalTxEnd(event);
+      }
+    }
+  }
+
+  private String getUniqueEventId(TxEvent event) {
+    return event.globalTxId() + "_" + event.localTxId();
+  }
+
+  private void markGlobalTxEnd(TxEvent event) {
+    eventRepository.save(new TxEvent(
+        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
+        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
   }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 6443997..ad448da 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -68,11 +74,11 @@ public class TxConsistentServiceTest {
   @Test
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
-        newEvent(EventType.SagaStartedEvent),
-        newEvent(EventType.TxStartedEvent),
-        newEvent(EventType.TxEndedEvent),
-        newEvent(EventType.TxCompensatedEvent),
-        newEvent(EventType.SagaEndedEvent)};
+        newEvent(SagaStartedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
+        newEvent(TxCompensatedEvent),
+        newEvent(SagaEndedEvent)};
 
     for (TxEvent event : events) {
       consistentService.handle(event);
@@ -85,14 +91,14 @@ public class TxConsistentServiceTest {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
 
     String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
 
-    TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
@@ -101,6 +107,15 @@ public class TxConsistentServiceTest {
         new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
         new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
     ));
+
+    TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
+    consistentService.handle(compensateEvent2);
+
+    TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
+    consistentService.handle(compensateEvent1);
+
+    assertThat(events.size(), is(8));
+    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
   }
 
   private TxEvent newEvent(EventType eventType) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index bf13e3a..abafc2f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -75,13 +75,15 @@ public class TransactionAspect {
     sagaStartAnnotationProcessor.preIntercept();
 
     try {
-      return joinPoint.proceed();
+      Object result = joinPoint.proceed();
+
+      LOG.info("Transaction {} succeeded.", context.globalTxId());
+      sagaStartAnnotationProcessor.postIntercept();
+
+      return result;
     } catch (Throwable throwable) {
-      LOG.error("Failed to process SagaStart method: {}", method.toString());
+      LOG.error("Transaction {} failed.", context.globalTxId());
       throw throwable;
-    } finally {
-      LOG.debug("Transaction {} has finished.", context.globalTxId());
-      sagaStartAnnotationProcessor.postIntercept();
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.