You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/02 09:40:55 UTC

[servicecomb-pack] branch SCB-1321 updated (9cee529 -> 0ed1b15)

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a change to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from 9cee529  SCB-1321 Support Akka Persistent Redis Recovery
     new f59e348  SCB-1321 Add SagaActor call compensate
     new 0ed1b15  SCB-1321 Add GrpcSagaEventService for SagaActor Events

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-fsm/pom.xml                            |   4 +
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |   3 +
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  62 +++++---
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   2 +-
 .../pack/alpha/fsm/domain/AddTxEventDomain.java    |  42 +++++-
 ...mponsitedEvent.java => TxCompensatedEvent.java} |  16 +-
 .../pack/alpha/fsm/event/TxStartedEvent.java       | 103 +++++++++++++
 .../pack/alpha/fsm/event/base/TxEvent.java         |  18 +++
 .../fsm/event/consumer/SagaEventConsumer.java      |   5 +-
 .../servicecomb/pack/alpha/fsm/model/TxEntity.java |  70 +++++++++
 .../fsm/spring/integration/akka/LogExtension.java  |  31 ----
 ...ogExtensionImpl.java => SagaDataExtension.java} |  25 +++-
 .../integration/akka/SpringAkkaExtension.java      |  77 ++++++++++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  32 ++--
 .../pack/alpha/fsm/SagaEventSender.java            |  50 +++----
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  26 ++--
 alpha/alpha-server/pom.xml                         |  16 ++
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  21 +++
 .../alpha/server/{ => fsm}/GrpcOmegaCallback.java  |  11 +-
 .../alpha/server/fsm/GrpcSagaEventService.java     | 130 ++++++++++++++++
 .../src/main/resources/application.yaml            |   9 ++
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 134 +++++++++++++++++
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 159 ++++++++++++++++++++
 .../pack/alpha/server/fsm/OmegaEventSender.java    | 163 +++++++++++++++++++++
 .../src/test/resources/log4j2.xml                  |   0
 .../apache/servicecomb/pack/common/EventType.java  |   3 +-
 pom.xml                                            |   5 +
 27 files changed, 1087 insertions(+), 130 deletions(-)
 rename alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/{TxComponsitedEvent.java => TxCompensatedEvent.java} (77%)
 delete mode 100644 alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
 rename alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/{LogExtensionImpl.java => SagaDataExtension.java} (56%)
 create mode 100644 alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
 copy alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/{ => fsm}/GrpcOmegaCallback.java (96%)
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
 copy alpha/{alpha-fsm => alpha-server}/src/test/resources/log4j2.xml (100%)


[servicecomb-pack] 01/02: SCB-1321 Add SagaActor call compensate

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f59e348f5f4d181bfa3fcaa6bea137fc5675c942
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Tue Jul 2 17:16:27 2019 +0800

    SCB-1321 Add SagaActor call compensate
---
 alpha/alpha-fsm/pom.xml                            |   4 +
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |   3 +
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  62 +++++++++----
 .../apache/servicecomb/pack/alpha/fsm/TxState.java |   2 +-
 .../pack/alpha/fsm/domain/AddTxEventDomain.java    |  42 ++++++++-
 ...mponsitedEvent.java => TxCompensatedEvent.java} |  16 ++--
 .../pack/alpha/fsm/event/TxStartedEvent.java       | 103 +++++++++++++++++++++
 .../pack/alpha/fsm/event/base/TxEvent.java         |  18 ++++
 .../fsm/event/consumer/SagaEventConsumer.java      |   5 +-
 .../servicecomb/pack/alpha/fsm/model/TxEntity.java |  70 ++++++++++++++
 .../fsm/spring/integration/akka/LogExtension.java  |  31 -------
 ...ogExtensionImpl.java => SagaDataExtension.java} |  25 +++--
 .../integration/akka/SpringAkkaExtension.java      |  77 +++++++++++++++
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  32 +++----
 .../pack/alpha/fsm/SagaEventSender.java            |  50 +++++-----
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  26 +++---
 .../apache/servicecomb/pack/common/EventType.java  |   3 +-
 17 files changed, 447 insertions(+), 122 deletions(-)

diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index 8788430..b6c09d3 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -63,6 +63,10 @@
       <artifactId>pack-common</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index c5082ab..c92804d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.pack.alpha.fsm;
 
+import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
+
 import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
@@ -38,6 +40,7 @@ public class FsmAutoConfiguration {
   @Bean
   public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
     ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
+    SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
     return system;
   }
 
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index d22cb79..72be072 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -32,13 +32,14 @@ import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -81,8 +82,13 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
                     .applying(domainEvent)
@@ -131,8 +137,13 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return stay()
                     .applying(domainEvent)
@@ -165,8 +176,13 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               if (data.getExpirationTime() > 0) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
                     .applying(domainEvent)
@@ -230,7 +246,7 @@ public class SagaActor extends
                   .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
-        ).event(TxComponsitedEvent.class, SagaData.class,
+        ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
                   event.getLocalTxId(), TxState.COMPENSATED);
