You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/01 14:15:25 UTC

[servicecomb-pack] 02/02: SCB-1321 Support Akka Persistent Redis Recovery

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 9cee529906beaba4b1c41ab0279093acc8b90b67
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jul 1 22:14:52 2019 +0800

    SCB-1321 Support Akka Persistent Redis Recovery
---
 alpha/alpha-fsm/README.md                          |   1 +
 alpha/alpha-fsm/pom.xml                            |  19 +-
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 284 ++++++++++++--------
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   3 +-
 .../AddTxEventDomain.java}                         |  40 ++-
 .../fsm/{TxState.java => domain/DomainEvent.java}  |  11 +-
 .../{TxState.java => domain/SagaEndedDomain.java}  |  20 +-
 .../SagaStartedDomain.java}                        |  28 +-
 .../UpdateTxEventDomain.java}                      |  41 ++-
 ...t.java => TxComponsitedCheckInternalEvent.java} |  31 ++-
 .../pack/alpha/fsm/event/base/BaseEvent.java       |   5 +
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 288 +++++++++++++++++++--
 .../pack/alpha/fsm/SagaEventSender.java            |  60 ++++-
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  52 +++-
 .../alpha-fsm/src/test/resources/application.yaml  |  20 +-
 docs/fsm/assets/saga_state_diagram.png             | Bin 237463 -> 244645 bytes
 docs/fsm/plantuml/saga-state-diagram.puml          |   2 +-
 17 files changed, 705 insertions(+), 200 deletions(-)

diff --git a/alpha/alpha-fsm/README.md b/alpha/alpha-fsm/README.md
index 3a37fed..e7ea7b9 100644
--- a/alpha/alpha-fsm/README.md
+++ b/alpha/alpha-fsm/README.md
@@ -6,6 +6,7 @@
 ## Test State Machine
 
 ```
+git clone -b SCB-1321 git@github.com:apache/servicecomb-pack.git
 cd alpha
 mvn clean package -pl alpha-fsm 
 ```
\ No newline at end of file
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index b1897d9..8788430 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -31,6 +31,7 @@
 
   <properties>
     <leveldbjni-all.version>1.8</leveldbjni-all.version>
+    <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version>
   </properties>
 
   <dependencyManagement>
@@ -72,12 +73,7 @@
     <dependency>
       <groupId>javax.persistence</groupId>
       <artifactId>javax.persistence-api</artifactId>
-    </dependency>    
-<!--    <dependency>-->
-<!--      <groupId>org.apache.logging.log4j</groupId>-->
-<!--      <artifactId>log4j-slf4j-impl</artifactId>-->
-<!--      <scope>test</scope>-->
-<!--    </dependency>-->
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
@@ -103,6 +99,11 @@
       <artifactId>leveldbjni-all</artifactId>
       <version>${leveldbjni-all.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.safety-data</groupId>
+      <artifactId>akka-persistence-redis_2.12</artifactId>
+      <version>${akka-persistence-redis.version}</version>
+    </dependency>
 
     <!-- For testing the artifacts scope are test-->
     <dependency>
@@ -138,7 +139,11 @@
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_2.12</artifactId>
-    </dependency>       
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.12</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index dca006e..d22cb79 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -22,19 +22,20 @@ import akka.persistence.fsm.AbstractPersistentFSM;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
@@ -62,14 +63,15 @@ public class SagaActor extends
     when(SagaActorState.IDEL,
         matchEvent(SagaStartedEvent.class,
             (event, data) -> {
-              data.setGlobalTxId(event.getGlobalTxId());
-              data.setBeginTime(System.currentTimeMillis());
+              SagaStartedDomain domainEvent = new SagaStartedDomain(event.getGlobalTxId(),
+                  event.getCreateTime(), event.getTimeout());
               if (event.getTimeout() > 0) {
-                data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000);
                 return goTo(SagaActorState.READY)
-                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+                    .applying(domainEvent)
+                    .forMax(Duration.create(event.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.READY);
+                return goTo(SagaActorState.READY)
+                    .applying(domainEvent);
               }
             }
 
@@ -79,25 +81,36 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent);
               }
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .replying(data);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .replying(data);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data);
             })
     );
@@ -105,34 +118,43 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_ACTIVE,
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_COMMITTED);
+                return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .applying(domainEvent);
               }
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return stay()
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return stay();
+                return stay().applying(domainEvent);
               }
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.FAILED);
+              return goTo(SagaActorState.FAILED)
+                  .applying(domainEvent);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -143,47 +165,55 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent);
               }
             }
         ).event(TxEndedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
               if (data.getExpirationTime() > 0) {
                 return stay()
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return stay();
+                return stay().applying(domainEvent);
               }
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+              return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.FAILED);
+              return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -194,18 +224,25 @@ public class SagaActor extends
     when(SagaActorState.FAILED,
         matchEvent(SagaTimeoutEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxComponsitedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMPENSATED);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                self().tell(TxComponsitedCheckInternalEvent.builder().build(), self());
+              }));
+            }
+        ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
+            (event, data) -> {
               if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
                   || hasCommittedTx(data)) {
-                return stay();
+                return stay().replying(data);
               } else {
                 return goTo(SagaActorState.COMPENSATED)
                     .replying(data)
@@ -214,30 +251,36 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
               data.setTerminated(true);
-              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
-                  || hasCommittedTx(data)) {
-                return stay();
+              if (hasCommittedTx(data)) {
+                SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+                return stay().replying(data).applying(domainEvent);
+              } else if(hasCompensationSentTx(data)){
+                return stay().replying(data);
               } else {
+                SagaEndedDomain domainEvent = new SagaEndedDomain(
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
+                    .applying(domainEvent)
                     .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return stay();
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
+              return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
-              // TODO call compensate
-              compensation(txEntity, data);
-              return stay();
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId());
+                // call compensate
+                compensation(txEntity, _data);
+              }));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -248,6 +291,17 @@ public class SagaActor extends
     when(SagaActorState.COMMITTED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              /**
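+               * deleteMessages only deletes the actor's data in Redis; it does not delete the actor's highestSequenceNr, see https://github.com/akka/akka/issues/21181
+               * The highestSequenceNr of a stopped actor has to be cleaned up manually. For example, take an actor whose persistence ID is 3c500008-7b9f-415f-b2fd-e6ad0d455fc1:
+               * when Redis holds key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr but has no matching
+               * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1, the actor has already stopped and can be cleaned up with the following commands: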
+               * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
+               * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
+               * */
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -256,6 +310,9 @@ public class SagaActor extends
     when(SagaActorState.SUSPENDED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -264,6 +321,9 @@ public class SagaActor extends
     when(SagaActorState.COMPENSATED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -278,6 +338,11 @@ public class SagaActor extends
 
     onTransition(
         matchState(null, null, (from, to) -> {
+          if (stateData().getGlobalTxId() != null) {
+            stateData().setLastState(to);
+            LogExtension.LogExtensionProvider.get(getContext().getSystem())
+                .putSagaData(stateData().getGlobalTxId(), stateData());
+          }
           LOG.info("transition {} {} -> {}", getSelf(), from, to);
         })
     );
@@ -288,7 +353,8 @@ public class SagaActor extends
               LOG.info("stop {} {}", data.getGlobalTxId(), state);
               data.setTerminated(true);
               data.setLastState(state);
-              LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data);
+              LogExtension.LogExtensionProvider.get(getContext().getSystem())
+                  .putSagaData(data.getGlobalTxId(), data);
             }
         )
     );
@@ -296,73 +362,77 @@ public class SagaActor extends
   }
 
   @Override
-  public void onRecoveryCompleted() {
-    LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData());
-  }
-
-  @Override
-  public Class domainEventClass() {
-    return SagaDomainEvent.DomainEvent.class;
-  }
-
-
-  @Override
-  public String persistenceId() {
-    return persistenceId;
-  }
-
-  @Override
-  public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) {
-    return currentData;
-  }
-
-  private void updateTxEntity(BaseEvent event, SagaData data) {
-    if (event instanceof TxEvent) {
-      TxEvent txEvent = (TxEvent) event;
-      if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) {
-        if (event instanceof TxStartedEvent) {
-          TxEntity txEntity = TxEntity.builder()
-              .localTxId(txEvent.getLocalTxId())
-              .parentTxId(txEvent.getParentTxId())
-              .state(TxState.ACTIVE)
-              .build();
-          data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
-        }
+  public SagaData applyEvent(DomainEvent event, SagaData data) {
+    if (event instanceof SagaStartedDomain) {
+      SagaStartedDomain domainEvent = (SagaStartedDomain) event;
+      data.setGlobalTxId(domainEvent.getGlobalTxId());
+      data.setBeginTime(domainEvent.getCreateTime());
+      data.setExpirationTime(domainEvent.getExpirationTime());
+    } else if (event instanceof AddTxEventDomain) {
+      AddTxEventDomain domainEvent = (AddTxEventDomain) event;
+      if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) {
+        TxEntity txEntity = TxEntity.builder()
+            .localTxId(domainEvent.getLocalTxId())
+            .parentTxId(domainEvent.getParentTxId())
+            .state(domainEvent.getState())
+            .build();
+        data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
       } else {
-        TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId());
-        if (event instanceof TxEndedEvent) {
-          if (txEntity.getState() == TxState.ACTIVE) {
-            txEntity.setEndTime(System.currentTimeMillis());
-            txEntity.setState(TxState.COMMITTED);
-          }
-        } else if (event instanceof TxAbortedEvent) {
-          if (txEntity.getState() == TxState.ACTIVE) {
-            txEntity.setEndTime(System.currentTimeMillis());
-            txEntity.setState(TxState.FAILED);
-            data.getTxEntityMap().forEach((k, v) -> {
-              if (v.getState() == TxState.COMMITTED) {
-                // call compensate
-                compensation(v, data);
-              }
-            });
+        LOG.warn("TxEntity {} already exists", domainEvent.getLocalTxId());
+      }
+    } else if (event instanceof UpdateTxEventDomain) {
+      UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
+      TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
+      txEntity.setEndTime(System.currentTimeMillis());
+      if (domainEvent.getState() == TxState.COMMITTED) {
+        txEntity.setState(domainEvent.getState());
+      } else if (domainEvent.getState() == TxState.FAILED) {
+        txEntity.setState(domainEvent.getState());
+        data.getTxEntityMap().forEach((k, v) -> {
+          if (v.getState() == TxState.COMMITTED) {
+            // call compensate
+            compensation(v, data);
           }
-        } else if (event instanceof TxComponsitedEvent) {
-          // decrement the compensation running counter by one
-          data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(TxState.COMPENSATED);
-          LOG.info("compensation is completed {}", txEntity.getLocalTxId());
-        }
+        });
+      } else if (domainEvent.getState() == TxState.COMPENSATED) {
+        // decrement the compensation running counter by one
+        data.getCompensationRunningCounter().decrementAndGet();
+        txEntity.setState(domainEvent.getState());
+        LOG.info("compensation is completed {}", txEntity.getLocalTxId());
       }
-    } else if (event instanceof SagaEvent) {
-      if (event instanceof SagaAbortedEvent) {
+    } else if (event instanceof SagaEndedDomain) {
+      SagaEndedDomain domainEvent = (SagaEndedDomain) event;
+      if (domainEvent.getState() == SagaActorState.FAILED) {
         data.getTxEntityMap().forEach((k, v) -> {
           if (v.getState() == TxState.COMMITTED) {
             // call compensate
             compensation(v, data);
           }
         });
+      } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
+
+      } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
+
       }
     }
+    LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    return data;
+  }
+
+  @Override
+  public void onRecoveryCompleted() {
+    LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+  }
+
+  @Override
+  public Class domainEventClass() {
+    return DomainEvent.class;
+  }
+
+
+  @Override
+  public String persistenceId() {
+    return persistenceId;
   }
 
   private boolean hasCommittedTx(SagaData data) {
@@ -371,9 +441,17 @@ public class SagaActor extends
         .count() > 0;
   }
 
+  private boolean hasCompensationSentTx(SagaData data) {
+    return data.getTxEntityMap().entrySet().stream()
+        .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
+        .count() > 0;
+  }
+
   private void compensation(TxEntity txEntity, SagaData data) {
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
+    //TODO call omega compensate method
     LOG.info("compensate {}", txEntity.getLocalTxId());
+    txEntity.setState(TxState.COMPENSATION_SENT);
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
index 0494895..af02ce8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -21,5 +21,6 @@ public enum TxState {
   ACTIVE,
   FAILED,
   COMMITTED,
-  COMPENSATED;
+  COMPENSATION_SENT, // The compensation method has been called; now waiting for TxComponsitedEvent
+  COMPENSATED
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
similarity index 51%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index 5794566..c7c65a3 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -15,29 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
-public abstract class BaseEvent implements Serializable {
-  private String globalTxId;
+public class AddTxEventDomain implements DomainEvent {
+  private String parentTxId;
+  private String localTxId;
+  private TxState state = TxState.ACTIVE;
 
-  public BaseEvent() {
+  public AddTxEventDomain(String parentTxId, String localTxId) {
+    this.parentTxId = parentTxId;
+    this.localTxId = localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
 
+  public String getLocalTxId() {
+    return localTxId;
   }
 
-  public String getGlobalTxId() {
-    return globalTxId;
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
+  public TxState getState() {
+    return state;
   }
 
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public void setState(TxState state) {
+    this.state = state;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
similarity index 85%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
index 0494895..10c293c 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
+
+import java.io.Serializable;
+
+public interface DomainEvent extends Serializable {
 
-public enum TxState {
-  ACTIVE,
-  FAILED,
-  COMMITTED,
-  COMPENSATED;
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
similarity index 70%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 0494895..d3e40b1 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -15,11 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-public enum TxState {
-  ACTIVE,
-  FAILED,
-  COMMITTED,
-  COMPENSATED;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+
+public class SagaEndedDomain implements DomainEvent {
+
+  private SagaActorState state;
+
+  public SagaEndedDomain(SagaActorState state) {
+    this.state = state;
+  }
+
+  public SagaActorState getState() {
+    return state;
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
similarity index 65%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
index 5794566..fe75f04 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
@@ -15,29 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+public class SagaStartedDomain implements DomainEvent {
 
-public abstract class BaseEvent implements Serializable {
+  private long createTime;
   private String globalTxId;
+  private long expirationTime;
 
-  public BaseEvent() {
+  public SagaStartedDomain(String globalTxId, long createTime, int timeout) {
+    this.createTime = createTime;
+    this.globalTxId = globalTxId;
+    if (timeout > 0) { // a non-positive timeout leaves expirationTime at its default of 0
+      this.expirationTime = System.currentTimeMillis() + timeout * 1000L; // 1000L: multiply as long so a large timeout cannot overflow int
+    }
+  }
 
+  public long getCreateTime() {
+    return createTime;
   }
 
   public String getGlobalTxId() {
     return globalTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
-  }
-
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public long getExpirationTime() {
+    return expirationTime;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
similarity index 50%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 5794566..839cfe0 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -15,29 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
-public abstract class BaseEvent implements Serializable {
-  private String globalTxId;
+public class UpdateTxEventDomain implements DomainEvent {
+  private String parentTxId;
+  private String localTxId;
+  private TxState state;
 
-  public BaseEvent() {
+  public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state) {
+    this.parentTxId = parentTxId;
+    this.localTxId = localTxId;
+    this.state = state;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
 
+  public String getLocalTxId() {
+    return localTxId;
   }
 
-  public String getGlobalTxId() {
-    return globalTxId;
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
+  public TxState getState() {
+    return state;
   }
 
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public void setState(TxState state) {
+    this.state = state;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
similarity index 60%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
index b16e44a..225ef26 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
@@ -17,15 +17,24 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.event;
 
-public class SagaDomainEvent {
-  public interface DomainEvent {}
-
-  public enum SagaStartedEvent implements DomainEvent {INSTANCE}
-  public enum SagaEndedEvent implements DomainEvent {INSTANCE}
-  public enum SagaAbortedEvent implements DomainEvent {INSTANCE}
-  public enum SagaTimeoutEvent implements DomainEvent {INSTANCE}
-  public enum TxStartedEvent implements DomainEvent {INSTANCE}
-  public enum TxEndedEvent implements DomainEvent {INSTANCE}
-  public enum TxAbortedEvent implements DomainEvent {INSTANCE}
-  public enum TxComponsitedEvent implements DomainEvent {INSTANCE}
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+public class TxComponsitedCheckInternalEvent extends TxEvent {
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+
+    private TxComponsitedCheckInternalEvent txComponsitedEvent;
+
+    private Builder() {
+      txComponsitedEvent = new TxComponsitedCheckInternalEvent();
+    }
+
+    public TxComponsitedCheckInternalEvent build() {
+      return txComponsitedEvent;
+    }
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
index 5794566..4ba8372 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -21,11 +21,16 @@ import java.io.Serializable;
 
 public abstract class BaseEvent implements Serializable {
   private String globalTxId;
+  private long createTime = System.currentTimeMillis();
 
   public BaseEvent() {
 
   }
 
+  public long getCreateTime() {
+    return createTime;
+  }
+
   public String getGlobalTxId() {
     return globalTxId;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 5644c1b..313c526 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -25,9 +25,13 @@ import akka.actor.Terminated;
 import akka.persistence.fsm.PersistentFSM;
 import akka.persistence.fsm.PersistentFSM.CurrentState;
 import akka.testkit.javadsl.TestKit;
+import com.typesafe.config.ConfigFactory;
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,9 +40,30 @@ public class SagaActorTest {
 
   static ActorSystem system;
 
+  private static Map<String,Object> getPersistenceMemConfig(){
+    Map<String, Object> map = new HashMap<>();
+    map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem");
+    map.put("akka.persistence.journal.leveldb.dir", "target/example/journal");
+    map.put("akka.persistence.snapshot-store.plugin", "akka.persistence.snapshot-store.local");
+    map.put("akka.persistence.snapshot-store.local.dir", "target/example/snapshots");
+    return map;
+  }
+
+  private static Map<String,Object> getPersistenceRedisConfig(){
+    Map<String, Object> map = new HashMap<>();
+    map.put("akka.actor.warn-about-java-serializer-usage",false);
+    map.put("akka.persistence.journal.plugin", "akka-persistence-redis.journal");
+    map.put("akka.persistence.snapshot-store.plugin", "akka-persistence-redis.snapshot");
+    map.put("akka-persistence-redis.redis.mode", "simple");
+    map.put("akka-persistence-redis.redis.host", "localhost");
+    map.put("akka-persistence-redis.redis.port", "6379");
+    map.put("akka-persistence-redis.redis.database", "0");
+    return map;
+  }
+
   @BeforeClass
   public static void setup() {
-    system = ActorSystem.create("SagaActorTest");
+    system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
   }
 
   @AfterClass
@@ -69,7 +94,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -122,6 +147,158 @@ public class SagaActorTest {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  @Test
+  public void successfulRecoveryWithCorrectStateDataTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      String persistenceId = genPersistenceId();
+      ActorRef saga = system.actorOf(SagaActor.props(persistenceId));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      //expectTerminated(saga);
+
+      ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+      watch(recoveredSaga);
+      recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        recoveredSaga.tell(event, getRef());
+      });
+
+      currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), recoveredSaga);
+      system.stop(saga);
+    }};
+  }
+
+  @Test
+  public void successfulRecoveryWithCorrectStateDataTestAndDiffentSystem() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    String persistenceId = genPersistenceId();
+    ActorSystem system1 = ActorSystem.create("SagaActorTest1", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+    new TestKit(system1) {{
+      ActorRef saga = system1.actorOf(SagaActor.props(persistenceId));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+      system1.stop(saga);
+    }};
+    TestKit.shutdownActorSystem(system1);
+
+    ActorSystem system2 = ActorSystem.create("SagaActorTest2", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+    new TestKit(system2) {{
+      ActorRef recoveredSaga = system2.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+      watch(recoveredSaga);
+      recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        recoveredSaga.tell(event, getRef());
+      });
+
+      CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), recoveredSaga);
+      system2.stop(recoveredSaga);
+    }};
+    TestKit.shutdownActorSystem(system2);
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 7. SagaAbortedEvent-1
    */
@@ -131,7 +308,7 @@ public class SagaActorTest {
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -185,7 +362,7 @@ public class SagaActorTest {
       final String localTxId_1 = UUID.randomUUID().toString();
       final String localTxId_2 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -249,7 +426,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -303,6 +480,79 @@ public class SagaActorTest {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. SagaAbortedEvent-1
+   * 9. TxComponsitedEvent-11
+   * 10. TxComponsitedEvent-12
+   */
+  @Test
+  public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxEndedEvent-12
@@ -320,7 +570,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -381,7 +631,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -417,7 +667,13 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -425,12 +681,6 @@ public class SagaActorTest {
       assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
-      transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
-
-      Terminated terminated = expectMsgClass(Terminated.class);
-      assertEquals(terminated.getActor(), saga);
-
       system.stop(saga);
     }};
   }
@@ -454,7 +704,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -522,7 +772,7 @@ public class SagaActorTest {
       final String localTxId_3 = UUID.randomUUID().toString();
       final int timeout = 5;
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -584,7 +834,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -639,7 +889,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -702,7 +952,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index a3ddd0d..d303f86 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -124,6 +124,33 @@ public class SagaEventSender {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. SagaAbortedEvent-1
+   * 9. TxComponsitedEvent-11
+   * 10. TxComponsitedEvent-12
+   */
+  public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxEndedEvent-12
@@ -292,6 +319,37 @@ public class SagaEventSender {
     sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
-  }  
+  }
+
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   */
+  public static List<BaseEvent> successfulFirstHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. TxEndedEvent-12
+   * 2. TxStartedEvent-13
+   * 3. TxEndedEvent-13
+   * 4. SagaEndedEvent-1
+   */
+  public static List<BaseEvent> successfulSecondHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
 
 }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 5a213f1..85c8b18 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -36,9 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
         "alpha.model.actor.enabled=true",
-        "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
-        "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
-        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
+        "spring.profiles.active=akka-persistence-redis"
     })
 public class SagaIntegrationTest {
 
@@ -63,6 +61,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -85,6 +85,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 1
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED;
       }else{
@@ -106,6 +108,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 2
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED;
@@ -129,6 +133,34 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
+            && sagaData.getTxEntityMap().size() == 3
+            && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+            && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+            && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED;
+      }else{
+        return false;
+      }
+    });
+  }
+
+  @Test
+  public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+      SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        sagaEventBus.post(event);
+      });
+
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if(sagaData != null){
+        return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -153,6 +185,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -177,6 +211,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -201,6 +237,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -226,6 +264,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -250,6 +290,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -274,6 +316,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -298,6 +342,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml
index b3577a6..9ca5b2e 100644
--- a/alpha/alpha-fsm/src/test/resources/application.yaml
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -15,7 +15,25 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
+---
+spring:
+  profiles: akka-persistence-mem
 akkaConfig:
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
+  akka.persistence.journal.leveldb.dir: target/example/journal
   akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
-  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
+
+---
+spring:
+  profiles: akka-persistence-redis
+akkaConfig:
+  akka.persistence.journal.plugin: akka-persistence-redis.journal
+  akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot
+  akka-persistence-redis:
+    redis:
+      mode: simple
+      host: localhost
+      port: 6379
+      database: 0
+      #password:
\ No newline at end of file
diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png
index 349fb72..4bc1ba4 100644
Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ
diff --git a/docs/fsm/plantuml/saga-state-diagram.puml b/docs/fsm/plantuml/saga-state-diagram.puml
index 429687d..b9233c5 100644
--- a/docs/fsm/plantuml/saga-state-diagram.puml
+++ b/docs/fsm/plantuml/saga-state-diagram.puml
@@ -33,7 +33,7 @@ FAILED --> COMPENSATED : SagaAbortedEvent<font color=red>:doCompensation</font>
 
 FAILED --> SUSPENDED : SagaTimeoutEvent
 
-FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>
+FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>\nTxComponsitedCheckInternalEvent
 
 COMPENSATED --> [*]