You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/27 05:49:46 UTC

[servicecomb-pack] branch SCB-1321 updated: SCB-1321 Add Saga Event Message Bus

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git


The following commit(s) were added to refs/heads/SCB-1321 by this push:
     new 281e34c  SCB-1321 Add Saga Event Message Bus
281e34c is described below

commit 281e34c10fcdfca66e6b659c95b2296d8a2573ce
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jun 27 13:49:02 2019 +0800

    SCB-1321 Add Saga Event Message Bus
---
 alpha/alpha-fsm/pom.xml                            |  39 ++-
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |  60 ++++
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  17 +-
 .../fsm/event/consumer/SagaEventConsumer.java      |  81 ++++++
 .../servicecomb/pack/alpha/fsm/model/SagaData.java |  10 +
 .../fsm/spring/integration/akka/LogExtension.java  |  31 +++
 .../spring/integration/akka/LogExtensionImpl.java  |  35 +++
 .../eventbus/EventSubscribeBeanPostProcessor.java  |  62 +++++
 .../src/main/resources/META-INF/spring.factories   |  17 ++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 165 +++--------
 .../pack/alpha/fsm/SagaApplication.java            |  28 ++
 .../pack/alpha/fsm/SagaEventSender.java            | 297 ++++++++++++++++++++
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 308 +++++++++++++++++++++
 13 files changed, 1014 insertions(+), 136 deletions(-)

diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index faa4e05..b1897d9 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -36,6 +36,13 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-dependencies</artifactId>
+        <version>${spring.boot.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
         <groupId>com.typesafe.akka</groupId>
         <artifactId>akka-persistence_2.12</artifactId>
         <version>${akka.version}</version>
@@ -44,6 +51,12 @@
   </dependencyManagement>
 
   <dependencies>
+    <!-- spring boot -->
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-autoconfigure</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>pack-common</artifactId>
@@ -52,21 +65,19 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-
-    <!-- log -->
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-log4j2</artifactId>
     </dependency>
     <dependency>
       <groupId>javax.persistence</groupId>
       <artifactId>javax.persistence-api</artifactId>
     </dependency>    
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
+<!--    <dependency>-->
+<!--      <groupId>org.apache.logging.log4j</groupId>-->
+<!--      <artifactId>log4j-slf4j-impl</artifactId>-->
+<!--      <scope>test</scope>-->
+<!--    </dependency>-->
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
@@ -95,6 +106,16 @@
 
     <!-- For testing the artifacts scope are test-->
     <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
new file mode 100644
index 0000000..252d6ae
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.actor.ActorSystem;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/** Auto-configures the Akka saga state machine beans when alpha.model.actor.enabled is set and not false. */
+@Configuration
+@ConditionalOnProperty(value = {"alpha.model.actor.enabled"})
+public class FsmAutoConfiguration {
+  // Spring only infers close()/shutdown() as destroy methods, so terminate() is named explicitly.
+  @Bean(destroyMethod = "terminate")
+  public ActorSystem actorSystem() {
+    return ActorSystem.create("alpha-akka", akkaConfiguration());
+  }
+
+  @Bean
+  public Config akkaConfiguration() {
+    return ConfigFactory.load();
+  }
+
+  @Bean(name = "sagaEventBus")
+  public EventBus sagaEventBus() {
+    return new EventBus();
+  }
+
+  @Bean
+  public SagaEventConsumer sagaEventConsumer(){
+    return new SagaEventConsumer();
+  }
+
+  @Bean
+  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+    return new EventSubscribeBeanPostProcessor();
+  }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 613a23b..dca006e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -37,6 +37,7 @@ import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -46,6 +47,7 @@ public class SagaActor extends
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+
   public static Props props(String persistenceId) {
     return Props.create(SagaActor.class, persistenceId);
   }
@@ -218,7 +220,7 @@ public class SagaActor extends
               if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
                   || hasCommittedTx(data)) {
                 return stay();
-              }else{
+              } else {
                 return goTo(SagaActorState.COMPENSATED)
                     .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
@@ -280,6 +282,17 @@ public class SagaActor extends
         })
     );
 
+    onTermination(
+        matchStop(
+            Normal(), (state, data) -> {
+              LOG.info("stop {} {}", data.getGlobalTxId(), state);
+              data.setTerminated(true);
+              data.setLastState(state);
+              LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data);
+            }
+        )
+    );
+
   }
 
   @Override
@@ -337,7 +350,7 @@ public class SagaActor extends
           // decrement the compensation running counter by one
           data.getCompensationRunningCounter().decrementAndGet();
           txEntity.setState(TxState.COMPENSATED);
