You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/11 06:56:06 UTC

[GitHub] seanyinx closed pull request #111: SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation

seanyinx closed pull request #111: SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation
URL: https://github.com/apache/incubator-servicecomb-saga/pull/111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would not otherwise show it after the merge):

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839ff..5bab1d8b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,22 +17,36 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 public class TxConsistentService {
   private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
 
+  private static final byte[] EMPTY_PAYLOAD = new byte[0];
+
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
-    put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
+    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
+  private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
     this.eventRepository = eventRepository;
     this.omegaCallback = omegaCallback;
@@ -44,7 +58,29 @@ public void handle(TxEvent event) {
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+    List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
     events.forEach(omegaCallback::compensate);
+    eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
+      Set<String> eventSet = new HashSet<>(events.size());
+      events.forEach(e -> eventSet.add(e.localTxId()));
+      return eventSet;
+    });
+  }
+
+  private void updateCompensateStatus(TxEvent event) {
+    Set<String> events = eventsToCompensate.get(event.globalTxId());
+    if (events != null) {
+      events.remove(event.localTxId());
+      if (events.isEmpty()) {
+        markGlobalTxEnd(event);
+        eventsToCompensate.remove(event.globalTxId());
+      }
+    }
+  }
+
+  private void markGlobalTxEnd(TxEvent event) {
+    eventRepository.save(new TxEvent(
+        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
+        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
   }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 64439970..cc5c520a 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,12 @@
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -68,11 +74,11 @@ public void save(TxEvent event) {
   @Test
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
-        newEvent(EventType.SagaStartedEvent),
-        newEvent(EventType.TxStartedEvent),
-        newEvent(EventType.TxEndedEvent),
-        newEvent(EventType.TxCompensatedEvent),
-        newEvent(EventType.SagaEndedEvent)};
+        newEvent(SagaStartedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
+        newEvent(TxCompensatedEvent),
+        newEvent(SagaEndedEvent)};
 
     for (TxEvent event : events) {
       consistentService.handle(event);
@@ -85,14 +91,14 @@ public void persistEventOnArrival() throws Exception {
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
     String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
 
     String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
 
-    TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
@@ -101,6 +107,15 @@ public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
         new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
         new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
     ));
+
+    TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
+    consistentService.handle(compensateEvent2);
+
+    TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
+    consistentService.handle(compensateEvent1);
+
+    await().atMost(1, SECONDS).until(() -> events.size() == 8);
+    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
   }
 
   private TxEvent newEvent(EventType eventType) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index bf13e3a0..abafc2f1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -75,13 +75,15 @@ Object advise(ProceedingJoinPoint joinPoint) throws Throwable {
     sagaStartAnnotationProcessor.preIntercept();
 
     try {
-      return joinPoint.proceed();
+      Object result = joinPoint.proceed();
+
+      LOG.info("Transaction {} succeeded.", context.globalTxId());
+      sagaStartAnnotationProcessor.postIntercept();
+
+      return result;
     } catch (Throwable throwable) {
-      LOG.error("Failed to process SagaStart method: {}", method.toString());
+      LOG.error("Transaction {} failed.", context.globalTxId());
       throw throwable;
-    } finally {
-      LOG.debug("Transaction {} has finished.", context.globalTxId());
-      sagaStartAnnotationProcessor.postIntercept();
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services