You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:37 UTC

[servicecomb-pack] 04/12: SCB-1321 Remove Actor replying methods

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit a93055e6bc413c4cb66d9ff30be0d5e71634b759
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:47:27 2019 +0800

    SCB-1321 Remove Actor replying methods
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 118 +++++++++--------
 .../pack/alpha/fsm/domain/SagaEndedDomain.java     |   5 +
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 139 ++++++++++-----------
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  24 ++--
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  30 ++---
 5 files changed, 163 insertions(+), 153 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 6881148..a6e3d05 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -98,21 +98,21 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
@@ -120,7 +120,7 @@ public class SagaActor extends
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
-              if (data.getExpirationTime()  != null) {
+              if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
                     .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -132,7 +132,7 @@ public class SagaActor extends
         ).event(TxStartedEvent.class,
             (event, data) -> {
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
-              if (data.getExpirationTime()  != null) {
+              if (data.getExpirationTime() != null) {
                 return stay()
                     .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -145,7 +145,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxAbortedEvent.class,
@@ -156,7 +155,8 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              return goTo(SagaActorState.SUSPENDED)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
@@ -189,7 +189,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaEndedEvent.class,
@@ -197,7 +196,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
@@ -212,7 +210,8 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              return goTo(SagaActorState.SUSPENDED)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));//.replying(data);
             })
     );
 