-          LOG.info("compensation is completed {}",txEntity.getLocalTxId());
+          LOG.info("compensation is completed {}", txEntity.getLocalTxId());
         }
       }
     } else if (event instanceof SagaEvent) {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
new file mode 100644
index 0000000..aae9419
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class SagaEventConsumer {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final Timeout TIMEOUT = new Timeout(1000, TimeUnit.MILLISECONDS);
+
+  @Autowired
+  ActorSystem system;
+
+  /**
+   * Delivers a saga event to the actor named after its global transaction id, creating that
+   * actor first when none is found at /user/{globalTxId}.
+   */
+  @Subscribe
+  public void receiveSagaEvent(BaseEvent event) throws Exception {
+    LOG.info("receive {} ", event.toString());
+    try {
+      //TODO Write-Ahead Logging
+      final String globalTxId = event.getGlobalTxId();
+      final Optional<ActorRef> existing = getActorRefFromPath("/user/" + globalTxId);
+      final ActorRef saga = existing.isPresent()
+          ? existing.get()
+          : system.actorOf(SagaActor.props(globalTxId), globalTxId);
+      saga.tell(event, ActorRef.noSender());
+      LOG.info("tell {} to {}", event.toString(),saga);
+      //TODO WAL commit
+    } catch (Exception ex) {
+      //TODO
+      throw ex;
+    }
+  }
+
+  /**
+   * Resolves the actor at path, blocking up to TIMEOUT; absent if it fails with ActorNotFound.
+   */
+  public Optional<ActorRef> getActorRefFromPath(String path) throws Exception {
+    try {
+      ActorSelection selection = system.actorSelection(path);
+      Future<ActorRef> pending = selection.resolveOne(TIMEOUT);
+      return Optional.of(Await.result(pending, TIMEOUT.duration()));
+    } catch (ActorNotFound notFound) {
+      return Optional.absent();
+    }
+  }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 3c794fa..4908029 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
 
 public class SagaData implements Serializable {
   private long beginTime = System.currentTimeMillis();
@@ -28,6 +29,7 @@ public class SagaData implements Serializable {
   private String globalTxId;
   private long expirationTime;
   private boolean terminated;
+  private SagaActorState lastState;
   private AtomicLong compensationRunningCounter = new AtomicLong();
   private Map<String,TxEntity> txEntityMap = new HashMap<>();
 
@@ -89,6 +91,14 @@ public class SagaData implements Serializable {
     this.txEntityMap = txEntityMap;
   }
 
+  public SagaActorState getLastState() {
+    return lastState;
+  }
+
+  public void setLastState(SagaActorState lastState) {
+    this.lastState = lastState;
+  }
+
   public long getTimeout(){
     return expirationTime-System.currentTimeMillis();
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
new file mode 100644
index 0000000..9c4d27d
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
+
+/** Akka extension id that creates a LogExtensionImpl; shared through LogExtensionProvider. */
+public class LogExtension extends AbstractExtensionId<LogExtensionImpl> {
+  public static final LogExtension LogExtensionProvider = new LogExtension();
+
+  @Override
+  public LogExtensionImpl createExtension(ExtendedActorSystem system) {
+    return new LogExtensionImpl();
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
new file mode 100644
index 0000000..4bf00b4
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import akka.actor.Extension;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+
+/** Akka extension that keeps the most recently stored SagaData for each global transaction id. */
+public class LogExtensionImpl implements Extension {
+  private final Map<String, SagaData> sagaDataMap = new ConcurrentHashMap<>();
+  public void putSagaData(String globalTxId, SagaData sagaData){
+    sagaDataMap.put(globalTxId, sagaData);
+  }
+  /** Returns the SagaData stored for globalTxId, or null when nothing was stored for it. */
+  public SagaData getSagaData(String globalTxId){
+    return sagaDataMap.get(globalTxId);
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java
new file mode 100644
index 0000000..11f2cb0
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/** Registers Spring beans that declare @Subscribe methods with the saga Guava EventBus. */
+public class EventSubscribeBeanPostProcessor implements BeanPostProcessor {
+
+  // Optional injection: without a "sagaEventBus" bean this stays null and nothing is registered.
+  @Autowired(required = false)
+  @Qualifier("sagaEventBus")
+  EventBus eventBus;
+
+  /** Returns the bean unchanged. */
+  @Override
+  public Object postProcessBeforeInitialization(Object bean, String beanName)
+      throws BeansException {
+    return bean;
+  }
+
+  /**
+   * Registers the bean with the Guava EventBus when at least one of its public methods carries
+   * @Subscribe; the bean itself is always returned unchanged.
+   */
+  @Override
+  public Object postProcessAfterInitialization(Object bean, String beanName)
+      throws BeansException {
+    if (eventBus == null) {
+      return bean;
+    }
+    for (Method candidate : bean.getClass().getMethods()) {
+      if (candidate.isAnnotationPresent(Subscribe.class)) {
+        eventBus.register(bean);
+        break;
+      }
+    }
+    return bean;
+  }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..ff42cdd
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.FsmAutoConfiguration
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 21c9402..5644c1b 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -25,28 +25,15 @@ import akka.actor.Terminated;
 import akka.persistence.fsm.PersistentFSM;
 import akka.persistence.fsm.PersistentFSM.CurrentState;
 import akka.testkit.javadsl.TestKit;
-import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class SagaActorTest {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   static ActorSystem system;
 
   @BeforeClass
@@ -85,14 +72,10 @@ public class SagaActorTest {
       ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -152,10 +135,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -207,13 +189,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -275,16 +253,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -353,16 +324,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -375,26 +339,14 @@ public class SagaActorTest {
       assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
 
       transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
-
-      transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
-
-      transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
-
-      transition = expectMsgClass(PersistentFSM.Transition.class);
-      assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
-
-      transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
       SagaData sagaData = expectMsgClass(SagaData.class);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
       assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
 
       transition = expectMsgClass(PersistentFSM.Transition.class);
@@ -433,17 +385,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+      SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -514,14 +458,9 @@ public class SagaActorTest {
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
 
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(SagaTimeoutEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -573,10 +512,9 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 4. TxStartedEvent-13
    * 5. TxEndedEvent-13
-   * 6. SagaEndedEvent-1
    */
   @Test
-  public void sagaActorTriggerTimeoutTest() throws InterruptedException {
+  public void sagaActorTriggerTimeoutTest() {
     new TestKit(system) {{
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
@@ -587,15 +525,10 @@ public class SagaActorTest {
       ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      Thread.sleep(timeout*2000);
-      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+      SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -644,7 +577,7 @@ public class SagaActorTest {
    * 8. SagaEndedEvent-1
    */
   @Test
-  public void successfulTestWithTxConcurrent() throws InterruptedException {
+  public void successfulWithTxConcurrentTest() {
     new TestKit(system) {{
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
@@ -654,14 +587,9 @@ public class SagaActorTest {
       ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -704,7 +632,7 @@ public class SagaActorTest {
    * 8. SagaEndedEvent-1
    */
   @Test
-  public void successfulTestWithTxConcurrentCross() throws InterruptedException {
+  public void successfulWithTxConcurrentCrossTest() throws InterruptedException {
     new TestKit(system) {{
       final String globalTxId = UUID.randomUUID().toString();
       final String localTxId_1 = UUID.randomUUID().toString();
@@ -714,14 +642,9 @@ public class SagaActorTest {
       ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -782,17 +705,9 @@ public class SagaActorTest {
       ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
       watch(saga);
       saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-
-      saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
-      saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
-      saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+      SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+        saga.tell(event, getRef());
+      });
 
       //expect
       CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java
new file mode 100644
index 0000000..7c14000
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SagaApplication {
+  public static void main(String[] args){
+    SpringApplication.run(SagaApplication.class, args);
+  }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
new file mode 100644
index 0000000..a3ddd0d
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+/**
+ * Event simulator: builds the ordered lists of saga events that the tests replay.
+ * */
+
+public class SagaEventSender {
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  public static List<BaseEvent> successfulEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxAbortedEvent-11
+   * 4. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> firstTxAbortedEvents(String globalTxId, String localTxId_1){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxAbortedEvent-12
+   * 6. TxComponsitedEvent-11
+   * 7. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxAbortedEvent-13
+   * 8. TxComponsitedEvent-11
+   * 9. TxComponsitedEvent-12
+   * 10. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxAbortedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. TxComponsitedEvent-12
+   * 9. TxComponsitedEvent-13
+   * 10. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaAbortedEvent-1
+   * 9. TxComponsitedEvent-11
+   * 10. TxComponsitedEvent-12
+   * 11. TxComponsitedEvent-13
+   */
+  public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   * 8. SagaTimeoutEvent-1
+   */
+  public static List<BaseEvent> omegaSendSagaTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaTimeoutEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent(5s)-1
+   * 2. TxStartedEvent-11
+   * 3. TxEndedEvent-11
+   * 4. TxStartedEvent-12
+   * 5. TxEndedEvent-12
+   * 6. TxStartedEvent-13
+   * 7. TxEndedEvent-13
+   */
+  public static List<BaseEvent> sagaActorTriggerTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3, int timeout){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxStartedEvent-12
+   * 4. TxStartedEvent-13
+   * 5. TxEndedEvent-11
+   * 6. TxEndedEvent-12
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  public static List<BaseEvent> successfulWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 3. TxStartedEvent-12
+   * 5. TxEndedEvent-11
+   * 4. TxStartedEvent-13
+   * 6. TxEndedEvent-12
+   * 7. TxEndedEvent-13
+   * 8. SagaEndedEvent-1
+   */
+  public static List<BaseEvent> successfulWithTxConcurrentCrossEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }
+
+  /**
+   * 1. SagaStartedEvent-1
+   * 2. TxStartedEvent-11
+   * 4. TxStartedEvent-12
+   * 6. TxStartedEvent-13
+   * 3. TxEndedEvent-11
+   * 5. TxEndedEvent-12
+   * 7. TxAbortedEvent-13
+   * 8. TxComponsitedEvent-11
+   * 9. TxComponsitedEvent-12
+   * 10. SagaAbortedEvent-1
+   */
+  public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+    List<BaseEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+    return sagaEvents;
+  }  
+
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
new file mode 100644
index 0000000..c13c6af
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.ActorSystem;
+import com.google.common.eventbus.EventBus;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+// Integration tests: each case posts a scripted saga event sequence to the event bus and
+// polls LogExtension until the recorded saga and transaction states match the expectation.
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {SagaApplication.class},
+    properties = {"alpha.model.actor.enabled=true"})
+public class SagaIntegrationTest {
+
+  @Autowired
+  @Qualifier("sagaEventBus")
+  EventBus sagaEventBus;
+
+  @Autowired
+  ActorSystem system;
+
+  // Posts the all-successful event sequence; expects the saga to end COMMITTED
+  // with three transaction entities, each COMMITTED.
+  @Test
+  public void successfulTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMMITTED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED;
+    });
+  }
+
+  // Posts the sequence in which the first (and only) transaction aborts; expects the
+  // saga to end COMPENSATED with that single transaction FAILED.
+  @Test
+  public void firstTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 1
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED;
+    });
+  }
+
+  // Posts the sequence in which the second of two transactions aborts; expects the
+  // saga COMPENSATED, the first transaction COMPENSATED and the second FAILED.
+  @Test
+  public void middleTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 2
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED;
+    });
+  }
+
+  // Posts the sequence in which the third transaction aborts; expects the saga
+  // COMPENSATED, transactions 1 and 2 COMPENSATED and transaction 3 FAILED.
+  @Test
+  public void lastTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED;
+    });
+  }
+
+  // Posts the "remaining events after the first abort" sequence; expects the saga
+  // COMPENSATED, transaction 1 FAILED and transactions 2 and 3 COMPENSATED.
+  @Test
+  public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMPENSATED;
+    });
+  }
+
+  // Posts the "saga aborted after all transactions ended" sequence; expects the saga
+  // COMPENSATED with all three transactions COMPENSATED.
+  @Test
+  public void sagaAbortedEventAfterAllTxEndedTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMPENSATED;
+    });
+  }
+
+  // Posts the omegaSendSagaTimeoutEvents sequence; expects the saga SUSPENDED
+  // while all three transactions remain COMMITTED.
+  @Test
+  public void omegaSendSagaTimeoutEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.SUSPENDED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED;
+    });
+  }
+
+  // Passes a 5-second timeout to the event sender and waits up to timeout + 1 seconds
+  // for the saga to be SUSPENDED with all three transactions COMMITTED.
+  @Test
+  public void sagaActorTriggerTimeoutTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    final int timeout = 5; // second
+    SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(timeout + 1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.SUSPENDED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED;
+    });
+  }
+
+  // Posts the successfulWithTxConcurrentEvents sequence; expects the saga COMMITTED
+  // with all three transactions COMMITTED.
+  @Test
+  public void successfulWithTxConcurrentTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMMITTED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED;
+    });
+  }
+
+  // Posts the successfulWithTxConcurrentCrossEvents sequence; expects the saga
+  // COMMITTED with all three transactions COMMITTED.
+  @Test
+  public void successfulWithTxConcurrentCrossTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMMITTED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED;
+    });
+  }
+
+  // Posts the lastTxAbortedEventWithTxConcurrentEvents sequence; expects the saga
+  // COMPENSATED, transactions 1 and 2 COMPENSATED and transaction 3 FAILED.
+  @Test
+  public void lastTxAbortedEventWithTxConcurrentTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3)
+        .forEach(sagaEventBus::post);
+
+    await().atMost(1, SECONDS).until(() -> {
+      final SagaData saga = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      if (saga == null) {
+        return false;
+      }
+      return saga.getLastState() == SagaActorState.COMPENSATED
+          && saga.getTxEntityMap().size() == 3
+          && saga.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+          && saga.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED;
+    });
+  }
+
+}