You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/21 10:34:19 UTC

[servicecomb-pack] 03/05: SCB-1321 Sub-transaction support concurrent

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 11936fdc9a71224b9b7d0e5a2d55192cf735bc10
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Fri Jun 21 18:25:18 2019 +0800

    SCB-1321 Sub-transaction support concurrent
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  39 +++-
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 204 ++++++++++++++++++++-
 alpha/pom.xml                                      |   3 +-
 docs/fsm/assets/saga_state_diagram.png             | Bin 229442 -> 237463 bytes
 docs/fsm/assets/state_table.png                    | Bin 286375 -> 311887 bytes
 5 files changed, 234 insertions(+), 12 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 492d187..2b48f45 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -95,6 +95,16 @@ public class SagaActor extends
                 return goTo(SagaActorState.PARTIALLY_COMMITTED);
               }
             }
+        ).event(TxStartedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return stay()
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return stay();
+              }
+            }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
               return goTo(SagaActorState.SUSPENDED)
@@ -123,6 +133,16 @@ public class SagaActor extends
                 return goTo(SagaActorState.PARTIALLY_ACTIVE);
               }
             }
+        ).event(TxEndedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return stay()
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return stay();
+              }
+            }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
               return goTo(SagaActorState.SUSPENDED)
@@ -196,8 +216,8 @@ public class SagaActor extends
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
               updateTxEntity(event, data);
-              // TODO 调用补偿方法
               TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
+              // TODO call compensate
               compensation(txEntity, data);
               return stay();
             }
@@ -233,8 +253,8 @@ public class SagaActor extends
 
     whenUnhandled(
         matchAnyEvent((event, data) -> {
-          LOG.error("unmatch event {}", event);
-          return stay();
+          LOG.error("Unhandled event {}", event);
+          return goTo(SagaActorState.SUSPENDED).replying(data);
         })
     );
 
@@ -290,26 +310,25 @@ public class SagaActor extends
           if (txEntity.getState() == TxState.ACTIVE) {
             txEntity.setEndTime(System.currentTimeMillis());
             txEntity.setState(TxState.FAILED);
-            // TODO 调用补偿方法
             data.getTxEntityMap().forEach((k, v) -> {
               if (v.getState() == TxState.COMMITTED) {
-                // TODO 调用补偿方法
+                // call compensate
                 compensation(v, data);
               }
             });
           }
         } else if (event instanceof TxComponsitedEvent) {
-          //补偿中计数器减一
+          // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
           txEntity.setState(TxState.COMPENSATED);
-          LOG.info("完成补偿 {}",txEntity.getLocalTxId());
+          LOG.info("compensation is completed {}",txEntity.getLocalTxId());
         }
       }
     } else if (event instanceof SagaEvent) {
       if (event instanceof SagaAbortedEvent) {
         data.getTxEntityMap().forEach((k, v) -> {
           if (v.getState() == TxState.COMMITTED) {
-            // TODO 调用补偿方法
+            // call compensate
             compensation(v, data);
           }
         });
@@ -324,8 +343,8 @@ public class SagaActor extends
   }
 
   private void compensation(TxEntity txEntity, SagaData data) {
-    //补偿中计数器加一
+    // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
-    LOG.info("调用补偿方法 {}", txEntity.getLocalTxId());
+    LOG.info("compensate {}", txEntity.getLocalTxId());
   }
 }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 156d67e..847d144 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -8,6 +8,7 @@ import akka.actor.Terminated;
 import akka.persistence.fsm.PersistentFSM;
 import akka.persistence.fsm.PersistentFSM.CurrentState;
 import akka.testkit.javadsl.TestKit;
+import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
@@ -22,9 +23,13 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SagaActorTest {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   static ActorSystem system;
 
   @BeforeClass
@@ -320,7 +325,7 @@ public class SagaActorTest {
    * 10. SagaAbortedEvent-1
    */
   @Test
-  public void receivedRemainingEventAfterfirstTxAbortedEventTest() {
+  public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
     new TestKit(system) {{
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
@@ -609,6 +614,203 @@ public class SagaActorTest {
     }};
   }
 
+  // tx concurrent execution
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxStartedEvent-12
+   * 4. TxStartedEvent-13
+   * 5. TxEndedEvent-11
+   * 6. TxEndedEvent-12
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  @Test
+  public void successfulTestWithTxConcurrent() throws InterruptedException {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxStartedEvent-12
+   * 5. TxEndedEvent-11
+   * 4. TxStartedEvent-13
+   * 6. TxEndedEvent-12
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  @Test
+  public void successfulTestWithTxConcurrentCross() throws InterruptedException {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 4. TxStartedEvent-12
+   * 6. TxStartedEvent-13
+   * 3. TxEndedEvent-11
+   * 5. TxEndedEvent-12
+   * 7. TxAbortedEvent-13
+   * 8. TxComponsitedEvent-11
+   * 9. TxComponsitedEvent-12
+   * 10. SagaAbortedEvent-1
+   */
+  @Test
+  public void lastTxAbortedEventWithTxConcurrentTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      system.stop(saga);
+    }};
+  }
+
   private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef,
       SagaActorState from, SagaActorState to) {
     assertEquals(transition.fsmRef(), actorRef);
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 6921f5b..3dfaf13 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -32,11 +32,12 @@
   <packaging>pom</packaging>
   <modules>
     <module>alpha-core</module>
+    <module>alpha-fsm</module>
     <module>alpha-spring-boot-compatibility</module>
     <module>alpha-spring-cloud-starter-eureka</module>
     <module>alpha-spring-cloud-starter-consul</module>
     <module>alpha-spring-cloud-starter-zookeeper</module>
-    <module>alpha-server</module>
+    <module>alpha-server</module>    
   </modules>
 
   <build>
diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png
index 96cd6c9..349fb72 100644
Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ
diff --git a/docs/fsm/assets/state_table.png b/docs/fsm/assets/state_table.png
index 5005597..569b2ce 100644
Binary files a/docs/fsm/assets/state_table.png and b/docs/fsm/assets/state_table.png differ