You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/01 14:15:23 UTC

[servicecomb-pack] branch SCB-1321 updated (281e34c -> 9cee529)

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a change to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from 281e34c  SCB-1321 Add Saga Event Message Bus
     new 6ba7899  SCB-1321 Support for defining Akka properties in application.yaml
     new 9cee529  SCB-1321 Support Akka Persistent Redis Recovery

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-fsm/README.md                          |   1 +
 alpha/alpha-fsm/pom.xml                            |  19 +-
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |  13 +-
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 284 ++++++++++++--------
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   3 +-
 .../TxEvent.java => domain/AddTxEventDomain.java}  |  25 +-
 .../pack/alpha/fsm/domain/DomainEvent.java         |   5 +-
 .../pack/alpha/fsm/domain/SagaEndedDomain.java     |  19 +-
 .../SagaStartedDomain.java}                        |  28 +-
 .../UpdateTxEventDomain.java}                      |  26 +-
 .../pack/alpha/fsm/event/SagaDomainEvent.java      |  31 ---
 ...t.java => TxComponsitedCheckInternalEvent.java} |  17 +-
 .../pack/alpha/fsm/event/base/BaseEvent.java       |   5 +
 .../akka/AkkaConfigPropertyAdapter.java            |  68 +++++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 288 +++++++++++++++++++--
 .../pack/alpha/fsm/SagaEventSender.java            |  60 ++++-
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  51 +++-
 .../alpha-fsm/src/test/resources/application.conf  |  20 --
 .../alpha-fsm/src/test}/resources/application.yaml |  40 ++-
 docs/fsm/assets/saga_state_diagram.png             | Bin 237463 -> 244645 bytes
 docs/fsm/plantuml/saga-state-diagram.puml          |   2 +-
 21 files changed, 740 insertions(+), 265 deletions(-)
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{event/base/TxEvent.java => domain/AddTxEventDomain.java} (71%)
 copy omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/IdGenerator.java => alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java (87%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/OmegaException.java => alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java (71%)
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{event/base/BaseEvent.java => domain/SagaStartedDomain.java} (65%)
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{event/base/TxEvent.java => domain/UpdateTxEventDomain.java} (70%)
 delete mode 100644 alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/{SagaEndedEvent.java => TxComponsitedCheckInternalEvent.java} (72%)
 create mode 100644 alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
 delete mode 100644 alpha/alpha-fsm/src/test/resources/application.conf
 copy {demo/saga-dubbo-demo/serviceb/src/main => alpha/alpha-fsm/src/test}/resources/application.yaml (60%)


[servicecomb-pack] 01/02: SCB-1321 Support for defining Akka properties in application.yaml

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 6ba78992ebd9843220f0b889516ce80982610795
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Sat Jun 29 23:36:27 2019 +0800

    SCB-1321 Support for defining Akka properties in application.yaml
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       | 13 +++--
 .../akka/AkkaConfigPropertyAdapter.java            | 68 ++++++++++++++++++++++
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  5 +-
 .../{application.conf => application.yaml}         |  7 ++-
 4 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 252d6ae..c5082ab 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -21,25 +21,30 @@ import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.util.Map;
 import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.ConfigurableEnvironment;
 
 @Configuration
 @ConditionalOnProperty(value = {"alpha.model.actor.enabled"})
 public class FsmAutoConfiguration {
 
   @Bean
-  public ActorSystem actorSystem() {
-    ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration());
+  public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
+    ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
     return system;
   }
 
   @Bean
-  public Config akkaConfiguration() {
-    return ConfigFactory.load();
+  public Config akkaConfiguration(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
+    final Map<String, Object> converted = AkkaConfigPropertyAdapter.getPropertyMap(environment);
+    return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
   }
 
   @Bean(name = "sagaEventBus")
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
new file mode 100644
index 0000000..c6ae195
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import java.lang.invoke.MethodHandles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.EnumerablePropertySource;
+import org.springframework.core.env.PropertySource;
+
+import java.util.*;
+import org.springframework.core.env.StandardEnvironment;
+
+public class AkkaConfigPropertyAdapter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String PROPERTY_SOURCE_NAME = "akkaConfig.";
+
+  public static Map<String, Object> getPropertyMap(ConfigurableEnvironment environment) {
+    final Map<String, Object> propertyMap = new HashMap<>();
+
+    for (final PropertySource source : environment.getPropertySources()) {
+      if (isEligiblePropertySource(source)) {
+        final EnumerablePropertySource enumerable = (EnumerablePropertySource) source;
+        LOG.debug("Adding properties from property source " + source.getName());
+        for (final String name : enumerable.getPropertyNames()) {
+          if (name.startsWith(PROPERTY_SOURCE_NAME) && !propertyMap.containsKey(name)) {
+            String key = name.substring(PROPERTY_SOURCE_NAME.length());
+            Object value = environment.getProperty(name);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Adding property {}={}" + key, value);
+            }
+            propertyMap.put(key, value);
+          }
+        }
+      }
+    }
+
+    return Collections.unmodifiableMap(propertyMap);
+  }
+
+  public static boolean isEligiblePropertySource(PropertySource source) {
+    // Exclude system environment properties and system property sources
+    // because they are already included in the default configuration
+    final String name = source.getName();
+    return (source instanceof EnumerablePropertySource) &&
+        !(
+            name.equalsIgnoreCase(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME) ||
+                name.equalsIgnoreCase(StandardEnvironment.SYSTEM_PROPERTIES_PROPERTY_SOURCE_NAME)
+        );
+  }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index c13c6af..5a213f1 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -35,7 +35,10 @@ import org.springframework.test.context.junit4.SpringRunner;
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
-        "alpha.model.actor.enabled=true"
+        "alpha.model.actor.enabled=true",
+        "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
+        "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
+        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
     })
 public class SagaIntegrationTest {
 
diff --git a/alpha/alpha-fsm/src/test/resources/application.conf b/alpha/alpha-fsm/src/test/resources/application.yaml
similarity index 80%
rename from alpha/alpha-fsm/src/test/resources/application.conf
rename to alpha/alpha-fsm/src/test/resources/application.yaml
index 8d39346..b3577a6 100644
--- a/alpha/alpha-fsm/src/test/resources/application.conf
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -15,6 +15,7 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
-akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
-akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
\ No newline at end of file
+akkaConfig:
+  akka.persistence.journal.plugin: akka.persistence.journal.inmem
+  akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file


[servicecomb-pack] 02/02: SCB-1321 Support Akka Persistent Redis Recovery

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 9cee529906beaba4b1c41ab0279093acc8b90b67
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jul 1 22:14:52 2019 +0800

    SCB-1321 Support Akka Persistent Redis Recovery
---
 alpha/alpha-fsm/README.md                          |   1 +
 alpha/alpha-fsm/pom.xml                            |  19 +-
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 284 ++++++++++++--------
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   3 +-
 .../AddTxEventDomain.java}                         |  40 ++-
 .../fsm/{TxState.java => domain/DomainEvent.java}  |  11 +-
 .../{TxState.java => domain/SagaEndedDomain.java}  |  20 +-
 .../SagaStartedDomain.java}                        |  28 +-
 .../UpdateTxEventDomain.java}                      |  41 ++-
 ...t.java => TxComponsitedCheckInternalEvent.java} |  31 ++-
 .../pack/alpha/fsm/event/base/BaseEvent.java       |   5 +
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 288 +++++++++++++++++++--
 .../pack/alpha/fsm/SagaEventSender.java            |  60 ++++-
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  52 +++-
 .../alpha-fsm/src/test/resources/application.yaml  |  20 +-
 docs/fsm/assets/saga_state_diagram.png             | Bin 237463 -> 244645 bytes
 docs/fsm/plantuml/saga-state-diagram.puml          |   2 +-
 17 files changed, 705 insertions(+), 200 deletions(-)

