You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:21 UTC

[incubator-servicecomb-saga] 09/12: SCB-218 made sure saga ended event is always the last event

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ffaa350c987ec77c43209cf990d6f9e2bcab2560
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 17:01:45 2018 +0800

    SCB-218 made sure saga ended event is always the last event
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/EventScanner.java  |  3 +--
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 29 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index f9fa3be..80deeb3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -100,8 +100,6 @@ public class EventScanner implements Runnable {
     }
   }
 
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
     log.info("Transaction with globalTxId {} and localTxId {} was compensated",
@@ -135,6 +133,7 @@ public class EventScanner implements Runnable {
         EMPTY_PAYLOAD);
   }
 
+  // TODO: 2018/1/19 compensation may happen out of order if we don't wait until the compensated event for the previous one is received, since compensation is async
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 769ee5a..35352f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -69,7 +69,7 @@ class AlphaConfig {
 
   @Bean
   ScheduledExecutorService compensationScheduler() {
-    return Executors.newScheduledThreadPool(2);
+    return Executors.newScheduledThreadPool(1);
   }
 
   @Bean
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a928443..5dddc1d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -30,15 +31,18 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 import javax.annotation.PostConstruct;
 
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -345,6 +349,23 @@ public class AlphaIntegrationTest {
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
   }
 
+  @Test
+  public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    String anotherLocalTxId = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId));
+
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 8);
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
@@ -445,6 +466,12 @@ public class AlphaIntegrationTest {
 
   @PostConstruct
   void init() {
-//    new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+    // simulates concurrent db connections
+    new EventScanner(
+        Executors.newSingleThreadScheduledExecutor(),
+        eventRepository,
+        commandRepository,
+        omegaCallback,
+        1).run();
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.