@@ -222,7 +221,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxCompensatedEvent.class, SagaData.class,
@@ -235,12 +233,12 @@ public class SagaActor extends
         ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
             (event, data) -> {
               if (hasCompensationSentTx(data) || !data.isTerminated()) {
-                return stay().replying(data);
+                return stay();
               } else {
-                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
                     .applying(domainEvent)
-                    .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
@@ -249,15 +247,17 @@ public class SagaActor extends
               data.setTerminated(true);
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
-                return stay().replying(data).applying(domainEvent);
+                return stay()
+                    .applying(domainEvent);
               } else if (hasCompensationSentTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
-                return stay().replying(data).applying(domainEvent);
+                return stay()
+                    .applying(domainEvent);
               } else {
-                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
                     .applying(domainEvent)
-                    .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
@@ -277,21 +277,35 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
     when(SagaActorState.COMMITTED,
         matchAnyEvent(
             (event, data) -> {
-              /**
-               * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181
-               * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
-               * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的
-               * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理
-               * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
-               * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
-               * */
+              //  已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
+              //  以下基于 journal-redis 说明:
+              //    假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
+              //    journal:persistenceIds
+              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
+              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
+              //
+              //    1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
+              //    2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
+              //       使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
+              //    3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
+              //
+              //    何如清理:
+              //      通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
+              //      遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
+              //      如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
+              //      并删除 journal:persisted:item:highestSequenceNr
+              //
+              //  目前可以看到的解释是 https://github.com/akka/akka/issues/21181
               deleteMessages(lastSequenceNr());
               deleteSnapshot(snapshotSequenceNr());
               return stop();
@@ -333,7 +347,7 @@ public class SagaActor extends
             SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
-          if(LOG.isDebugEnabled()){
+          if (LOG.isDebugEnabled()) {
             LOG.debug("transition {} {} -> {}", getSelf(), from, to);
           }
         })
@@ -342,14 +356,14 @@ public class SagaActor extends
     onTermination(
         matchStop(
             Normal(), (state, data) -> {
-              if(LOG.isDebugEnabled()){
-                LOG.info("stop {} {}", data.getGlobalTxId(), state);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("stop {} {}", data.getGlobalTxId(), state);
               }
               data.setTerminated(true);
               data.setLastState(state);
               data.setEndTime(new Date());
               SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
-                  .putSagaData(data.getGlobalTxId(), data);
+                  .stopSagaData(data.getGlobalTxId(), data);
             }
         )
     );
@@ -359,7 +373,8 @@ public class SagaActor extends
   @Override
   public SagaData applyEvent(DomainEvent event, SagaData data) {
     // log event to SagaData
-    if(event.getEvent() != null && !(event.getEvent() instanceof TxComponsitedCheckInternalEvent)){
+    if (event.getEvent() != null && !(event
+        .getEvent() instanceof TxComponsitedCheckInternalEvent)) {
       data.logEvent(event.getEvent());
     }
     if (event instanceof SagaStartedDomain) {
@@ -391,9 +406,6 @@ public class SagaActor extends
       TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
       txEntity.setEndTime(System.currentTimeMillis());
       if (domainEvent.getState() == TxState.COMMITTED) {
-        // stop
-        //data.setEndTime(System.currentTimeMillis());
-        //data.setTerminated(true);
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
         txEntity.setState(domainEvent.getState());
@@ -431,13 +443,17 @@ public class SagaActor extends
         data.setTerminated(true);
       }
     }
-    LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    }
     return data;
   }
 
   @Override
   public void onRecoveryCompleted() {
-    LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+    }
   }
 
   @Override
@@ -468,37 +484,37 @@ public class SagaActor extends
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     txEntity.setState(TxState.COMPENSATION_SENT);
-    try{
+    try {
       SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
       LOG.info("compensate {}", txEntity.getLocalTxId());
-    }catch (AlphaException ex){
-      LOG.error(ex.getMessage(),ex);
+    } catch (AlphaException ex) {
+      LOG.error(ex.getMessage(), ex);
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        LOG.error(e.getMessage(),e);
+        LOG.error(e.getMessage(), e);
       }
-      compensation(txEntity,data);
-    }catch (Exception ex){
-      LOG.error("compensation failed "+txEntity.getLocalTxId(), ex);
-      if(txEntity.getRetries() > 0){
+      compensation(txEntity, data);
+    } catch (Exception ex) {
+      LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
+      if (txEntity.getRetries() > 0) {
         // which means the retry number
-        if(txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()){
+        if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()) {
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
           }
-          compensation(txEntity,data);
+          compensation(txEntity, data);
         }
-      } else if(txEntity.getRetries() == -1){
+      } else if (txEntity.getRetries() == -1) {
         // which means retry it until succeed
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
-          LOG.error(e.getMessage(),e);
+          LOG.error(e.getMessage(), e);
         }
-        compensation(txEntity,data);
+        compensation(txEntity, data);
       }
     }
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 1f4b216..f5bc708 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -32,6 +32,11 @@ public class SagaEndedDomain implements DomainEvent {
     this.state = state;
   }
 
+
+  public SagaEndedDomain(SagaActorState state) {
+    this.state = state;
+  }
+
   public SagaActorState getState() {
     return state;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 926ae84..1b4d84b 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -132,19 +132,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
       system.stop(saga);
     }};
@@ -220,13 +219,6 @@ public class SagaActorTest {
       assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE,
           SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED,
           SagaActorState.COMMITTED);
@@ -234,6 +226,12 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), recoveredSaga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       eventListFirst.addAll(eventListSecond);
       assertThat(eventListFirst, is(sagaData.getEvents()));
 
@@ -274,18 +272,17 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 1);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 1);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -337,19 +334,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 2);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 2);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -410,20 +407,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -484,15 +480,13 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -548,20 +542,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -700,19 +693,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -824,19 +816,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -888,19 +879,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -951,20 +941,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index f408c09..c1a61a6 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -63,7 +63,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -84,7 +84,7 @@ public class SagaIntegrationTest {
 
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -103,7 +103,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -124,7 +124,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -146,7 +146,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -168,7 +168,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -190,7 +190,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -212,7 +212,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -235,7 +235,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -257,7 +257,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -279,7 +279,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -301,7 +301,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index ecac3b8..d44d728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -99,7 +99,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -121,7 +121,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -143,7 +143,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -167,7 +167,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -193,7 +193,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -225,7 +225,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -250,7 +250,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(20, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -275,7 +275,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -300,7 +300,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(timeout + 1, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -324,7 +324,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -348,7 +348,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -372,7 +372,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -396,7 +396,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -436,7 +436,7 @@ public class AlphaIntegrationFsmTest {
 
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -462,7 +462,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);