diff --git a/alpha/alpha-fsm/README.md b/alpha/alpha-fsm/README.md
index 3a37fed..e7ea7b9 100644
--- a/alpha/alpha-fsm/README.md
+++ b/alpha/alpha-fsm/README.md
@@ -6,6 +6,7 @@
 ## Test State Machine
 
 ```
+git clone -b SCB-1321 git@github.com:apache/servicecomb-pack.git
 cd alpha
 mvn clean package -pl alpha-fsm 
 ```
\ No newline at end of file
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index b1897d9..8788430 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -31,6 +31,7 @@
 
   <properties>
     <leveldbjni-all.version>1.8</leveldbjni-all.version>
+    <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version>
   </properties>
 
   <dependencyManagement>
@@ -72,12 +73,7 @@
     <dependency>
       <groupId>javax.persistence</groupId>
       <artifactId>javax.persistence-api</artifactId>
-    </dependency>    
-<!--    <dependency>-->
-<!--      <groupId>org.apache.logging.log4j</groupId>-->
-<!--      <artifactId>log4j-slf4j-impl</artifactId>-->
-<!--      <scope>test</scope>-->
-<!--    </dependency>-->
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
@@ -103,6 +99,11 @@
       <artifactId>leveldbjni-all</artifactId>
       <version>${leveldbjni-all.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.safety-data</groupId>
+      <artifactId>akka-persistence-redis_2.12</artifactId>
+      <version>${akka-persistence-redis.version}</version>
+    </dependency>
 
     <!-- For testing the artifacts scope are test-->
     <dependency>
@@ -138,7 +139,11 @@
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_2.12</artifactId>
-    </dependency>       
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.12</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index dca006e..d22cb79 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -22,19 +22,20 @@ import akka.persistence.fsm.AbstractPersistentFSM;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
@@ -62,14 +63,15 @@ public class SagaActor extends
     when(SagaActorState.IDEL,
         matchEvent(SagaStartedEvent.class,
             (event, data) -> {
-              data.setGlobalTxId(event.getGlobalTxId());
-              data.setBeginTime(System.currentTimeMillis());
+              SagaStartedDomain domainEvent = new SagaStartedDomain(event.getGlobalTxId(),
+                  event.getCreateTime(), event.getTimeout());
               if (event.getTimeout() > 0) {
-                data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000);
                 return goTo(SagaActorState.READY)
-                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+                    .applying(domainEvent)
+                    .forMax(Duration.create(event.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.READY);
+                return goTo(SagaActorState.READY)
+                    .applying(domainEvent);
               }
             }
 
@@ -79,25 +81,36 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent);
               }
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .replying(data);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .replying(data);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data);
             })
     );
