You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:17 UTC

[servicecomb-pack] 20/42: SCB-1368 Delete the Actor state persistent data after transaction data is saved successfully

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 6779ea15fa757ba10535391e01dac412f1dbe518
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Sat Sep 21 01:06:05 2019 +0800

    SCB-1368 Delete the Actor state persistent data after transaction data is saved successfully
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 278 +++++++++++++--------
 .../src/main/resources/application.yaml            |  19 +-
 2 files changed, 180 insertions(+), 117 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index ba4b0ff..4bd536e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -17,7 +17,9 @@
 
 package org.apache.servicecomb.pack.alpha.fsm;
 
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.cluster.sharding.ShardRegion;
 import akka.persistence.fsm.AbstractPersistentFSM;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
@@ -27,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.pack.alpha.core.AlphaException;
 import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
 import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
 import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
 import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
@@ -72,6 +75,7 @@ public class SagaActor extends
     when(SagaActorState.IDLE,
         matchEvent(SagaStartedEvent.class,
             (event, data) -> {
+              log(event);
               sagaBeginTime = System.currentTimeMillis();
               SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
               SagaStartedDomain domainEvent = new SagaStartedDomain(event);
@@ -92,6 +96,7 @@ public class SagaActor extends
     when(SagaActorState.READY,
         matchEvent(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -104,12 +109,14 @@ public class SagaActor extends
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
@@ -125,6 +132,7 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_ACTIVE,
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
@@ -137,6 +145,7 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class,
             (event, data) -> {
+              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return stay()
@@ -148,6 +157,7 @@ public class SagaActor extends
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED,
                   SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
@@ -155,6 +165,7 @@ public class SagaActor extends
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return goTo(SagaActorState.FAILED)
                   .applying(domainEvent);
@@ -169,6 +180,7 @@ public class SagaActor extends
     when(SagaActorState.PARTIALLY_COMMITTED,
         matchEvent(TxStartedEvent.class,
             (event, data) -> {
+              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_ACTIVE)
@@ -181,6 +193,7 @@ public class SagaActor extends
             }
         ).event(TxEndedEvent.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               if (data.getExpirationTime() != null) {
                 return stay()
@@ -192,23 +205,27 @@ public class SagaActor extends
             }
         ).event(SagaTimeoutEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(SagaEndedEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
                   .applying(domainEvent);
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
               return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
         ).event(TxAbortedEvent.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return goTo(SagaActorState.FAILED).applying(domainEvent);
             }
@@ -222,12 +239,14 @@ public class SagaActor extends
     when(SagaActorState.FAILED,
         matchEvent(SagaTimeoutEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent);
             }
         ).event(TxCompensatedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 self().tell(ComponsitedCheckEvent.builder().build(), self());
@@ -235,6 +254,7 @@ public class SagaActor extends
             }
         ).event(ComponsitedCheckEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               if (hasCompensationSentTx(data) || !data.isTerminated()) {
                 return stay();
               } else {
@@ -246,6 +266,7 @@ public class SagaActor extends
             }
         ).event(SagaAbortedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               data.setTerminated(true);
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
@@ -264,11 +285,13 @@ public class SagaActor extends
             }
         ).event(TxStartedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
               return stay().applying(domainEvent);
             }
         ).event(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
+              log(event);
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
               return stay().applying(domainEvent).andThen(exec(_data -> {
                 TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId());
@@ -287,27 +310,8 @@ public class SagaActor extends
     when(SagaActorState.COMMITTED,
         matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              //  已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
-              //  以下基于 journal-redis 说明:
-              //    假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
-              //    journal:persistenceIds
-              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
-              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
-              //
-              //    1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
-              //    2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
-              //       使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
-              //    3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
-              //
-              //    何如清理:
-              //      通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
-              //      遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
-              //      如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
-              //      并删除 journal:persisted:item:highestSequenceNr
-              //
-              //  目前可以看到的解释是 https://github.com/akka/akka/issues/21181
-              deleteMessages(lastSequenceNr());
-              deleteSnapshot(snapshotSequenceNr());
+              log(event);
+              beforeStop(stateName(), data);
               return stop();
             }
         )
@@ -316,8 +320,8 @@ public class SagaActor extends
     when(SagaActorState.SUSPENDED,
         matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              deleteMessages(lastSequenceNr());
-              deleteSnapshot(snapshotSequenceNr());
+              log(event);
+              beforeStop(stateName(), data);
               return stop();
             }
         )
@@ -326,8 +330,8 @@ public class SagaActor extends
     when(SagaActorState.COMPENSATED,
         matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
             (event, data) -> {
-              deleteMessages(lastSequenceNr());
-              deleteSnapshot(snapshotSequenceNr());
+              log(event);
+              beforeStop(stateName(), data);
               return stop();
             }
         )
@@ -348,13 +352,14 @@ public class SagaActor extends
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
           if (LOG.isDebugEnabled()) {
-            LOG.debug("transition {} {} -> {}", getSelf(), from, to);
+            LOG.debug("transition {} {} -> {}", stateData().getGlobalTxId(), from, to);
           }
           if (to == SagaActorState.COMMITTED ||
               to == SagaActorState.SUSPENDED ||
               to == SagaActorState.COMPENSATED) {
             self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self());
           }
+          LOG.info("transition {} {} -> {}", stateData().getGlobalTxId(), from, to);
         })
     );
 