@@ -240,8 +256,9 @@ public class SagaActor extends
             }
         ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
             (event, data) -> {
-              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
-                  || hasCommittedTx(data)) {
+//              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
+//                  || hasCommittedTx(data)) {
+              if (hasCompensationSentTx(data)) {
                 return stay().replying(data);
               } else {
                 return goTo(SagaActorState.COMPENSATED)
@@ -255,7 +272,7 @@ public class SagaActor extends
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
                 return stay().replying(data).applying(domainEvent);
-              } else if(hasCompensationSentTx(data)){
+              } else if (hasCompensationSentTx(data)) {
                 return stay().replying(data);
               } else {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(
@@ -268,8 +285,13 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
-              AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
-                  event.getLocalTxId());
+              AddTxEventDomain domainEvent = new AddTxEventDomain(
+                  event.getServiceName(),
+                  event.getInstanceId(),
+                  event.getParentTxId(),
+                  event.getLocalTxId(),
+                  event.getPayloads(),
+                  event.getCompensationMethod());
               return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
@@ -332,7 +354,7 @@ public class SagaActor extends
     whenUnhandled(
         matchAnyEvent((event, data) -> {
           LOG.error("Unhandled event {}", event);
-          return goTo(SagaActorState.SUSPENDED).replying(data);
+          return stay();
         })
     );
 
@@ -340,7 +362,7 @@ public class SagaActor extends
         matchState(null, null, (from, to) -> {
           if (stateData().getGlobalTxId() != null) {
             stateData().setLastState(to);
-            LogExtension.LogExtensionProvider.get(getContext().getSystem())
+            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
           LOG.info("transition {} {} -> {}", getSelf(), from, to);
@@ -353,7 +375,7 @@ public class SagaActor extends
               LOG.info("stop {} {}", data.getGlobalTxId(), state);
               data.setTerminated(true);
               data.setLastState(state);
-              LogExtension.LogExtensionProvider.get(getContext().getSystem())
+              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                   .putSagaData(data.getGlobalTxId(), data);
             }
         )
@@ -372,8 +394,13 @@ public class SagaActor extends
       AddTxEventDomain domainEvent = (AddTxEventDomain) event;
       if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) {
         TxEntity txEntity = TxEntity.builder()
+            .serviceName(domainEvent.getServiceName())
+            .instanceId(domainEvent.getInstanceId())
+            .globalTxId(data.getGlobalTxId())
             .localTxId(domainEvent.getLocalTxId())
             .parentTxId(domainEvent.getParentTxId())
+            .compensationMethod(domainEvent.getCompensationMethod())
+            .payloads(domainEvent.getPayloads())
             .state(domainEvent.getState())
             .build();
         data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
@@ -451,6 +478,7 @@ public class SagaActor extends
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     //TODO call omega compensate method
+    SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
     LOG.info("compensate {}", txEntity.getLocalTxId());
     txEntity.setState(TxState.COMPENSATION_SENT);
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
index af02ce8..655db30 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -21,6 +21,6 @@ public enum TxState {
   ACTIVE,
   FAILED,
   COMMITTED,
-  COMPENSATION_SENT, // The compensation method has been called to wait for TxComponsitedEvent
+  COMPENSATION_SENT, // The compensation method has been called to wait for TxCompensatedEvent
   COMPENSATED
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index c7c65a3..af5936d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -20,13 +20,37 @@ package org.apache.servicecomb.pack.alpha.fsm.domain;
 import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
 public class AddTxEventDomain implements DomainEvent {
+  private String serviceName;
+  private String instanceId;
   private String parentTxId;
   private String localTxId;
   private TxState state = TxState.ACTIVE;
+  private String compensationMethod;
+  private byte[] payloads;
 
-  public AddTxEventDomain(String parentTxId, String localTxId) {
+  public AddTxEventDomain(String serviceName, String instanceId, String parentTxId, String localTxId, byte[] payloads, String compensationMethod) {
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
     this.parentTxId = parentTxId;
     this.localTxId = localTxId;
+    this.compensationMethod = compensationMethod;
+    this.payloads = payloads;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
   }
 
   public String getParentTxId() {
@@ -52,4 +76,20 @@ public class AddTxEventDomain implements DomainEvent {
   public void setState(TxState state) {
     this.state = state;
   }
+
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
similarity index 77%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
index 1230fff..c007993 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event;
 
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
-public class TxComponsitedEvent extends TxEvent {
+public class TxCompensatedEvent extends TxEvent {
 
   public static Builder builder() {
     return new Builder();
@@ -27,29 +27,29 @@ public class TxComponsitedEvent extends TxEvent {
 
   public static final class Builder {
 
-    private TxComponsitedEvent txComponsitedEvent;
+    private TxCompensatedEvent txCompensatedEvent;
 
     private Builder() {
-      txComponsitedEvent = new TxComponsitedEvent();
+      txCompensatedEvent = new TxCompensatedEvent();
     }
 
     public Builder parentTxId(String parentTxId) {
-      txComponsitedEvent.setParentTxId(parentTxId);
+      txCompensatedEvent.setParentTxId(parentTxId);
       return this;
     }
 
     public Builder localTxId(String localTxId) {
-      txComponsitedEvent.setLocalTxId(localTxId);
+      txCompensatedEvent.setLocalTxId(localTxId);
       return this;
     }
 
     public Builder globalTxId(String globalTxId) {
-      txComponsitedEvent.setGlobalTxId(globalTxId);
+      txCompensatedEvent.setGlobalTxId(globalTxId);
       return this;
     }
 
-    public TxComponsitedEvent build() {
-      return txComponsitedEvent;
+    public TxCompensatedEvent build() {
+      return txCompensatedEvent;
     }
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
index bf259d0..5f173ef 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
@@ -17,9 +17,77 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.event;
 
+import java.util.Date;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
 
 public class TxStartedEvent extends TxEvent {
+  private String serviceName;
+  private String instanceId;
+  private String compensationMethod;
+  private byte[] payloads;
+  private Date creationTime;
+  private String retryMethod;
+  private int retries;
+
+  @Override
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  @Override
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
+
+  public Date getCreationTime() {
+    return creationTime;
+  }
+
+  public void setCreationTime(Date creationTime) {
+    this.creationTime = creationTime;
+  }
+
+  public String getRetryMethod() {
+    return retryMethod;
+  }
+
+  public void setRetryMethod(String retryMethod) {
+    this.retryMethod = retryMethod;
+  }
+
+  public int getRetries() {
+    return retries;
+  }
+
+  public void setRetries(int retries) {
+    this.retries = retries;
+  }
 
   public static Builder builder() {
     return new Builder();
@@ -48,6 +116,41 @@ public class TxStartedEvent extends TxEvent {
       return this;
     }
 
+    public Builder compensationMethod(String compensationMethod) {
+      txStartedEvent.setCompensationMethod(compensationMethod);
+      return this;
+    }
+
+    public Builder payloads(byte[] payloads) {
+      txStartedEvent.setPayloads(payloads);
+      return this;
+    }
+
+    public Builder serviceName(String serviceName) {
+      txStartedEvent.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txStartedEvent.setInstanceId(instanceId);
+      return this;
+    }
+
+    public Builder creationTime(Date creationTime) {
+      txStartedEvent.setCreationTime(creationTime);
+      return this;
+    }
+
+    public Builder retryMethod(String retryMethod) {
+      txStartedEvent.setRetryMethod(retryMethod);
+      return this;
+    }
+
+    public Builder retries(int retries) {
+      txStartedEvent.setRetries(retries);
+      return this;
+    }
+
     public TxStartedEvent build() {
       return txStartedEvent;
     }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
index ceaa242..61c3656 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.pack.alpha.fsm.event.base;
 
 public abstract class TxEvent extends BaseEvent {
+  private String serviceName;
+  private String instanceId;
   private String parentTxId;
   private String localTxId;
 
@@ -37,6 +39,22 @@ public abstract class TxEvent extends BaseEvent {
     this.localTxId = localTxId;
   }
 
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + "{" +
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index aae9419..7ddbc1a 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -37,19 +37,18 @@ import scala.concurrent.Future;
 public class SagaEventConsumer {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final Timeout TIMEOUT = new Timeout(1000, TimeUnit.MILLISECONDS);
+  public static final Timeout TIMEOUT = new Timeout(5, TimeUnit.SECONDS);
 
   @Autowired
   ActorSystem system;
 
   /**
-   * Receive saga message
+   * Receive fsm message
    * */
   @Subscribe
   public void receiveSagaEvent(BaseEvent event) throws Exception {
     LOG.info("receive {} ", event.toString());
     try{
-      //TODO Write-Ahead Logging
       ActorRef saga;
       String actorPath = "/user/" + event.getGlobalTxId();
       Optional<ActorRef> optional = this.getActorRefFromPath(actorPath);
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index 1eb3020..0122adc 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -21,11 +21,32 @@ import java.io.Serializable;
 import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
 public class TxEntity implements Serializable {
+  private String serviceName;
+  private String instanceId;
+  private String globalTxId;
   private long beginTime = System.currentTimeMillis();
   private long endTime;
   private String parentTxId;
   private String localTxId;
   private TxState state;
+  private String compensationMethod;
+  private byte[] payloads;
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
 
   public long getBeginTime() {
     return beginTime;
@@ -43,6 +64,14 @@ public class TxEntity implements Serializable {
     this.endTime = endTime;
   }
 
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
   public String getParentTxId() {
     return parentTxId;
   }
@@ -67,6 +96,22 @@ public class TxEntity implements Serializable {
     this.state = state;
   }
 
+  public String getCompensationMethod() {
+    return compensationMethod;
+  }
+
+  public void setCompensationMethod(String compensationMethod) {
+    this.compensationMethod = compensationMethod;
+  }
+
+  public byte[] getPayloads() {
+    return payloads;
+  }
+
+  public void setPayloads(byte[] payloads) {
+    this.payloads = payloads;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -89,6 +134,11 @@ public class TxEntity implements Serializable {
       return this;
     }
 
+    public Builder globalTxId(String globalTxId) {
+      txEntity.setGlobalTxId(globalTxId);
+      return this;
+    }
+
     public Builder parentTxId(String parentTxId) {
       txEntity.setParentTxId(parentTxId);
       return this;
@@ -99,11 +149,31 @@ public class TxEntity implements Serializable {
       return this;
     }
 
+    public Builder compensationMethod(String compensationMethod) {
+      txEntity.setCompensationMethod(compensationMethod);
+      return this;
+    }
+
+    public Builder payloads(byte[] payloads) {
+      txEntity.setPayloads(payloads);
+      return this;
+    }
+
     public Builder state(TxState state) {
       txEntity.setState(state);
       return this;
     }
 
+    public Builder serviceName(String serviceName) {
+      txEntity.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      txEntity.setInstanceId(instanceId);
+      return this;
+    }
+
     public TxEntity build() {
       return txEntity;
     }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
deleted file mode 100644
index 9c4d27d..0000000
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
-
-import akka.actor.AbstractExtensionId;
-import akka.actor.ExtendedActorSystem;
-
-public class LogExtension extends AbstractExtensionId<LogExtensionImpl> {
-
-  public static final LogExtension LogExtensionProvider = new LogExtension();
-
-  @Override
-  public LogExtensionImpl createExtension(ExtendedActorSystem system) {
-    return new LogExtensionImpl();
-  }
-}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
similarity index 56%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 4bf00b4..774a2e2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -17,19 +17,32 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
 
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
 
-public class LogExtensionImpl implements Extension {
-  private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
-  public void putSagaData(String globalTxId, SagaData sagaData){
-    sagaDataMap.put(globalTxId, sagaData);
+  public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
+
+  @Override
+  public SagaDataExt createExtension(ExtendedActorSystem system) {
+    return new SagaDataExt();
   }
 
-  public SagaData getSagaData(String globalTxId){
-    return sagaDataMap.get(globalTxId);
+  public static class SagaDataExt implements Extension { // in-memory SagaData lookup keyed by globalTxId
+    private final Map<String, SagaData> sagaDataMap = new ConcurrentHashMap<>();
+
+    public void putSagaData(String globalTxId, SagaData sagaData){ // replaces any entry already stored for the id
+      sagaDataMap.put(globalTxId, sagaData);
+    }
+
+    public SagaData getSagaData(String globalTxId){ // returns null when nothing is stored for the id
+      return sagaDataMap.get(globalTxId);
+    }
   }
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
new file mode 100644
index 0000000..6994084
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import static org.apache.servicecomb.pack.common.EventType.TxStartedEvent;
+
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Extension;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SpringExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+
+public class SpringAkkaExtension extends AbstractExtensionId<SpringExt> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final SpringAkkaExtension SPRING_EXTENSION_PROVIDER = new SpringAkkaExtension();
+
+  @Override
+  public SpringExt createExtension(ExtendedActorSystem system) {
+    return new SpringExt();
+  }
+
+  public static class SpringExt implements Extension { // lets actors reach beans of the Spring context
+
+    private static final String omegaCallbackBeanName = "omegaCallback";
+    private volatile ApplicationContext applicationContext;
+    // The callback bean is looked up on every call and kept in a local, so no mutable state is shared here.
+
+    public void compensate(TxEntity txEntity) { // only logs a warning when the context or the bean is missing
+      final ApplicationContext context = applicationContext; // read the volatile field once
+      if (context == null) {
+        LOG.warn("Spring ApplicationContext is null");
+        return;
+      }
+      if (!context.containsBean(omegaCallbackBeanName)) {
+        LOG.warn("Spring Bean {} doesn't exist in ApplicationContext", omegaCallbackBeanName);
+        return;
+      }
+      OmegaCallback omegaCallback = context.getBean(omegaCallbackBeanName, OmegaCallback.class);
+      omegaCallback.compensate(new TxEvent(
+          txEntity.getServiceName(),
+          txEntity.getInstanceId(),
+          txEntity.getGlobalTxId(),
+          txEntity.getLocalTxId(),
+          txEntity.getParentTxId(),
+          TxStartedEvent.name(),
+          txEntity.getCompensationMethod(),
+          txEntity.getPayloads()));
+      LOG.info(omegaCallback.toString());
+    }
+
+    public void initialize(ApplicationContext applicationContext) { // must run before compensate can dispatch
+      this.applicationContext = applicationContext;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 313c526..71962df 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -186,7 +186,7 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
 
-      //expectTerminated(saga);
+      //expectTerminated(fsm);
 
       ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
       watch(recoveredSaga);
@@ -352,7 +352,7 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxComponsitedEvent-11
+   * 6. TxCompensatedEvent-11
    * 7. SagaAbortedEvent-1
    */
   @Test
@@ -414,8 +414,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -485,8 +485,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -538,7 +538,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -558,8 +558,8 @@ public class SagaActorTest {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    * 10. SagaAbortedEvent-1
    */
   @Test
@@ -619,9 +619,9 @@ public class SagaActorTest {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 9. TxCompensatedEvent-11
+   * 10. TxCompensatedEvent-12
+   * 11. TxCompensatedEvent-13
    */
   @Test
   public void sagaAbortedEventAfterAllTxEndedTest() {
@@ -673,7 +673,7 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -940,8 +940,8 @@ public class SagaActorTest {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   @Test
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index d303f86..925bbc2 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -24,7 +24,7 @@ import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
@@ -79,7 +79,7 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 4. TxStartedEvent-12
    * 5. TxAbortedEvent-12
-   * 6. TxComponsitedEvent-11
+   * 6. TxCompensatedEvent-11
    * 7. SagaAbortedEvent-1
    */
   public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){
@@ -89,7 +89,7 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -102,8 +102,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -115,8 +115,8 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -130,8 +130,8 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxAbortedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 10. TxComponsitedEvent-12
+   * 9. TxCompensatedEvent-11
+   * 10. TxCompensatedEvent-12
    */
   public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -143,8 +143,8 @@ public class SagaEventSender {
     sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     return sagaEvents;
   }
 
@@ -156,8 +156,8 @@ public class SagaEventSender {
    * 5. TxEndedEvent-12
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 8. TxCompensatedEvent-12
+   * 9. TxCompensatedEvent-13
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -169,8 +169,8 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
@@ -184,9 +184,9 @@ public class SagaEventSender {
    * 6. TxStartedEvent-13
    * 7. TxEndedEvent-13
    * 8. SagaAbortedEvent-1
-   * 9. TxComponsitedEvent-11
-   * 8. TxComponsitedEvent-12
-   * 9. TxComponsitedEvent-13
+   * 9. TxCompensatedEvent-11
+   * 10. TxCompensatedEvent-12
+   * 11. TxCompensatedEvent-13
    */
   public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
     List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -198,9 +198,9 @@ public class SagaEventSender {
     sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
     return sagaEvents;
   }
 
@@ -302,8 +302,8 @@ public class SagaEventSender {
    * 3. TxEndedEvent-11
    * 5. TxEndedEvent-12
    * 7. TxAbortedEvent-13
-   * 8. TxComponsitedEvent-11
-   * 9. TxComponsitedEvent-12
+   * 8. TxCompensatedEvent-11
+   * 9. TxCompensatedEvent-12
    * 10. SagaAbortedEvent-1
    */
   public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -315,8 +315,8 @@ public class SagaEventSender {
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
     sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
-    sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+    sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
     sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
     return sagaEvents;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 85c8b18..9a65bfb 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -24,7 +24,7 @@ import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -58,7 +58,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -82,7 +82,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -105,7 +105,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -130,7 +130,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -156,7 +156,7 @@ public class SagaIntegrationTest {
       });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -182,7 +182,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -208,7 +208,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
@@ -234,7 +234,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
             && sagaData.getBeginTime() > 0
@@ -261,7 +261,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(timeout+1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.SUSPENDED
             && sagaData.getBeginTime() > 0
@@ -287,7 +287,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -313,7 +313,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMMITTED
             && sagaData.getBeginTime() > 0
@@ -339,7 +339,7 @@ public class SagaIntegrationTest {
     });
 
     await().atMost(1, SECONDS).until(() -> {
-      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       if(sagaData != null){
         return sagaData.getLastState() == SagaActorState.COMPENSATED
             && sagaData.getBeginTime() > 0
diff --git a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
index f3d0585..774b2e9 100644
--- a/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
+++ b/pack-common/src/main/java/org/apache/servicecomb/pack/common/EventType.java
@@ -23,5 +23,6 @@ public enum EventType {
   TxEndedEvent,
   TxAbortedEvent,
   TxCompensatedEvent,
-  SagaEndedEvent
+  SagaEndedEvent,
+  SagaAbortedEvent
 }


[servicecomb-pack] 02/02: SCB-1321 Add GrpcSagaEventService for SagaActor Events

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0ed1b152a4c30bcb6e89964462267e8fee3cc83e
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Tue Jul 2 17:33:20 2019 +0800

    SCB-1321 Add GrpcSagaEventService for SagaActor Events
---
 alpha/alpha-server/pom.xml                         |  16 ++
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  21 +++
 .../pack/alpha/server/fsm/GrpcOmegaCallback.java   |  50 +++++++
 .../alpha/server/fsm/GrpcSagaEventService.java     | 130 ++++++++++++++++
 .../src/main/resources/application.yaml            |   9 ++
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 134 +++++++++++++++++
 .../alpha/server/fsm/OmegaEventSagaSimulator.java  | 159 ++++++++++++++++++++
 .../pack/alpha/server/fsm/OmegaEventSender.java    | 163 +++++++++++++++++++++
 alpha/alpha-server/src/test/resources/log4j2.xml   |  30 ++++
 pom.xml                                            |   5 +
 10 files changed, 717 insertions(+)

diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 7f40401..df2b47d 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -87,11 +87,21 @@
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-fsm</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>alpha-spring-cloud-starter-consul</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>alpha-spring-cloud-starter-zookeeper</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
@@ -159,6 +169,12 @@
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.hsqldb</groupId>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 211ee07..221bfe5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,6 +29,7 @@ import javax.annotation.PreDestroy;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
+import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
 import org.apache.servicecomb.pack.alpha.server.tcc.service.TccEventScanner;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationEventPublisher;
@@ -146,6 +148,7 @@ public class AlphaConfig {
   }
 
   @Bean
+  @ConditionalOnProperty(name = "alpha.model.actor.enabled", havingValue = "false", matchIfMissing = true)
   ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
       TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus) throws IOException {
@@ -162,6 +165,24 @@ public class AlphaConfig {
     return bootstrap;
   }
 
+  @Bean
+  @ConditionalOnProperty(name= "alpha.model.actor.enabled", havingValue = "true")
+  ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException {
+    ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
+        new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService);
+    new Thread(bootstrap::start).start();
+    tccPendingTaskRunner.start();
+    tccEventScanner.start();
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      tccPendingTaskRunner.shutdown();
+      tccEventScanner.shutdown();
+    }));
+
+    return bootstrap;
+  }
+
   @PostConstruct
   void init() {
     new PendingTaskRunner(pendingCompensations, delay).run();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
new file mode 100644
index 0000000..f5299da
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcOmegaCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.core.TxEvent;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+
+class GrpcOmegaCallback implements OmegaCallback { // OmegaCallback that emits compensate commands on a gRPC StreamObserver
+
+  private final StreamObserver<GrpcCompensateCommand> observer; // stream the commands are written to
+
+  GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+    this.observer = observer;
+  }
+
+  @Override
+  public void compensate(TxEvent event) { // builds a GrpcCompensateCommand from the event and pushes it to the observer
+    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+        .setGlobalTxId(event.globalTxId())
+        .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId()) // a null parent id is sent as the empty string
+        .setCompensationMethod(event.compensationMethod())
+        .setPayloads(ByteString.copyFrom(event.payloads()))
+        .build();
+    observer.onNext(command);
+  }
+
+  @Override
+  public void disconnect() { // completes the command stream
+    observer.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
new file mode 100644
index 0000000..2daac50
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static java.util.Collections.emptyMap;
+
+import com.google.common.eventbus.EventBus;
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import kamon.annotation.Trace;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcSagaEventService extends TxEventServiceImplBase { // gRPC endpoint that turns GrpcTxEvent messages into fsm events posted on sagaEventBus
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build(); // ack sent when the message was accepted
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build(); // ack sent when the event type is not recognised
+
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks; // keyed by service name, then by instance id (see onConnected)
+  private final EventBus sagaEventBus;
+
+  public GrpcSagaEventService(EventBus sagaEventBus,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    this.sagaEventBus = sagaEventBus;
+    this.omegaCallbacks = omegaCallbacks;
+  }
+
+  @Override
+  @Trace("alphaConnected")
+  public void onConnected( // registers a callback for the connecting service instance, replacing any previous one for the same instance id
+      GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+    omegaCallbacks
+        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+  }
+
+  // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
+  @Override
+  @Trace("alphaDisconnected")
+  public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { // removes and disconnects the instance's callback, then always acks ALLOW
+    OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
+        .remove(request.getInstanceId());
+
+    if (callback != null) {
+      callback.disconnect();
+    }
+
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  @Trace("onTransactionEvent")
+  public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { // maps message.getType() to an fsm event, posts it, and acks ALLOW or REJECT
+    LOG.info("onTxEvent {}", message); // message text previously read "onText", which did not match the handler name
+    boolean ok = true;
+    BaseEvent event = null;
+    if (message.getType().equals(EventType.SagaStartedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .timeout(message.getTimeout()).build();
+    } else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent.builder()
+          .globalTxId(message.getGlobalTxId()).build();
+    } else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent.builder()
+          .globalTxId(message.getGlobalTxId()).build();
+    } else if (message.getType().equals(EventType.TxStartedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent.builder()
+          .serviceName(message.getServiceName())
+          .instanceId(message.getInstanceId())
+          .creationTime(new Date()) // stamped with the server's current time, not message.getTimestamp()
+          .globalTxId(message.getGlobalTxId())
+          .localTxId(message.getLocalTxId())
+          .parentTxId(message.getParentTxId().isEmpty() ? null : message.getParentTxId()) // an empty parent id becomes null
+          .compensationMethod(message.getCompensationMethod())
+          .retryMethod(message.getRetryMethod())
+          .retries(message.getRetries())
+          .payloads(message.getPayloads().toByteArray()).build();
+    } else if (message.getType().equals(EventType.TxEndedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId()) // NOTE(review): unlike TxStartedEvent, "" is passed through here and in the two branches below - confirm intended
+          .localTxId(message.getLocalTxId()).build();
+    } else if (message.getType().equals(EventType.TxAbortedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId())
+          .localTxId(message.getLocalTxId()).build();
+    } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) {
+      event = org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent.builder()
+          .globalTxId(message.getGlobalTxId())
+          .parentTxId(message.getParentTxId())
+          .localTxId(message.getLocalTxId()).build();
+    } else {
+      ok = false; // unrecognised type: nothing is posted and the caller receives REJECT
+    }
+    if (event != null) {
+      this.sagaEventBus.post(event);
+    }
+    responseObserver.onNext(ok ? ALLOW : REJECT);
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 55283d3..01b03f4 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -90,3 +90,12 @@ spring:
     properties:
       eclipselink:
         ddl-generation: none
+
+---
+spring:
+  profiles: akka-persistence-mem
+akkaConfig:
+  akka.persistence.journal.plugin: akka.persistence.journal.inmem
+  akka.persistence.journal.leveldb.dir: target/example/journal
+  akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
+  akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
new file mode 100644
index 0000000..f98474d
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.ActorSystem;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
+import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
+    properties = {
+        "alpha.server.host=0.0.0.0",
+        "alpha.server.port=8090", // must match the 'port' constant used by the client channel below
+        "alpha.event.pollingInterval=1",
+        "spring.main.allow-bean-definition-overriding=true",
+        "alpha.model.actor.enabled=true", // selects the actor-mode ServerStartable bean in AlphaConfig
+        "spring.profiles.active=akka-persistence-mem"
+       })
+public class AlphaIntegrationFsmTest { // end-to-end test: sends saga events over gRPC and checks the resulting SagaData
+  private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();
+  private static final int port = 8090;
+
+  @Autowired(required = false)
+  ActorSystem system;
+
+  @Autowired
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  @BeforeClass
+  public static void beforeClass() {
+    omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build());
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    omegaEventSender.shutdown();
+  }
+
+  @Before
+  public void before() {
+    omegaEventSender.setOmegaCallbacks(omegaCallbacks);
+    omegaEventSender.reset();
+  }
+
+  @After
+  public void after() {
+    omegaEventSender.onDisconnected();
+  }
+
+  @Test
+  public void successfulTest() { // three sub-transactions all end normally; saga and every tx must be COMMITTED
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    omegaEventSender.onConnected();
+    omegaEventSender.getOmegaEventSagaSimulator().sagaSuccessfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> omegaEventSender.getOmegaCallbacks() != null);
+    await().atMost(1, SECONDS).until(() -> SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId) != null);
+    SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(SagaActorState.COMMITTED, sagaData.getLastState()); // JUnit order is (expected, actual)
+    Assert.assertEquals(3, sagaData.getTxEntityMap().size());
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    Assert.assertEquals(TxState.COMMITTED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+    Assert.assertEquals(TxState.COMMITTED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+    Assert.assertEquals(TxState.COMMITTED, sagaData.getTxEntityMap().get(localTxId_3).getState());
+  }
+
+  @Test
+  public void lastTxAbortedEventTest() { // third sub-transaction aborts; first two must be COMPENSATED and the third FAILED
+    omegaEventSender.onConnected();
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    omegaEventSender.getOmegaEventSagaSimulator().lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+      omegaEventSender.getBlockingStub().onTxEvent(event);
+    });
+    await().atMost(1, SECONDS).until(() -> omegaEventSender.getOmegaCallbacks() != null);
+    await().atMost(1, SECONDS).until(() -> { // wait until the saga has reached COMPENSATED before asserting details
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+    });
+    SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+    Assert.assertEquals(SagaActorState.COMPENSATED, sagaData.getLastState()); // JUnit order is (expected, actual)
+    Assert.assertEquals(3, sagaData.getTxEntityMap().size());
+    Assert.assertTrue(sagaData.getBeginTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > 0);
+    Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+    Assert.assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_1).getState());
+    Assert.assertEquals(TxState.COMPENSATED, sagaData.getTxEntityMap().get(localTxId_2).getState());
+    Assert.assertEquals(TxState.FAILED, sagaData.getTxEntityMap().get(localTxId_3).getState());
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
new file mode 100644
index 0000000..6e3c3ce
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.contract.grpc.GrpcTxEvent;
+
+public class OmegaEventSagaSimulator { // test helper that builds the GrpcTxEvent sequences for a simulated saga
+  private String serviceName; // copied into every generated event
+  private String instanceId; // copied into every generated event
+
+  public List<GrpcTxEvent> sagaSuccessfulEvents(String globalTxId, String localTxId_1, // saga start, three tx start/end pairs, saga end
+      String localTxId_2, String localTxId_3) {
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a")); // NOTE(review): getBytes() without a charset uses the JVM default charset
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(sagaEndedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  public List<GrpcTxEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ // same as the success sequence, but tx 3 aborts and the saga aborts
+    List<GrpcTxEvent> sagaEvents = new ArrayList<>();
+    sagaEvents.add(sagaStartedEvent(globalTxId));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_1, globalTxId, "service a".getBytes(), "method a"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txEndedEvent(globalTxId, localTxId_2, globalTxId, "service b".getBytes(), "method b"));
+    sagaEvents.add(txStartedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(txAbortedEvent(globalTxId, localTxId_3, globalTxId, "service c".getBytes(), "method c"));
+    sagaEvents.add(sagaAbortedEvent(globalTxId));
+    return sagaEvents;
+  }
+
+  private GrpcTxEvent sagaStartedEvent(String globalTxId) { // saga-level events reuse globalTxId as localTxId and carry no parent id
+    return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent sagaEndedEvent(String globalTxId) {
+    return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
+    return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
+        null, new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txStartedEvent(String globalTxId, // tx-level events: timeout 0, empty retry method, 0 retries
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txEndedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  private GrpcTxEvent txAbortedEvent(String globalTxId,
+      String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
+        parentTxId, payloads, compensationMethod, 0, "",
+        0);
+  }
+
+  public GrpcTxEvent txCompensatedEvent(String globalTxId, // public, unlike the other event factories in this class
+      String localTxId, String parentTxId) {
+    return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
+        parentTxId,  new byte[0], "", 0, "",
+        0);
+  }
+
+  private GrpcTxEvent eventOf(EventType eventType, // single place where every GrpcTxEvent of this simulator is assembled
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      byte[] payloads,
+      String compensationMethod,
+      int timeout,
+      String retryMethod,
+      int retries) {
+
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setParentTxId(parentTxId == null ? "" : parentTxId) // a null parent id is sent as the empty string
+        .setType(eventType.name())
+        .setCompensationMethod(compensationMethod)
+        .setTimeout(timeout)
+        .setRetryMethod(retryMethod)
+        .setRetries(retries)
+        .setPayloads(ByteString.copyFrom(payloads))
+        .build();
+  }
+
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder { // fluent builder; both fields stay null unless set
+
+    private String serviceName;
+    private String instanceId;
+
+    private Builder() {
+    }
+
+    public Builder serviceName(String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public Builder instanceId(String instanceId) {
+      this.instanceId = instanceId;
+      return this;
+    }
+
+    public OmegaEventSagaSimulator build() {
+      OmegaEventSagaSimulator omegaEventSagaSimulator = new OmegaEventSagaSimulator();
+      omegaEventSagaSimulator.serviceName = this.serviceName;
+      omegaEventSagaSimulator.instanceId = this.instanceId;
+      return omegaEventSagaSimulator;
+    }
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
new file mode 100644
index 0000000..a6a5c3c
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSender.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.server.fsm;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
+import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
+import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+public class OmegaEventSender { // test-side gRPC client that plays the Omega role against the server under test
+  GrpcServiceConfig serviceConfig; // built in onConnected() and reused by onDisconnected()
+  protected ManagedChannel clientChannel;
+  private TxEventServiceStub asyncStub;
+  private TxEventServiceBlockingStub blockingStub;
+  private final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); // every command delivered to the observer is recorded here
+  private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver(
+      this::onCompensation); // each received command is handed to onCompensation; its GrpcAck result is discarded
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+  private OmegaEventSagaSimulator omegaEventSagaSimulator; // created in onConnected(); null before that
+
+  private String serviceName;
+  private String instanceId;
+
+  public void configClient(ManagedChannel clientChannel){ // stores the channel and creates the async and blocking stubs on it
+    this.clientChannel = clientChannel;
+    this.asyncStub = TxEventServiceGrpc.newStub(clientChannel);
+    this.blockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+  }
+
+  public void shutdown(){
+    this.clientChannel.shutdown();
+    this.clientChannel = null;
+  }
+
+  public void onConnected(){ // registers this sender with the server and creates a simulator with the same identity
+    serviceConfig = GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    omegaEventSagaSimulator = OmegaEventSagaSimulator.builder().serviceName(serviceName).instanceId(instanceId).build();
+
+  }
+
+  public void onDisconnected(){ // uses the serviceConfig built by onConnected()
+    blockingStub.onDisconnected(serviceConfig);
+  }
+
+  public void setOmegaCallbacks(
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    this.omegaCallbacks = omegaCallbacks;
+  }
+
+  public Queue<GrpcCompensateCommand> getReceivedCommands() {
+    return receivedCommands;
+  }
+
+  public TxEventServiceBlockingStub getBlockingStub() {
+    return blockingStub;
+  }
+
+  public Map<String, OmegaCallback> getOmegaCallbacks(){ // only the callbacks registered under this sender's serviceName; null if there are none
+    return omegaCallbacks.get(serviceName);
+  }
+
+  public void reset(){ // clears the recorded commands only
+    receivedCommands.clear();
+  }
+
+  public OmegaEventSagaSimulator getOmegaEventSagaSimulator(){
+    return omegaEventSagaSimulator;
+  }
+
+  private class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> { // receives compensate commands streamed back by the server
+    private final Consumer<GrpcCompensateCommand> consumer;
+    private boolean completed = false; // NOTE(review): plain field; confirm which threads call onCompleted() and isCompleted()
+
+    private CompensationStreamObserver() { // NOTE(review): not referenced anywhere in this class
+      this(command -> {});
+    }
+
+    private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+      this.consumer = consumer;
+    }
+
+    @Override
+    public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      consumer.accept(command);
+      receivedCommands.add(command); // recorded only after the consumer has returned
+    }
+
+    @Override
+    public void onError(Throwable t) { // errors on the stream are ignored
+    }
+
+    @Override
+    public void onCompleted() {
+      completed = true;
+    }
+
+    boolean isCompleted() {
+      return completed;
+    }
+  }
+
+  private GrpcAck onCompensation(GrpcCompensateCommand command) { // answers a command by sending a TxCompensatedEvent with the same ids over the blocking stub
+    return blockingStub.onTxEvent(omegaEventSagaSimulator.txCompensatedEvent(command.getGlobalTxId(),command.getLocalTxId(),command.getParentTxId()));
+  }
+
+  public static OmegaEventSender.Builder builder() {
+    return new OmegaEventSender.Builder();
+  }
+
+  public static final class Builder { // fluent builder; both fields default to uniquify(...) values
+
+    private String serviceName = uniquify("omega-serviceName");
+    private String instanceId = uniquify("omega-instanceId");
+
+    public OmegaEventSender.Builder serviceName(String serviceName) {
+      this.serviceName = serviceName;
+      return this;
+    }
+
+    public OmegaEventSender.Builder instanceId(String instanceId) {
+      this.instanceId = instanceId;
+      return this;
+    }
+
+    public OmegaEventSender build() {
+      OmegaEventSender omegaEventSender = new OmegaEventSender();
+      omegaEventSender.instanceId = this.instanceId;
+      omegaEventSender.serviceName = this.serviceName;
+      return omegaEventSender;
+    }
+  }
+}
diff --git a/alpha/alpha-server/src/test/resources/log4j2.xml b/alpha/alpha-server/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-server/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pom.xml b/pom.xml
index 3339b22..3c4e7ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,6 +317,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.servicecomb.pack</groupId>
+        <artifactId>alpha-fsm</artifactId>
+        <version>0.5.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.servicecomb.pack</groupId>
         <artifactId>alpha-server</artifactId>
         <version>0.5.0-SNAPSHOT</version>
       </dependency>