@@ -105,34 +118,43 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_ACTIVE,
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_COMMITTED);
+                return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .applying(domainEvent);
               }
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return stay()
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return stay();
+                return stay().applying(domainEvent);
               }
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.FAILED);
+              return goTo(SagaActorState.FAILED)
+                  .applying(domainEvent);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -143,47 +165,55 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .applying(domainEvent);
               }
             }
         ).event(TxEndedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
               if (data.getExpirationTime() > 0) {
                 return stay()
+                    .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
               } else {
-                return stay();
+                return stay().applying(domainEvent);
               }
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+              return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return goTo(SagaActorState.FAILED);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.FAILED);
+              return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -194,18 +224,25 @@ public class SagaActor extends
     when(SagaActorState.FAILED,
         matchEvent(SagaTimeoutEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxComponsitedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMPENSATED);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                self().tell(TxComponsitedCheckInternalEvent.builder().build(), self());
+              }));
+            }
+        ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
+            (event, data) -> {
               if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
                   || hasCommittedTx(data)) {
-                return stay();
+                return stay().replying(data);
               } else {
                 return goTo(SagaActorState.COMPENSATED)
                     .replying(data)
@@ -214,30 +251,36 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
-              data.setEndTime(System.currentTimeMillis());
-              updateTxEntity(event, data);
               data.setTerminated(true);
-              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
-                  || hasCommittedTx(data)) {
-                return stay();
+              if (hasCommittedTx(data)) {
+                SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+                return stay().replying(data).applying(domainEvent);
+              } else if(hasCompensationSentTx(data)){
+                return stay().replying(data);
               } else {
+                SagaEndedDomain domainEvent = new SagaEndedDomain(
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
+                    .applying(domainEvent)
                     .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              return stay();
+              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId());
+              return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
-              updateTxEntity(event, data);
-              TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
-              // TODO call compensate
-              compensation(txEntity, data);
-              return stay();
+              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+                  event.getLocalTxId(), TxState.COMMITTED);
+              return stay().applying(domainEvent).andThen(exec(_data -> {
+                TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId());
+                // call compensate
+                compensation(txEntity, _data);
+              }));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
@@ -248,6 +291,17 @@ public class SagaActor extends
     when(SagaActorState.COMMITTED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              /**
+               * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181
+               * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
+               * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的
+               * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理
+               * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
+               * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
+               * */
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -256,6 +310,9 @@ public class SagaActor extends
     when(SagaActorState.SUSPENDED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -264,6 +321,9 @@ public class SagaActor extends
     when(SagaActorState.COMPENSATED,
         matchAnyEvent(
             (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              deleteMessages(lastSequenceNr());
+              deleteSnapshot(snapshotSequenceNr());
               return stop();
             }
         )
@@ -278,6 +338,11 @@ public class SagaActor extends
 
     onTransition(
         matchState(null, null, (from, to) -> {
+          if (stateData().getGlobalTxId() != null) {
+            stateData().setLastState(to);
+            LogExtension.LogExtensionProvider.get(getContext().getSystem())
+                .putSagaData(stateData().getGlobalTxId(), stateData());
+          }
           LOG.info("transition {} {} -> {}", getSelf(), from, to);
         })
     );
@@ -288,7 +353,8 @@ public class SagaActor extends
               LOG.info("stop {} {}", data.getGlobalTxId(), state);
               data.setTerminated(true);
               data.setLastState(state);
-              LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data);
+              LogExtension.LogExtensionProvider.get(getContext().getSystem())
+                  .putSagaData(data.getGlobalTxId(), data);
             }
         )
     );