@@ -362,102 +367,151 @@ public class SagaActor extends
         matchStop(
             Normal(), (state, data) -> {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("stop {} {}", data.getGlobalTxId(), state);
+                LOG.debug("saga actor stopped {} {}", getSelf(), state);
               }
-              sagaEndTime = System.currentTimeMillis();
-              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
-              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(sagaEndTime - sagaBeginTime);
-              data.setLastState(state);
-              data.setEndTime(new Date());
-              data.setTerminated(true);
-              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
-                  .stopSagaData(data.getGlobalTxId(), data);
+              LOG.info("stopped {} {}", data.getGlobalTxId(), state);
             }
         )
     );
 
   }
 
-  @Override
-  public SagaData applyEvent(DomainEvent event, SagaData data) {
-    if (this.recoveryRunning()) {
-      LOG.info("SagaActor recovery {}",event.getEvent());
-    }
+  private void beforeStop(SagaActorState state, SagaData data){
     if (LOG.isDebugEnabled()) {
-      LOG.debug("SagaActor apply event {}", event.getEvent());
+      LOG.debug("stop {} {}", data.getGlobalTxId(), state);
     }
-    // log event to SagaData
-    if (event.getEvent() != null && !(event
-        .getEvent() instanceof ComponsitedCheckEvent)) {
-      data.logEvent(event.getEvent());
+    try{
+      sagaEndTime = System.currentTimeMillis();
+      data.setLastState(state);
+      data.setEndTime(new Date());
+      data.setTerminated(true);
+      SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
+          .stopSagaData(data.getGlobalTxId(), data);
+      SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
+      SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system())
+          .doSagaAvgTime(sagaEndTime - sagaBeginTime);
+
+      // destroy self from cluster shard region
+      getContext().getParent()
+          .tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("destroy saga actor {} from cluster shard region", getSelf());
+      }
+
+      // clear self mailbox from persistence
+      //  已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
+      //  以下基于 journal-redis 说明:
+      //    假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
+      //    journal:persistenceIds
+      //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
+      //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
+      //
+      //    1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
+      //    2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
+      //       使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
+      //    3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
+      //
+      //    何如清理:
+      //      通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
+      //      遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
+      //      如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
+      //      并删除 journal:persisted:item:highestSequenceNr
+      //
+      //  目前可以看到的解释是 https://github.com/akka/akka/issues/21181
+      deleteMessages(lastSequenceNr());
+      deleteSnapshot(snapshotSequenceNr());
+    }catch(Exception e){
+      LOG.error("stop {} fail",data.getGlobalTxId());
+      throw e;
     }
-    if (event instanceof SagaStartedDomain) {
-      SagaStartedDomain domainEvent = (SagaStartedDomain) event;
-      data.setServiceName(domainEvent.getEvent().getServiceName());
-      data.setInstanceId(domainEvent.getEvent().getInstanceId());
-      data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
-      data.setBeginTime(domainEvent.getEvent().getCreateTime());
-      data.setExpirationTime(domainEvent.getExpirationTime());
-    } else if (event instanceof AddTxEventDomain) {
-      AddTxEventDomain domainEvent = (AddTxEventDomain) event;
-      if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) {
-        TxEntity txEntity = TxEntity.builder()
-            .serviceName(domainEvent.getEvent().getServiceName())
-            .instanceId(domainEvent.getEvent().getInstanceId())
-            .globalTxId(domainEvent.getEvent().getGlobalTxId())
-            .localTxId(domainEvent.getEvent().getLocalTxId())
-            .parentTxId(domainEvent.getEvent().getParentTxId())
-            .compensationMethod(domainEvent.getCompensationMethod())
-            .payloads(domainEvent.getPayloads())
-            .state(domainEvent.getState())
-            .beginTime(domainEvent.getEvent().getCreateTime())
-            .build();
-        data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
-      } else {
-        LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
+  }
+
+  @Override
+  public SagaData applyEvent(DomainEvent event, SagaData data) {
+    try{
+      if (this.recoveryRunning()) {
+        LOG.info("SagaActor recovery {}",event.getEvent());
+      }else if (LOG.isDebugEnabled()) {
+        LOG.debug("SagaActor apply event {}", event.getEvent());
       }
-    } else if (event instanceof UpdateTxEventDomain) {
-      UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
-      TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
-      txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
-      if (domainEvent.getState() == TxState.COMMITTED) {
-        txEntity.setState(domainEvent.getState());
-      } else if (domainEvent.getState() == TxState.FAILED) {
-        txEntity.setState(domainEvent.getState());
-        txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
-        data.getTxEntityMap().forEach((k, v) -> {
-          if (v.getState() == TxState.COMMITTED) {
-            // call compensate
-            compensation(v, data);
-          }
-        });
-      } else if (domainEvent.getState() == TxState.COMPENSATED) {
-        // decrement the compensation running counter by one
-        data.getCompensationRunningCounter().decrementAndGet();
-        txEntity.setState(domainEvent.getState());
-        LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+      // log event to SagaData
+      if (event.getEvent() != null && !(event
+          .getEvent() instanceof ComponsitedCheckEvent)) {
+        data.logEvent(event.getEvent());
       }
-    } else if (event instanceof SagaEndedDomain) {
-      SagaEndedDomain domainEvent = (SagaEndedDomain) event;
-      if (domainEvent.getState() == SagaActorState.FAILED) {
-        data.setTerminated(true);
-        data.getTxEntityMap().forEach((k, v) -> {
-          if (v.getState() == TxState.COMMITTED) {
-            // call compensate
-            compensation(v, data);
-          }
-        });
-      } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
-        data.setEndTime(event.getEvent().getCreateTime());
-        data.setTerminated(true);
-        data.setSuspendedType(domainEvent.getSuspendedType());
-      } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
-        data.setEndTime(event.getEvent().getCreateTime());
-        data.setTerminated(true);
-      } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
-        data.setEndTime(event.getEvent().getCreateTime());
-        data.setTerminated(true);
+      if (event instanceof SagaStartedDomain) {
+        SagaStartedDomain domainEvent = (SagaStartedDomain) event;
+        data.setServiceName(domainEvent.getEvent().getServiceName());
+        data.setInstanceId(domainEvent.getEvent().getInstanceId());
+        data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
+        data.setBeginTime(domainEvent.getEvent().getCreateTime());
+        data.setExpirationTime(domainEvent.getExpirationTime());
+      } else if (event instanceof AddTxEventDomain) {
+        AddTxEventDomain domainEvent = (AddTxEventDomain) event;
+        if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) {
+          TxEntity txEntity = TxEntity.builder()
+              .serviceName(domainEvent.getEvent().getServiceName())
+              .instanceId(domainEvent.getEvent().getInstanceId())
+              .globalTxId(domainEvent.getEvent().getGlobalTxId())
+              .localTxId(domainEvent.getEvent().getLocalTxId())
+              .parentTxId(domainEvent.getEvent().getParentTxId())
+              .compensationMethod(domainEvent.getCompensationMethod())
+              .payloads(domainEvent.getPayloads())
+              .state(domainEvent.getState())
+              .beginTime(domainEvent.getEvent().getCreateTime())
+              .build();
+          data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
+        } else {
+          LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
+        }
+      } else if (event instanceof UpdateTxEventDomain) {
+        UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
+        TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
+        txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
+        if (domainEvent.getState() == TxState.COMMITTED) {
+          txEntity.setState(domainEvent.getState());
+        } else if (domainEvent.getState() == TxState.FAILED) {
+          txEntity.setState(domainEvent.getState());
+          txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
+          data.getTxEntityMap().forEach((k, v) -> {
+            if (v.getState() == TxState.COMMITTED) {
+              // call compensate
+              compensation(v, data);
+            }
+          });
+        } else if (domainEvent.getState() == TxState.COMPENSATED) {
+          // decrement the compensation running counter by one
+          data.getCompensationRunningCounter().decrementAndGet();
+          txEntity.setState(domainEvent.getState());
+          LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+        }
+      } else if (event instanceof SagaEndedDomain) {
+        SagaEndedDomain domainEvent = (SagaEndedDomain) event;
+        if (domainEvent.getState() == SagaActorState.FAILED) {
+          data.setTerminated(true);
+          data.getTxEntityMap().forEach((k, v) -> {
+            if (v.getState() == TxState.COMMITTED) {
+              // call compensate
+              compensation(v, data);
+            }
+          });
+        } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
+          data.setEndTime(event.getEvent().getCreateTime());
+          data.setTerminated(true);
+          data.setSuspendedType(domainEvent.getSuspendedType());
+        } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
+          data.setEndTime(event.getEvent().getCreateTime());
+          data.setTerminated(true);
+        } else if (domainEvent.getState() == SagaActorState.COMMITTED) {
+          data.setEndTime(event.getEvent().getCreateTime());
+          data.setTerminated(true);
+        }
       }
