You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/21 10:34:17 UTC

[servicecomb-pack] 01/05: SCB-1321 Add Alpha FSM prototype code

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 98bc27bba72437e10bdc74d850559058f6441d7a
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Fri Jun 21 17:07:56 2019 +0800

    SCB-1321 Add Alpha FSM prototype code
---
 alpha/alpha-fsm/pom.xml                            | 123 ++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 331 +++++++++++
 .../servicecomb/pack/alpha/fsm/SagaActorState.java |  19 +
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   8 +
 .../pack/alpha/fsm/event/SagaAbortedEvent.java     |  28 +
 .../pack/alpha/fsm/event/SagaDomainEvent.java      |  14 +
 .../pack/alpha/fsm/event/SagaEndedEvent.java       |  28 +
 .../pack/alpha/fsm/event/SagaStartedEvent.java     |  43 ++
 .../pack/alpha/fsm/event/SagaTimeoutEvent.java     |  27 +
 .../pack/alpha/fsm/event/TxAbortedEvent.java       |  39 ++
 .../pack/alpha/fsm/event/TxComponsitedEvent.java   |  38 ++
 .../pack/alpha/fsm/event/TxEndedEvent.java         |  39 ++
 .../pack/alpha/fsm/event/TxStartedEvent.java       |  38 ++
 .../pack/alpha/fsm/event/base/BaseEvent.java       |  26 +
 .../pack/alpha/fsm/event/base/SagaEvent.java       |  14 +
 .../pack/alpha/fsm/event/base/TxEvent.java         |  31 ++
 .../servicecomb/pack/alpha/fsm/model/SagaData.java | 131 +++++
 .../servicecomb/pack/alpha/fsm/model/TxEntity.java |  95 ++++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 619 +++++++++++++++++++++
 .../alpha-fsm/src/test/resources/application.conf  |   3 +
 alpha/alpha-fsm/src/test/resources/log4j2.xml      |  30 +
 21 files changed, 1724 insertions(+)

diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
new file mode 100644
index 0000000..faa4e05
--- /dev/null
+++ b/alpha/alpha-fsm/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>alpha</artifactId>
+    <groupId>org.apache.servicecomb.pack</groupId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>alpha-fsm</artifactId>
+  <name>Pack::Alpha::Fsm</name>
+
+  <properties>
+    <leveldbjni-all.version>1.8</leveldbjni-all.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.typesafe.akka</groupId>
+        <artifactId>akka-persistence_2.12</artifactId>
+        <version>${akka.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>pack-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- log -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>javax.persistence</groupId>
+      <artifactId>javax.persistence-api</artifactId>
+    </dependency>    
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- akka -->
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-persistence_2.12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>${leveldbjni-all.version}</version>
+    </dependency>
+
+    <!-- Testing artifacts; their scope is test -->
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.seanyinx</groupId>
+      <artifactId>unit-scaffolding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_2.12</artifactId>
+    </dependency>       
+  </dependencies>
+
+</project>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
new file mode 100644
index 0000000..492d187
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -0,0 +1,331 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+
+import akka.actor.Props;
+import akka.persistence.fsm.AbstractPersistentFSM;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+public class SagaActor extends
+    AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Builds the {@link Props} used to create a {@code SagaActor}.
+   *
+   * @param persistenceId value passed to the constructor and later returned by
+   *                      {@link #persistenceId()}
+   * @return actor creation properties for this class
+   */
+  public static Props props(String persistenceId) {
+    return Props.create(SagaActor.class, persistenceId);
+  }
+
+  private final String persistenceId;
+
+  /**
+   * Creates the saga actor and registers its finite state machine: the initial
+   * state, the event handlers of every state, a fallback for unhandled events and
+   * a transition logger.
+   *
+   * <p>Handlers mutate the {@link SagaData} instance in place; none of them calls
+   * {@code applying(...)}, so {@link #applyEvent} is never fed a domain event here.
+   *
+   * @param persistenceId identifier returned by {@link #persistenceId()}
+   */
+  public SagaActor(String persistenceId) {
+    this.persistenceId = persistenceId;
+
+    startWith(SagaActorState.IDEL, SagaData.builder().build());
+
+    // IDEL: waits for the saga to start and records its id, begin time and,
+    // when a timeout (seconds) was given, the absolute expiration time.
+    when(SagaActorState.IDEL,
+        matchEvent(SagaStartedEvent.class,
+            (event, data) -> {
+              data.setGlobalTxId(event.getGlobalTxId());
+              data.setBeginTime(System.currentTimeMillis());
+              if (event.getTimeout() > 0) {
+                // 1000L forces long arithmetic; int * 1000 overflows for timeouts
+                // above Integer.MAX_VALUE / 1000 seconds.
+                data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000L);
+                return goTo(SagaActorState.READY)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.READY);
+              }
+            }
+
+        )
+    );
+
+    // READY: saga started, no sub-transaction seen yet.
+    when(SagaActorState.READY,
+        matchEvent(TxStartedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+              }
+            }
+        ).event(SagaEndedEvent.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED).replying(data);
+            }
+        ).event(SagaAbortedEvent.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED).replying(data);
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED)
+                  .replying(data);
+            })
+    );
+
+    // PARTIALLY_ACTIVE: a sub-transaction has started and has not ended yet.
+    when(SagaActorState.PARTIALLY_ACTIVE,
+        matchEvent(TxEndedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_COMMITTED);
+              }
+            }
+        ).event(SagaTimeoutEvent.class,
+            (event, data) -> {
+              // 1 ms state timeout on the target state, whose catch-all handler
+              // stops the actor (presumably triggered by the StateTimeout).
+              return goTo(SagaActorState.SUSPENDED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(TxAbortedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED).replying(data);
+            })
+    );
+
+    // PARTIALLY_COMMITTED: the last sub-transaction ended; either another one
+    // starts or the saga ends / aborts.
+    when(SagaActorState.PARTIALLY_COMMITTED,
+        matchEvent(TxStartedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+              }
+            }
+        ).event(SagaTimeoutEvent.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(SagaEndedEvent.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              return goTo(SagaActorState.COMMITTED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(SagaAbortedEvent.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(TxAbortedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED).replying(data);
+            })
+    );
+
+    // FAILED: compensation is in progress; stays here while committed
+    // sub-transactions remain or compensations are still running.
+    when(SagaActorState.FAILED,
+        matchEvent(SagaTimeoutEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              return goTo(SagaActorState.SUSPENDED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(TxComponsitedEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
+                  || hasCommittedTx(data)) {
+                return stay();
+              } else {
+                return goTo(SagaActorState.COMPENSATED)
+                    .replying(data)
+                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+              }
+            }
+        ).event(SagaAbortedEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              data.setTerminated(true);
+              // NOTE(review): terminated was set to true on the previous line, so
+              // (assuming isTerminated() reflects it) the first operand is always
+              // false and only hasCommittedTx decides the branch - confirm intent.
+              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
+                  || hasCommittedTx(data)) {
+                return stay();
+              } else {
+                return goTo(SagaActorState.COMPENSATED)
+                    .replying(data)
+                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+              }
+            }
+        ).event(TxStartedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return stay();
+            }
+        ).event(TxEndedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              // TODO invoke the compensation method
+              TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
+              if (txEntity != null) {
+                compensation(txEntity, data);
+              } else {
+                // updateTxEntity ignores an end event for a tx that never started,
+                // so there is nothing to compensate; avoid the NPE in compensation().
+                LOG.warn("no tx entity for localTxId {}, skip compensation",
+                    event.getLocalTxId());
+              }
+              return stay();
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> {
+              return goTo(SagaActorState.SUSPENDED).replying(data);
+            })
+    );
+
+    // Terminal states: any event (including the state timeout armed on entry)
+    // stops the actor.
+    when(SagaActorState.COMMITTED,
+        matchAnyEvent(
+            (event, data) -> {
+              return stop();
+            }
+        )
+    );
+
+    when(SagaActorState.SUSPENDED,
+        matchAnyEvent(
+            (event, data) -> {
+              return stop();
+            }
+        )
+    );
+
+    when(SagaActorState.COMPENSATED,
+        matchAnyEvent(
+            (event, data) -> {
+              return stop();
+            }
+        )
+    );
+
+    // Events no state handler matched are logged and otherwise ignored.
+    whenUnhandled(
+        matchAnyEvent((event, data) -> {
+          LOG.error("unmatch event {}", event);
+          return stay();
+        })
+    );
+
+    // null/null matches every from/to pair, so each transition is logged.
+    onTransition(
+        matchState(null, null, (from, to) -> {
+          LOG.info("transition {} {} -> {}", getSelf(), from, to);
+        })
+    );
+
+  }
+
+  /** Logs the current state name and state data once recovery has completed. */
+  @Override
+  public void onRecoveryCompleted() {
+    LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData());
+  }
+
+  /**
+   * Returns the domain event type of this FSM.
+   *
+   * @return the {@link DomainEvent} marker interface class
+   */
+  @Override
+  public Class<DomainEvent> domainEventClass() {
+    // Typed instead of a raw Class so the override matches the FSM's event type parameter.
+    return DomainEvent.class;
+  }
+
+
+  /** Returns the persistence id supplied to the constructor. */
+  @Override
+  public String persistenceId() {
+    return persistenceId;
+  }
+
+  /**
+   * Returns {@code currentData} unchanged; no domain event alters the state data here.
+   *
+   * <p>NOTE(review): the state handlers mutate {@link SagaData} directly and never call
+   * {@code applying(...)}, so replaying persisted events presumably cannot rebuild the
+   * data - confirm this is intended for the prototype.
+   *
+   * @param domainEvent the domain event (ignored)
+   * @param currentData the current state data
+   * @return {@code currentData}, the same instance
+   */
+  @Override
+  public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) {
+    return currentData;
+  }
+
+  private void updateTxEntity(BaseEvent event, SagaData data) {
+    if (event instanceof TxEvent) {
+      TxEvent txEvent = (TxEvent) event;
+      if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) {
+        if (event instanceof TxStartedEvent) {
+          TxEntity txEntity = TxEntity.builder()
+              .localTxId(txEvent.getLocalTxId())
+              .parentTxId(txEvent.getParentTxId())
+              .state(TxState.ACTIVE)
+              .build();
+          data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
+        }
+      } else {
+        TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId());
+        if (event instanceof TxEndedEvent) {
+          if (txEntity.getState() == TxState.ACTIVE) {
+            txEntity.setEndTime(System.currentTimeMillis());
+            txEntity.setState(TxState.COMMITTED);
+          }
+        } else if (event instanceof TxAbortedEvent) {
+          if (txEntity.getState() == TxState.ACTIVE) {
+            txEntity.setEndTime(System.currentTimeMillis());
+            txEntity.setState(TxState.FAILED);
+            // TODO 调用补偿方法
+            data.getTxEntityMap().forEach((k, v) -> {
+              if (v.getState() == TxState.COMMITTED) {
+                // TODO 调用补偿方法
+                compensation(v, data);
+              }
+            });
+          }
+        } else if (event instanceof TxComponsitedEvent) {
+          //补偿中计数器减一
+          data.getCompensationRunningCounter().decrementAndGet();
+          txEntity.setState(TxState.COMPENSATED);
+          LOG.info("完成补偿 {}",txEntity.getLocalTxId());
+        }
+      }
+    } else if (event instanceof SagaEvent) {
+      if (event instanceof SagaAbortedEvent) {
+        data.getTxEntityMap().forEach((k, v) -> {
+          if (v.getState() == TxState.COMMITTED) {
+            // TODO 调用补偿方法
+            compensation(v, data);
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Tells whether at least one sub-transaction is still in COMMITTED state.
+   *
+   * @param data the saga state data to inspect
+   * @return {@code true} if any tracked {@link TxEntity} has state {@link TxState#COMMITTED}
+   */
+  private boolean hasCommittedTx(SagaData data) {
+    return data.getTxEntityMap().values().stream()
+        .anyMatch(tx -> tx.getState() == TxState.COMMITTED);
+  }
+
+  /**
+   * Records that a compensation was requested for {@code txEntity}; currently this only
+   * bumps the counter and logs, no compensation call is made from here.
+   *
+   * @param txEntity the sub-transaction to compensate
+   * @param data     the saga state data holding the running-compensation counter
+   */
+  private void compensation(TxEntity txEntity, SagaData data) {
+    // Increment the running-compensation counter.
+    data.getCompensationRunningCounter().incrementAndGet();
+    LOG.info("调用补偿方法 {}", txEntity.getLocalTxId());
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
new file mode 100644
index 0000000..0c7be27
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
@@ -0,0 +1,19 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.persistence.fsm.PersistentFSM;
+
+/**
+ * States of the {@code SagaActor} finite state machine.
+ *
+ * <p>NOTE(review): {@code IDEL} is presumably a misspelling of IDLE; renaming it would
+ * change the public constant and its persisted identifier.
+ */
+public enum SagaActorState implements PersistentFSM.FSMState {
+  // Initial state passed to startWith(...) in SagaActor.
+  IDEL,
+  READY,
+  PARTIALLY_ACTIVE,
+  PARTIALLY_COMMITTED,
+  FAILED,
+  // COMMITTED, COMPENSATED and SUSPENDED stop the actor on any event in SagaActor.
+  COMMITTED,
+  COMPENSATED,
+  SUSPENDED;
+
+  /** Returns the enum constant name as the state identifier. */
+  @Override
+  public String identifier() {
+    return name();
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
new file mode 100644
index 0000000..e9c54e4
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -0,0 +1,8 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+/**
+ * State of a single sub-transaction as tracked by {@code SagaActor.updateTxEntity}.
+ */
+public enum TxState {
+  // Set when a TxStartedEvent registers the sub-transaction.
+  ACTIVE,
+  // Set when a TxAbortedEvent arrives for an ACTIVE sub-transaction.
+  FAILED,
+  // Set when a TxEndedEvent arrives for an ACTIVE sub-transaction.
+  COMMITTED,
+  // Set when a TxComponsitedEvent arrives for a known sub-transaction.
+  COMPENSATED;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java
new file mode 100644
index 0000000..351146b
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java
@@ -0,0 +1,28 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only the global transaction id; {@code SagaActor} matches
+ * on this class to handle an aborted saga.
+ */
+public class SagaAbortedEvent extends SagaEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final SagaAbortedEvent event = new SagaAbortedEvent();
+
+    private Builder() {
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public SagaAbortedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
new file mode 100644
index 0000000..b421c73
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
@@ -0,0 +1,14 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+/**
+ * Holder for the domain event types of the saga FSM. {@code SagaActor} uses
+ * {@link DomainEvent} as its domain event type parameter.
+ *
+ * <p>NOTE(review): the nested enum names repeat the names of the event classes in this
+ * package (e.g. {@code SagaStartedEvent}); inside this class the nested names win.
+ */
+public class SagaDomainEvent {
+  /** Marker interface for all saga domain events. */
+  public interface DomainEvent {}
+
+  // Single-constant enums: each domain event is a singleton marker without payload.
+  public enum SagaStartedEvent implements DomainEvent {INSTANCE}
+  public enum SagaEndedEvent implements DomainEvent {INSTANCE}
+  public enum SagaAbortedEvent implements DomainEvent {INSTANCE}
+  public enum SagaTimeoutEvent implements DomainEvent {INSTANCE}
+  public enum TxStartedEvent implements DomainEvent {INSTANCE}
+  public enum TxEndedEvent implements DomainEvent {INSTANCE}
+  public enum TxAbortedEvent implements DomainEvent {INSTANCE}
+  public enum TxComponsitedEvent implements DomainEvent {INSTANCE}
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java
new file mode 100644
index 0000000..7a87d77
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java
@@ -0,0 +1,28 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only the global transaction id; {@code SagaActor} matches
+ * on this class to handle the end of a saga.
+ */
+public class SagaEndedEvent extends SagaEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final SagaEndedEvent event = new SagaEndedEvent();
+
+    private Builder() {
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public SagaEndedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java
new file mode 100644
index 0000000..4831be7
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java
@@ -0,0 +1,43 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event that starts a saga; carries the global transaction id and an
+ * optional timeout.
+ */
+public class SagaStartedEvent extends SagaEvent {
+  /** Timeout in seconds; {@code SagaActor} only arms a timeout when this is positive. */
+  private int timeout;
+
+  public int getTimeout() {
+    return this.timeout;
+  }
+
+  public void setTimeout(int timeout) {
+    this.timeout = timeout;
+  }
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final SagaStartedEvent event = new SagaStartedEvent();
+
+    private Builder() {
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder timeout(int timeout) {
+      event.setTimeout(timeout);
+      return this;
+    }
+
+    public SagaStartedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java
new file mode 100644
index 0000000..4de5c37
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java
@@ -0,0 +1,27 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only the global transaction id; {@code SagaActor} matches
+ * on this class to move a running saga to the suspended state.
+ */
+public class SagaTimeoutEvent extends SagaEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final SagaTimeoutEvent event = new SagaTimeoutEvent();
+
+    private Builder() {
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public SagaTimeoutEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
new file mode 100644
index 0000000..620366f
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
@@ -0,0 +1,39 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Sub-transaction event; {@code SagaActor} marks the matching active sub-transaction
+ * as failed when it receives one.
+ */
+public class TxAbortedEvent extends TxEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final TxAbortedEvent event = new TxAbortedEvent();
+
+    private Builder() {
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public TxAbortedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
new file mode 100644
index 0000000..cccb69a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Sub-transaction event; {@code SagaActor} marks the matching sub-transaction as
+ * compensated when it receives one.
+ */
+public class TxComponsitedEvent extends TxEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final TxComponsitedEvent event = new TxComponsitedEvent();
+
+    private Builder() {
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public TxComponsitedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java
new file mode 100644
index 0000000..33ea3d1
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java
@@ -0,0 +1,39 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Sub-transaction event; {@code SagaActor} marks the matching active sub-transaction
+ * as committed when it receives one.
+ */
+public class TxEndedEvent extends TxEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final TxEndedEvent event = new TxEndedEvent();
+
+    private Builder() {
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public TxEndedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
new file mode 100644
index 0000000..2d10462
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Sub-transaction event; {@code SagaActor} registers a new active sub-transaction
+ * when it receives one for an unknown local transaction id.
+ */
+public class TxStartedEvent extends TxEvent {
+
+  /** Returns a fresh builder wrapping a new, empty event. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. It wraps a single event instance: setters mutate it and
+   * {@link #build()} hands out that same instance.
+   */
+  public static final class Builder {
+
+    private final TxStartedEvent event = new TxStartedEvent();
+
+    private Builder() {
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public TxStartedEvent build() {
+      return event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
new file mode 100644
index 0000000..ea8257d
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -0,0 +1,26 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+import java.io.Serializable;
+
+/**
+ * Common base of all saga and sub-transaction events; holds the global transaction id.
+ */
+public abstract class BaseEvent implements Serializable {
+  private String globalTxId;
+
+  public BaseEvent() {
+  }
+
+  public String getGlobalTxId() {
+    return this.globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  /** Renders the event as {@code BaseEvent{globalTxId='...'}}. */
+  @Override
+  public String toString() {
+    return String.format("BaseEvent{globalTxId='%s'}", globalTxId);
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java
new file mode 100644
index 0000000..bf39855
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java
@@ -0,0 +1,14 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+/**
+ * Base class of the saga-level events; adds nothing to {@link BaseEvent} except a
+ * {@code toString} that prints the concrete class name.
+ */
+public class SagaEvent extends BaseEvent {
+
+  public SagaEvent() {
+  }
+
+  /** Renders the event as {@code <SimpleClassName>{globalTxId='...'}}. */
+  @Override
+  public String toString() {
+    final String typeName = getClass().getSimpleName();
+    return String.format("%s{globalTxId='%s'}", typeName, getGlobalTxId());
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
new file mode 100644
index 0000000..56a06e0
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
@@ -0,0 +1,31 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+/**
+ * Abstract event that adds a parent transaction id and a local transaction id to the
+ * global transaction id inherited from {@link BaseEvent}.
+ */
+public abstract class TxEvent extends BaseEvent {
+  private String parentTxId;
+  private String localTxId;
+
+  /**
+   * @return the parent transaction id, or {@code null} if none has been set
+   */
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  /**
+   * @param parentTxId the parent transaction id to store
+   */
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  /**
+   * @return the local transaction id, or {@code null} if none has been set
+   */
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  /**
+   * @param localTxId the local transaction id to store
+   */
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  /**
+   * Renders the event as
+   * {@code <SimpleClassName>{globalTxId='<g>', parentTxId='<p>', localTxId='<l>'}}.
+   */
+  @Override
+  public String toString() {
+    // Every field after the first is preceded by ", " so the fields do not run together.
+    return this.getClass().getSimpleName() + "{" +
+        "globalTxId='" + this.getGlobalTxId() + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        '}';
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
new file mode 100644
index 0000000..0cc858a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -0,0 +1,131 @@
+package org.apache.servicecomb.pack.alpha.fsm.model;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Serializable, mutable holder for a saga's data: timing fields, the global transaction id,
+ * a termination flag, a compensation counter and a map of {@link TxEntity} values.
+ * Instances can be created directly or through {@link #builder()}.
+ */
+public class SagaData implements Serializable {
+  // Defaults to the wall-clock time at which this object is instantiated.
+  private long beginTime = System.currentTimeMillis();
+  private long endTime;
+  private String globalTxId;
+  private long expirationTime;
+  private boolean terminated;
+  private AtomicLong compensationRunningCounter = new AtomicLong();
+  private Map<String, TxEntity> txEntityMap = new HashMap<>();
+
+  /** @return the begin time; defaults to the creation time of this object */
+  public long getBeginTime() {
+    return this.beginTime;
+  }
+
+  /** @param beginTime the begin time to store */
+  public void setBeginTime(long beginTime) {
+    this.beginTime = beginTime;
+  }
+
+  /** @return the end time; {@code 0} until set */
+  public long getEndTime() {
+    return this.endTime;
+  }
+
+  /** @param endTime the end time to store */
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  /** @return the global transaction id, or {@code null} if none has been set */
+  public String getGlobalTxId() {
+    return this.globalTxId;
+  }
+
+  /** @param globalTxId the global transaction id to store */
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  /** @return the expiration time; {@code 0} until set */
+  public long getExpirationTime() {
+    return this.expirationTime;
+  }
+
+  /** @param expirationTime the expiration time to store */
+  public void setExpirationTime(long expirationTime) {
+    this.expirationTime = expirationTime;
+  }
+
+  /** @return the terminated flag; {@code false} until set */
+  public boolean isTerminated() {
+    return this.terminated;
+  }
+
+  /** @param terminated the terminated flag to store */
+  public void setTerminated(boolean terminated) {
+    this.terminated = terminated;
+  }
+
+  /** @return the counter instance itself, not a copy */
+  public AtomicLong getCompensationRunningCounter() {
+    return this.compensationRunningCounter;
+  }
+
+  /** @param compensationRunningCounter the counter instance to store */
+  public void setCompensationRunningCounter(AtomicLong compensationRunningCounter) {
+    this.compensationRunningCounter = compensationRunningCounter;
+  }
+
+  /** @return the map instance itself, not a copy */
+  public Map<String, TxEntity> getTxEntityMap() {
+    return this.txEntityMap;
+  }
+
+  /** @param txEntityMap the map instance to store */
+  public void setTxEntityMap(Map<String, TxEntity> txEntityMap) {
+    this.txEntityMap = txEntityMap;
+  }
+
+  /**
+   * @return {@code expirationTime} minus the current {@link System#currentTimeMillis()};
+   *     negative once the current time is past {@code expirationTime}
+   */
+  public long getTimeout() {
+    final long now = System.currentTimeMillis();
+    return this.expirationTime - now;
+  }
+
+  /** @return a new builder wrapping a fresh {@code SagaData} */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. Each method forwards to the matching setter of one wrapped
+   * {@code SagaData} and returns this builder.
+   */
+  public static final class Builder {
+
+    // The single instance every builder method mutates and build() hands out.
+    private final SagaData sagaData = new SagaData();
+
+    private Builder() {
+    }
+
+    public Builder beginTime(long beginTime) {
+      this.sagaData.setBeginTime(beginTime);
+      return this;
+    }
+
+    public Builder endTime(long endTime) {
+      this.sagaData.setEndTime(endTime);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      this.sagaData.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder expirationTime(long expirationTime) {
+      this.sagaData.setExpirationTime(expirationTime);
+      return this;
+    }
+
+    public Builder terminated(boolean terminated) {
+      this.sagaData.setTerminated(terminated);
+      return this;
+    }
+
+    public Builder compensationRunningCounter(AtomicLong compensationRunningCounter) {
+      this.sagaData.setCompensationRunningCounter(compensationRunningCounter);
+      return this;
+    }
+
+    public Builder txEntityMap(Map<String, TxEntity> txEntityMap) {
+      this.sagaData.setTxEntityMap(txEntityMap);
+      return this;
+    }
+
+    /** @return the wrapped instance; repeated calls return the same object */
+    public SagaData build() {
+      return this.sagaData;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
new file mode 100644
index 0000000..f6a3222
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -0,0 +1,95 @@
+package org.apache.servicecomb.pack.alpha.fsm.model;
+
+
+import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
+
+/**
+ * Serializable, mutable holder for one transaction's data: timing fields, parent and local
+ * transaction ids and a {@link TxState}. Instances can be created directly or through
+ * {@link #builder()}.
+ */
+public class TxEntity implements Serializable {
+  // Defaults to the wall-clock time at which this object is instantiated.
+  private long beginTime = System.currentTimeMillis();
+  private long endTime;
+  private String parentTxId;
+  private String localTxId;
+  private TxState state;
+
+  /** @return the begin time; defaults to the creation time of this object */
+  public long getBeginTime() {
+    return this.beginTime;
+  }
+
+  /** @param beginTime the begin time to store */
+  public void setBeginTime(long beginTime) {
+    this.beginTime = beginTime;
+  }
+
+  /** @return the end time; {@code 0} until set */
+  public long getEndTime() {
+    return this.endTime;
+  }
+
+  /** @param endTime the end time to store */
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  /** @return the parent transaction id, or {@code null} if none has been set */
+  public String getParentTxId() {
+    return this.parentTxId;
+  }
+
+  /** @param parentTxId the parent transaction id to store */
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  /** @return the local transaction id, or {@code null} if none has been set */
+  public String getLocalTxId() {
+    return this.localTxId;
+  }
+
+  /** @param localTxId the local transaction id to store */
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  /** @return the transaction state, or {@code null} if none has been set */
+  public TxState getState() {
+    return this.state;
+  }
+
+  /** @param state the transaction state to store */
+  public void setState(TxState state) {
+    this.state = state;
+  }
+
+  /** @return a new builder wrapping a fresh {@code TxEntity} */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. Each method forwards to the matching setter of one wrapped
+   * {@code TxEntity} and returns this builder.
+   */
+  public static final class Builder {
+
+    // The single instance every builder method mutates and build() hands out.
+    private final TxEntity txEntity = new TxEntity();
+
+    private Builder() {
+    }
+
+    public Builder beginTime(long beginTime) {
+      this.txEntity.setBeginTime(beginTime);
+      return this;
+    }
+
+    public Builder endTime(long endTime) {
+      this.txEntity.setEndTime(endTime);
+      return this;
+    }
+
+    public Builder parentTxId(String parentTxId) {
+      this.txEntity.setParentTxId(parentTxId);
+      return this;
+    }
+
+    public Builder localTxId(String localTxId) {
+      this.txEntity.setLocalTxId(localTxId);
+      return this;
+    }
+
+    public Builder state(TxState state) {
+      this.txEntity.setState(state);
+      return this;
+    }
+
+    /** @return the wrapped instance; repeated calls return the same object */
+    public TxEntity build() {
+      return this.txEntity;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
new file mode 100644
index 0000000..156d67e
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -0,0 +1,619 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import akka.persistence.fsm.PersistentFSM;
+import akka.persistence.fsm.PersistentFSM.CurrentState;
+import akka.testkit.javadsl.TestKit;
+import java.time.Duration;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SagaActorTest {
+
+  static ActorSystem system;
+
+  /**
+   * Creates the ActorSystem stored in the static {@code system} field, once before the
+   * tests of this class run.
+   */
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create("SagaActorTest");
+  }
+
+  /**
+   * Shuts down the ActorSystem once after the tests of this class have run and clears the
+   * static reference.
+   */
+  @AfterClass
+  public static void tearDown() {
+    TestKit.shutdownActorSystem(system);
+    system = null;
+  }
+
+  /**
+   * Produces a fresh random UUID string; the tests pass the result to
+   * {@code SagaActor.props}.
+   */
+  public String genPersistenceId() {
+    final UUID randomId = UUID.randomUUID();
+    return randomId.toString();
+  }
+
+  /**
+   * Happy path: every transaction ends normally and the saga ends.
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   *
+   * Expects the transitions to finish in COMMITTED, a SagaData message whose three
+   * transactions are all COMMITTED, and a Terminated message for the actor.
+   */
+  @Test
+  public void successfulTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(3, sagaData.getTxEntityMap().size());
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(TxState.COMMITTED, v.getState());
+      });
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * The only transaction aborts, then the saga is aborted.
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxAbortedEvent-11
+   * 4. SagaAbortedEvent-1
+   *
+   * Expects PARTIALLY_ACTIVE -> FAILED -> COMPENSATED, a SagaData message whose single
+   * transaction is FAILED with a compensation counter of 0, and a Terminated message.
+   */
+  @Test
+  public void firstTxAbortedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(1, sagaData.getTxEntityMap().size());
+      assertEquals(TxState.FAILED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+      assertEquals(0, sagaData.getCompensationRunningCounter().intValue());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+
+  /**
+   * The second transaction aborts after the first has ended; the first is compensated.
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxAbortedEvent-12
+   * 6. TxComponsitedEvent-11
+   * 7. SagaAbortedEvent-1
+   *
+   * Expects PARTIALLY_ACTIVE -> FAILED -> COMPENSATED, a SagaData message with transaction 11
+   * COMPENSATED, transaction 12 FAILED and a compensation counter of 0, and a Terminated
+   * message.
+   */
+  @Test
+  public void middleTxAbortedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(2, sagaData.getTxEntityMap().size());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+      assertEquals(TxState.FAILED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+      assertEquals(0, sagaData.getCompensationRunningCounter().intValue());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * The third transaction aborts after the first two have ended; both are compensated.
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. TxComponsitedEvent-11
+   * 9. TxComponsitedEvent-12
+   * 10. SagaAbortedEvent-1
+   *
+   * Expects PARTIALLY_ACTIVE -> FAILED -> COMPENSATED, a SagaData message with transactions
+   * 11 and 12 COMPENSATED, transaction 13 FAILED and a compensation counter of 0, and a
+   * Terminated message.
+   */
+  @Test
+  public void lastTxAbortedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(3, sagaData.getTxEntityMap().size());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+      assertEquals(TxState.FAILED, sagaData.getTxEntityMap().get(localTxId_3).getState());
+      assertEquals(0, sagaData.getCompensationRunningCounter().intValue());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * Scenario as listed by the original author:
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxAbortedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. TxComponsitedEvent-12
+   * 9. TxComponsitedEvent-13
+   * 10. SagaAbortedEvent-1
+   *
+   * NOTE(review): the messages actually sent below do not follow that list. The body ends
+   * transactions 11 and 12, aborts transaction 13 and compensates 11 and 12, which is the
+   * same sequence as lastTxAbortedEventTest, so the "first transaction aborted, remaining
+   * events still arrive" case named by this test is not exercised. Confirm the intended
+   * message order and expected transitions against SagaActor before changing the body.
+   */
+  @Test
+  public void receivedRemainingEventAfterfirstTxAbortedEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(3, sagaData.getTxEntityMap().size());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+      assertEquals(TxState.FAILED, sagaData.getTxEntityMap().get(localTxId_3).getState());
+      assertEquals(0, sagaData.getCompensationRunningCounter().intValue());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+
+  /**
+   * All three transactions end, then the saga is aborted and every transaction is
+   * compensated.
+   *
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaAbortedEvent-1
+   * 9. TxComponsitedEvent-11
+   * 10. TxComponsitedEvent-12
+   * 11. TxComponsitedEvent-13
+   *
+   * Expects PARTIALLY_COMMITTED -> FAILED -> COMPENSATED, a SagaData message with all three
+   * transactions COMPENSATED and a compensation counter of 0, and a Terminated message.
+   */
+  @Test
+  public void sagaAbortedEventAfterAllTxEndedTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
+
+      // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(globalTxId, sagaData.getGlobalTxId());
+      assertEquals(3, sagaData.getTxEntityMap().size());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+      assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_3).getState());
+      assertEquals(0, sagaData.getCompensationRunningCounter().intValue());
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaTimeoutEvent-1
+   */
+  @Test
+  public void omegaSendSagaTimeoutEventTest() {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga);
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(SagaTimeoutEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      //expect
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      SagaData sagaData = expectMsgClass(SagaData.class);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(terminated.getActor(), saga);
+
+      system.stop(saga);
+    }};
+  }
+
+  /**
+   * The saga is started with a 5s timeout and SagaEndedEvent is sent too late; expected end state: SUSPENDED.
+   * 1. SagaStartedEvent(5s)-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  @Test
+  public void sagaActorTriggerTimeoutTest() throws InterruptedException {
+    new TestKit(system) {{
+      final String globalTxId = UUID.randomUUID().toString();
+      final String localTxId_1 = UUID.randomUUID().toString();
+      final String localTxId_2 = UUID.randomUUID().toString();
+      final String localTxId_3 = UUID.randomUUID().toString();
+      final int timeout = 5; // saga timeout; this test treats the value as seconds
+
+      ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+      watch(saga); // needed so the final Terminated message can be asserted
+      saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      Thread.sleep(timeout * 2000); // 2 x timeout in ms: the timeout must elapse before SagaEndedEvent is sent
+      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      // expect: the initial state, one transition per saga/tx event, then SUSPENDED instead of a normal end
+      CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+      assertEquals(SagaActorState.IDEL, currentState.state());
+      PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+      transition = expectMsgClass(PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+      transition = expectMsgClass(Duration.ofSeconds(timeout + 2), PersistentFSM.Transition.class);
+      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
+
+      Terminated terminated = expectMsgClass(Terminated.class);
+      assertEquals(saga, terminated.getActor());
+
+      system.stop(saga);
+    }};
+  }
+
+  /** Asserts that {@code transition} was published by {@code actorRef} and goes from {@code from} to {@code to}. */
+  private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef,
+      SagaActorState from, SagaActorState to) {
+    assertEquals(actorRef, transition.fsmRef());
+    assertEquals(from, transition.from());
+    assertEquals(to, transition.to());
+  }
+}
diff --git a/alpha/alpha-fsm/src/test/resources/application.conf b/alpha/alpha-fsm/src/test/resources/application.conf
new file mode 100644
index 0000000..1265036
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
+akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/test/resources/log4j2.xml b/alpha/alpha-fsm/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>