@@ -296,73 +362,77 @@ public class SagaActor extends
   }
 
   @Override
-  public void onRecoveryCompleted() {
-    LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData());
-  }
-
-  @Override
-  public Class domainEventClass() {
-    return SagaDomainEvent.DomainEvent.class;
-  }
-
-
-  @Override
-  public String persistenceId() {
-    return persistenceId;
-  }
-
-  @Override
-  public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) {
-    return currentData;
-  }
-
-  private void updateTxEntity(BaseEvent event, SagaData data) {
-    if (event instanceof TxEvent) {
-      TxEvent txEvent = (TxEvent) event;
-      if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) {
-        if (event instanceof TxStartedEvent) {
-          TxEntity txEntity = TxEntity.builder()
-              .localTxId(txEvent.getLocalTxId())
-              .parentTxId(txEvent.getParentTxId())
-              .state(TxState.ACTIVE)
-              .build();
-          data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
-        }
+  public SagaData applyEvent(DomainEvent event, SagaData data) {
+    if (event instanceof SagaStartedDomain) {
+      SagaStartedDomain domainEvent = (SagaStartedDomain) event;
+      data.setGlobalTxId(domainEvent.getGlobalTxId());
+      data.setBeginTime(domainEvent.getCreateTime());
+      data.setExpirationTime(domainEvent.getExpirationTime());
+    } else if (event instanceof AddTxEventDomain) {
+      AddTxEventDomain domainEvent = (AddTxEventDomain) event;
+      if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) {
+        TxEntity txEntity = TxEntity.builder()
+            .localTxId(domainEvent.getLocalTxId())
+            .parentTxId(domainEvent.getParentTxId())
+            .state(domainEvent.getState())
+            .build();
+        data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
       } else {
-        TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId());
-        if (event instanceof TxEndedEvent) {
-          if (txEntity.getState() == TxState.ACTIVE) {
-            txEntity.setEndTime(System.currentTimeMillis());
-            txEntity.setState(TxState.COMMITTED);
-          }
-        } else if (event instanceof TxAbortedEvent) {
-          if (txEntity.getState() == TxState.ACTIVE) {
-            txEntity.setEndTime(System.currentTimeMillis());
-            txEntity.setState(TxState.FAILED);
-            data.getTxEntityMap().forEach((k, v) -> {
-              if (v.getState() == TxState.COMMITTED) {
-                // call compensate
-                compensation(v, data);
-              }
-            });
+        LOG.warn("TxEntity {} already exists", domainEvent.getLocalTxId());
+      }
+    } else if (event instanceof UpdateTxEventDomain) {
+      UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
+      TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
+      txEntity.setEndTime(System.currentTimeMillis());
+      if (domainEvent.getState() == TxState.COMMITTED) {
+        txEntity.setState(domainEvent.getState());
+      } else if (domainEvent.getState() == TxState.FAILED) {
+        txEntity.setState(domainEvent.getState());
+        data.getTxEntityMap().forEach((k, v) -> {
+          if (v.getState() == TxState.COMMITTED) {
+            // call compensate
+            compensation(v, data);
           }
-        } else if (event instanceof TxComponsitedEvent) {
-          // decrement the compensation running counter by one
-          data.getCompensationRunningCounter().decrementAndGet();
-          txEntity.setState(TxState.COMPENSATED);
-          LOG.info("compensation is completed {}", txEntity.getLocalTxId());
-        }
+        });
+      } else if (domainEvent.getState() == TxState.COMPENSATED) {
+        // decrement the compensation running counter by one
+        data.getCompensationRunningCounter().decrementAndGet();
+        txEntity.setState(domainEvent.getState());
+        LOG.info("compensation is completed {}", txEntity.getLocalTxId());
       }
-    } else if (event instanceof SagaEvent) {
-      if (event instanceof SagaAbortedEvent) {
+    } else if (event instanceof SagaEndedDomain) {
+      SagaEndedDomain domainEvent = (SagaEndedDomain) event;
+      if (domainEvent.getState() == SagaActorState.FAILED) {
         data.getTxEntityMap().forEach((k, v) -> {
           if (v.getState() == TxState.COMMITTED) {
             // call compensate
             compensation(v, data);
           }
         });
+      } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
+
+      } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
+
       }
     }
+    LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    return data;
+  }
+
+  @Override
+  public void onRecoveryCompleted() {
+    LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+  }
+
+  @Override
+  public Class domainEventClass() {
+    return DomainEvent.class;
+  }
+
+
+  @Override
+  public String persistenceId() {
+    return persistenceId;
   }
 
   private boolean hasCommittedTx(SagaData data) {
@@ -371,9 +441,17 @@ public class SagaActor extends
         .count() > 0;
   }
 
+  private boolean hasCompensationSentTx(SagaData data) {
+    return data.getTxEntityMap().entrySet().stream()
+        .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
+        .count() > 0;
+  }
+
   private void compensation(TxEntity txEntity, SagaData data) {
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
+    //TODO call omega compensate method
     LOG.info("compensate {}", txEntity.getLocalTxId());
+    txEntity.setState(TxState.COMPENSATION_SENT);
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
index 0494895..af02ce8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -21,5 +21,6 @@ public enum TxState {
   ACTIVE,
   FAILED,
   COMMITTED,
-  COMPENSATED;
+  COMPENSATION_SENT, // The compensation method has been called to wait for TxComponsitedEvent
+  COMPENSATED
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
similarity index 51%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index 5794566..c7c65a3 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -15,29 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
-public abstract class BaseEvent implements Serializable {
-  private String globalTxId;
+public class AddTxEventDomain implements DomainEvent {
+  private String parentTxId;
+  private String localTxId;
+  private TxState state = TxState.ACTIVE;
 
-  public BaseEvent() {
+  public AddTxEventDomain(String parentTxId, String localTxId) {
+    this.parentTxId = parentTxId;
+    this.localTxId = localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
 
+  public String getLocalTxId() {
+    return localTxId;
   }
 
-  public String getGlobalTxId() {
-    return globalTxId;
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
+  public TxState getState() {
+    return state;
   }
 
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public void setState(TxState state) {
+    this.state = state;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
similarity index 85%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
index 0494895..10c293c 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
+
+import java.io.Serializable;
+
+public interface DomainEvent extends Serializable {
 
-public enum TxState {
-  ACTIVE,
-  FAILED,
-  COMMITTED,
-  COMPENSATED;
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
similarity index 70%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 0494895..d3e40b1 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -15,11 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-public enum TxState {
-  ACTIVE,
-  FAILED,
-  COMMITTED,
-  COMPENSATED;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+
+public class SagaEndedDomain implements DomainEvent {
+
+  private SagaActorState state;
+
+  public SagaEndedDomain(SagaActorState state) {
+    this.state = state;
+  }
+
+  public SagaActorState getState() {
+    return state;
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
similarity index 65%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
index 5794566..fe75f04 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
@@ -15,29 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+public class SagaStartedDomain implements DomainEvent {
 
-public abstract class BaseEvent implements Serializable {
+  private long createTime;
   private String globalTxId;
+  private long expirationTime;
 
-  public BaseEvent() {
+  public SagaStartedDomain(String globalTxId, long createTime, int timeout) {
+    this.createTime = createTime;
+    this.globalTxId = globalTxId;
+    if (timeout > 0) {
+      this.expirationTime = System.currentTimeMillis() + timeout * 1000;
+    }
+  }
 
+  public long getCreateTime() {
+    return createTime;
   }
 
   public String getGlobalTxId() {
     return globalTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
-  }
-
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public long getExpirationTime() {
+    return expirationTime;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
similarity index 50%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 5794566..839cfe0 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -15,29 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
 
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
-public abstract class BaseEvent implements Serializable {
-  private String globalTxId;
+public class UpdateTxEventDomain implements DomainEvent {
+  private String parentTxId;
+  private String localTxId;
+  private TxState state;
 
-  public BaseEvent() {
+  public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state) {
+    this.parentTxId = parentTxId;
+    this.localTxId = localTxId;
+    this.state = state;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
 
+  public String getLocalTxId() {
+    return localTxId;
   }
 
-  public String getGlobalTxId() {
-    return globalTxId;
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
+  public TxState getState() {
+    return state;
   }
 
-  @Override
-  public String toString() {
-    return "BaseEvent{" +
-        "globalTxId='" + globalTxId + '\'' +
-        '}';
+  public void setState(TxState state) {
+    this.state = state;
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
similarity index 60%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
index b16e44a..225ef26 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
@@ -17,15 +17,24 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.event;
 
-public class SagaDomainEvent {
-  public interface DomainEvent {}
-
-  public enum SagaStartedEvent implements DomainEvent {INSTANCE}
-  public enum SagaEndedEvent implements DomainEvent {INSTANCE}
-  public enum SagaAbortedEvent implements DomainEvent {INSTANCE}
-  public enum SagaTimeoutEvent implements DomainEvent {INSTANCE}
-  public enum TxStartedEvent implements DomainEvent {INSTANCE}
-  public enum TxEndedEvent implements DomainEvent {INSTANCE}
-  public enum TxAbortedEvent implements DomainEvent {INSTANCE}
-  public enum TxComponsitedEvent implements DomainEvent {INSTANCE}
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+public class TxComponsitedCheckInternalEvent extends TxEvent {
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+
+    private TxComponsitedCheckInternalEvent txComponsitedEvent;
+
+    private Builder() {
+      txComponsitedEvent = new TxComponsitedCheckInternalEvent();
+    }
+
+    public TxComponsitedCheckInternalEvent build() {
+      return txComponsitedEvent;
+    }
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
index 5794566..4ba8372 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -21,11 +21,16 @@ import java.io.Serializable;
 
 public abstract class BaseEvent implements Serializable {
   private String globalTxId;
+  private long createTime = System.currentTimeMillis();
 
   public BaseEvent() {
 
   }
 
+  public long getCreateTime() {
+    return createTime;
+  }
+
   public String getGlobalTxId() {
     return globalTxId;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 5644c1b..313c526 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -25,9 +25,13 @@ import akka.actor.Terminated;
 import akka.persistence.fsm.PersistentFSM;
 import akka.persistence.fsm.PersistentFSM.CurrentState;
 import akka.testkit.javadsl.TestKit;
+import com.typesafe.config.ConfigFactory;
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,9 +40,30 @@ public class SagaActorTest {
 
   static ActorSystem system;
 
+  private static Map<String,Object> getPersistenceMemConfig(){
+    Map<String, Object> map = new HashMap<>();
+    map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem");
+    map.put("akka.persistence.journal.leveldb.dir", "target/example/journal");
+    map.put("akka.persistence.snapshot-store.plugin", "akka.persistence.snapshot-store.local");
+    map.put("akka.persistence.snapshot-store.local.dir", "target/example/snapshots");
+    return map;
+  }
+
+  private static Map<String,Object> getPersistenceRedisConfig(){
+    Map<String, Object> map = new HashMap<>();
+    map.put("akka.actor.warn-about-java-serializer-usage",false);
+    map.put("akka.persistence.journal.plugin", "akka-persistence-redis.journal");
+    map.put("akka.persistence.snapshot-store.plugin", "akka-persistence-redis.snapshot");
+    map.put("akka-persistence-redis.redis.mode", "simple");
+    map.put("akka-persistence-redis.redis.host", "localhost");
+    map.put("akka-persistence-redis.redis.port", "6379");
+    map.put("akka-persistence-redis.redis.database", "0");
+    return map;
+  }
+
   @BeforeClass
   public static void setup() {
-    system = ActorSystem.create("SagaActorTest");
+    system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
   }
 
   @AfterClass
@@ -69,7 +94,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -122,6 +147,158 @@ public class SagaActorTest {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 4. TxStartedEvent-13
+   * 5. TxEndedEvent-13
+   * 6. SagaEndedEvent-1
+   */
+  @Test
+  public void successfulRecoveryWithCorrectStateDataTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      String persistenceId = genPersistenceId();
+      ActorRef saga = system.actorOf(SagaActor.props(persistenceId));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      //expectTerminated(saga);
+
+      ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+      watch(recoveredSaga);
+      recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        recoveredSaga.tell(event, getRef());
+      });
+
+      currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), recoveredSaga);
+      system.stop(saga);
+    }};
+  }
+
+  @Test
+  public void successfulRecoveryWithCorrectStateDataTestAndDiffentSystem() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    String persistenceId = genPersistenceId();
+    ActorSystem system1 = ActorSystem.create("SagaActorTest1", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+    new TestKit(system1) {{
+      ActorRef saga = system1.actorOf(SagaActor.props(persistenceId));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+      system1.stop(saga);
+    }};
+    TestKit.shutdownActorSystem(system1);
+
+    ActorSystem system2 = ActorSystem.create("SagaActorTest2", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+    new TestKit(system2) {{
+      ActorRef recoveredSaga = system2.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+      watch(recoveredSaga);
+      recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        recoveredSaga.tell(event, getRef());
+      });
+
+      CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), recoveredSaga);
+      system2.stop(recoveredSaga);
+    }};
+    TestKit.shutdownActorSystem(system2);
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 7. SagaAbortedEvent-1
    */
@@ -131,7 +308,7 @@ public class SagaActorTest {
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -185,7 +362,7 @@ public class SagaActorTest {
       final String localTxId_1 = UUID.randomUUID().toString();
       final String localTxId_2 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -249,7 +426,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -303,6 +480,79 @@ public class SagaActorTest {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. TxComponsitedEvent-11
+   * 9. TxComponsitedEvent-12
+   * 10. SagaAbortedEvent-1
+   */
+  @Test
+  public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxEndedEvent-12
@@ -320,7 +570,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -381,7 +631,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -417,7 +667,13 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -425,12 +681,6 @@ public class SagaActorTest {
       assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
-      transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
-
-      Terminated terminated = expectMsgClass(Terminated.class);
-      assertEquals(terminated.getActor(), saga);
-
       system.stop(saga);
     }};
   }
@@ -454,7 +704,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -522,7 +772,7 @@ public class SagaActorTest {
       final String localTxId_3 = UUID.randomUUID().toString();
       final int timeout = 5;
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
@@ -584,7 +834,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -639,7 +889,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -702,7 +952,7 @@ public class SagaActorTest {
       final String localTxId_2 = UUID.randomUUID().toString();
       final String localTxId_3 = UUID.randomUUID().toString();
 
-      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
       SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index a3ddd0d..d303f86 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -124,6 +124,33 @@ public class SagaEventSender {
   /**
    * 1. SagaStartedEvent-1
    * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. SagaAbortedEvent-1
+   * 9. TxComponsitedEvent-11
+   * 10. TxComponsitedEvent-12
+   */
+  public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
    * 3. TxAbortedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxEndedEvent-12
@@ -292,6 +319,37 @@ public class SagaEventSender {
     sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
-  }  
+  }
+
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   */
+  public static List<BaseEvent> successfulFirstHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. TxEndedEvent-12
+   * 2. TxStartedEvent-13
+   * 3. TxEndedEvent-13
+   * 4. SagaEndedEvent-1
+   */
+  public static List<BaseEvent> successfulSecondHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
 
 }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 5a213f1..85c8b18 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -36,9 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
         "alpha.model.actor.enabled=true",
-        "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
-        "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
-        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
+        "spring.profiles.active=akka-persistence-redis"
     })
 public class SagaIntegrationTest {
 
@@ -63,6 +61,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -85,6 +85,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 1
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED;
       }else{
@@ -106,6 +108,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 2
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED;
@@ -129,6 +133,34 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
+            && sagaData.getTxEntityMap().size() == 3
+            && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+            && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+            && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED;
+      }else{
+        return false;
+      }
+    });
+  }
+
+  @Test
+  public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+      SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        sagaEventBus.post(event);
+      });
+
+    await().atMost(1, SECONDS).until(() -> {
+      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if(sagaData != null){
+        return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -153,6 +185,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -177,6 +211,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -201,6 +237,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -226,6 +264,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -250,6 +290,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -274,6 +316,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -298,6 +342,8 @@ public class SagaIntegrationTest {
       SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getBeginTime() > 0
+            && sagaData.getEndTime() >0
             && sagaData.getTxEntityMap().size() == 3
             && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
             && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml
index b3577a6..9ca5b2e 100644
--- a/alpha/alpha-fsm/src/test/resources/application.yaml
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -15,7 +15,25 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
+---
+spring:
+  profiles: akka-persistence-mem
 akkaConfig:
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
+  akka.persistence.journal.leveldb.dir: target/example/journal
   akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
-  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
+
+---
+spring:
+  profiles: akka-persistence-redis
+akkaConfig:
+  akka.persistence.journal.plugin: akka-persistence-redis.journal
+  akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot
+  akka-persistence-redis:
+    redis:
+      mode: simple
+      host: localhost
+      port: 6379
+      database: 0
+      #password:
\ No newline at end of file
diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png
index 349fb72..4bc1ba4 100644
Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ
diff --git a/docs/fsm/plantuml/saga-state-diagram.puml b/docs/fsm/plantuml/saga-state-diagram.puml
index 429687d..b9233c5 100644
--- a/docs/fsm/plantuml/saga-state-diagram.puml
+++ b/docs/fsm/plantuml/saga-state-diagram.puml
@@ -33,7 +33,7 @@ FAILED --> COMPENSATED : SagaAbortedEvent<font color=red>:doCompensation</font>
 
 FAILED --> SUSPENDED : SagaTimeoutEvent
 
-FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>
+FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>\nTxComponsitedCheckInternalEvent
 
 COMPENSATED --> [*]