+    }catch (Exception ex){
+      LOG.error("SagaActor apply event {}", event.getEvent());
+      beforeStop(SagaActorState.SUSPENDED, data);
+      stop();
+      //TODO 增加 SagaActor 处理失败指标
     }
     return data;
   }
@@ -531,4 +585,10 @@ public class SagaActor extends
       }
     }
   }
+
+  private void log(BaseEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(event.toString());
+    }
+  }
 }
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 98a58ce..664f692 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -90,7 +90,7 @@ akkaConfig:
         acceptable-heartbeat-pause: 6s
       seed-nodes: ["akka://alpha-cluster@127.0.0.1:8070"]
       sharding:
-        state-store-mode: "persistence"
+        state-store-mode: "ddata" #ddata,persistence
         remember-entities: true
         shard-failure-backoff: 5s
 
@@ -173,21 +173,24 @@ akkaConfig:
         commit-timeout: 15s
         commit-time-warning: 1s
         commit-refresh-interval: infinite
-        use-dispatcher: "akka.kafka.default-dispatcher"
+        use-dispatcher: "akka.kafka.saga-kafka"
         kafka-clients.enable.auto.commit: false
         wait-close-partition: 500ms
-        position-timeout: 5s
-        offset-for-times-timeout: 5s
-        metadata-request-timeout: 5s
+        position-timeout: 10s
+        offset-for-times-timeout: 10s
+        metadata-request-timeout: 10s
         eos-draining-check-interval: 30ms
         partition-handler-warning: 5s
         connection-checker.enable: false
         connection-checker.max-retries: 3
         connection-checker.check-interval: 15s
         connection-checker.backoff-factor: 2.0
-        max-batch: 1000
-        max-interval: 10s
-        parallelism: 1
+      saga-kafka:
+        type: "Dispatcher"
+        executor: "thread-pool-executor"
+        thread-pool-executor:
+          fixed-pool-size: 20
+
 
 akka-persistence-redis:
   redis: