You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/03 13:40:04 UTC

[servicecomb-pack] 02/02: SCB-1321 Add test case receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit df20c2187c2e1665b92ec82e76bb60062c7a442c
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 3 21:39:43 2019 +0800

    SCB-1321 Add test case receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 10 +++---
 .../fsm/event/consumer/SagaEventConsumer.java      |  6 ++--
 .../alpha/server/fsm/GrpcSagaEventService.java     |  4 ++-
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 37 ++++++++++++++++++++--
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  |  1 -
 5 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 51d6c52..5104aca 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -256,7 +256,8 @@ public class SagaActor extends
             }
         ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
             (event, data) -> {
-              if (hasCompensationSentTx(data)) {
+              if (hasCompensationSentTx(data) || !data.isTerminated()) {
+              //if (hasCompensationSentTx(data)) {
                 return stay().replying(data);
               } else {
                 return goTo(SagaActorState.COMPENSATED)
@@ -266,7 +267,7 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              //data.setTerminated(true);
+              data.setTerminated(true);
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
                 return stay().replying(data).applying(domainEvent);
@@ -409,8 +410,8 @@ public class SagaActor extends
       txEntity.setEndTime(System.currentTimeMillis());
       if (domainEvent.getState() == TxState.COMMITTED) {
         // stop
-        data.setEndTime(System.currentTimeMillis());
-        data.setTerminated(true);
+        //data.setEndTime(System.currentTimeMillis());
+        //data.setTerminated(true);
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
         txEntity.setState(domainEvent.getState());
@@ -430,6 +431,7 @@ public class SagaActor extends
     } else if (event instanceof SagaEndedDomain) {
       SagaEndedDomain domainEvent = (SagaEndedDomain) event;
       if (domainEvent.getState() == SagaActorState.FAILED) {
+        data.setTerminated(true);
         data.getTxEntityMap().forEach((k, v) -> {
           if (v.getState() == TxState.COMMITTED) {
             // call compensate
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index 7ddbc1a..fa481f8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -58,10 +58,10 @@ public class SagaEventConsumer {
         saga = optional.get();
       }
       saga.tell(event, ActorRef.noSender());
-      LOG.info("tell {} to {}", event.toString(),saga);
-      //TODO WAL commit
+      if(LOG.isDebugEnabled()){
+        LOG.debug("tell {} to {}", event.toString(),saga);
+      }
     }catch (Exception ex){
-      //TODO
       throw ex;
     }
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 2394ab2..72a1dfa 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -78,7 +78,9 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
   @Override
   @Trace("onTransactionEvent")
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
-    LOG.info("onText {}",message);
+    if(LOG.isDebugEnabled()){
+      LOG.debug("onText {}",message);
+    }
     boolean ok = true;
     BaseEvent event = null;
     if (message.getType().equals(EventType.SagaStartedEvent.name())) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 27f5fd3..3bbaf6d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -31,6 +31,7 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
 import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
+import org.apache.servicecomb.pack.common.EventType;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -50,7 +51,7 @@ import org.springframework.test.context.junit4.SpringRunner;
         "alpha.event.pollingInterval=1",
         "spring.main.allow-bean-definition-overriding=true",
         "alpha.model.actor.enabled=true",
-        "spring.profiles.active=akka-persistence-redis"
+        "spring.profiles.active=akka-persistence-mem"
        })
 public class AlphaIntegrationFsmTest {
   private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();
@@ -187,7 +188,39 @@ public class AlphaIntegrationFsmTest {
     omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
       omegaEventSender.getBlockingStub().onTxEvent(event);
     });
-    await().atMost(2, SECONDS).until(() -> {
+    await().atMost(5, SECONDS).until(() -> {
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+    Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+    Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+  }
+
+  @Test
+  public void receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest() {
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      if(event.getType().equals(EventType.TxStartedEvent.name()) && event.getLocalTxId().equals(localTxId_3)){
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 52bcba5..433f54a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -262,7 +262,6 @@ public class OmegaEventSagaSimulator {
         .build();
   }
 
-
   public static Builder builder() {
     return new Builder();
   }