Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:33 UTC

[servicecomb-pack] branch SCB-1321 updated (9b36a38 -> ac55eec)

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a change to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from 9b36a38  SCB-1321 Update document format
     new ed4d692  SCB-1321 Modify SagaActor log level to DEBUG
     new 0f507d5  SCB-1321 Delete unused methods
     new 50698eb  SCB-1321 Delete unused methods
     new a93055e  SCB-1321 Remove Actor replying methods
     new 9eaf13b  SCB-1321 Optimize termination of SagaData cache for stress test
     new f2ac338  SCB-1321 Optimize Benchmark JVM Parameters
     new 07815c2  SCB-1321 Modify RESTful API /saga/events to /saga/events/last
     new e19d72f  SCB-1321 Modify Scenario Name
     new 89f652b  SCB-1321 Remove EventBus between gRPC and Akka
     new 35887b8  SCB-1321 Ignore onDisconnected error
     new fd84324  SCB-1321 Rename RESTful /saga to /saga/akka for acceptance tests
     new ac55eec  SCB-1321 Optimize alpha throughput

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/servicecomb/pack/PackStepdefs.java  |   4 +-
 ...ion_after_post_car_compensated_scenario.feature |   2 +-
 ...n_after_post_hotel_compensated_scenario.feature |   2 +-
 ...pack_booking_timeout_suspended_scenario.feature |   2 +-
 ...ck_booking_timeout_suspended_scenario.feature 2 |  44 -------
 .../pack_car_fail_compensated_scenario.feature     |   2 +-
 .../pack_hotel_fail_compensated_scenario.feature   |   2 +-
 .../pack/alpha/benchmark/Application.java          |   6 +-
 alpha/alpha-fsm/Benchmark.md                       |  12 +-
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |  48 +++++--
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 120 ++++++++++--------
 .../fsm/channel/ActiveMQActorEventChannel.java     |  38 +++---
 .../ActorEventChannel.java}                        |   7 +-
 .../alpha/fsm/channel/KafkaActorEventChannel.java  |  36 +++---
 .../alpha/fsm/channel/MemoryActorEventChannel.java |  70 +++++++++++
 .../alpha/fsm/channel/RedisActorEventChannel.java  |  38 +++---
 .../pack/alpha/fsm/domain/SagaEndedDomain.java     |   5 +
 .../servicecomb/pack/alpha/fsm/model/SagaData.java |   3 +-
 .../DomainEvent.java => sink/ActorEventSink.java}  |   8 +-
 .../SagaActorEventSender.java}                     |  63 ++++------
 .../spring/integration/akka/SagaDataExtension.java |  72 +++++++++--
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 140 ++++++++++-----------
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  65 +++++-----
 .../servicecomb/pack/alpha/server/AlphaConfig.java |   7 +-
 .../pack/alpha/server/AlphaEventController.java    |   1 -
 .../alpha/server/fsm/FsmSagaDataController.java    |   4 +-
 .../alpha/server/fsm/GrpcSagaEventService.java     |  10 +-
 .../src/main/resources/application.yaml            |   6 +
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  33 ++---
 29 files changed, 485 insertions(+), 365 deletions(-)
 delete mode 100644 acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature 2
 copy omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java => alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java (57%)
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{domain/DomainEvent.java => channel/ActorEventChannel.java} (84%)
 copy omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java => alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java (57%)
 create mode 100644 alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
 copy omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/core/ReconnectStreamObserver.java => alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java (57%)
 copy alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{domain/DomainEvent.java => sink/ActorEventSink.java} (84%)
 rename alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/{event/consumer/SagaEventConsumer.java => sink/SagaActorEventSender.java} (51%)


[servicecomb-pack] 10/12: SCB-1321 Ignore onDisconnected error

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 35887b82ce83aa70d2ad61e2be1665ed92484e63
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jul 11 14:35:01 2019 +0800

    SCB-1321 Ignore onDisconnected error
---
 .../org/apache/servicecomb/pack/alpha/benchmark/Application.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/Application.java b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/Application.java
index 0a74910..4b1d7c5 100644
--- a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/Application.java
+++ b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/Application.java
@@ -66,7 +66,11 @@ public class Application implements CommandLineRunner {
         printHelp();
       }
     } finally {
-      sender.onDisconnected();
+      try{
+        sender.onDisconnected();
+      }catch (Throwable e){
+        //
+      }
       System.exit(0);
     }
 

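The change above wraps sender.onDisconnected() so that a failure during disconnect cannot prevent the benchmark from exiting. A minimal sketch of the same best-effort pattern follows. The Sender interface and the disconnectQuietly helper are hypothetical stand-ins introduced for illustration, not types from the repository, and unlike the committed code this variant catches Exception rather than Throwable and logs the error instead of discarding it silently.

```java
public class BestEffortDisconnect {

  /** Hypothetical stand-in for the benchmark's sender; not the project's actual type. */
  interface Sender {
    void onDisconnected();
  }

  /**
   * Runs the disconnect hook and reports whether it completed normally.
   * A failure is logged and swallowed so that shutdown can proceed.
   */
  static boolean disconnectQuietly(Sender sender) {
    try {
      sender.onDisconnected();
      return true;
    } catch (Exception e) {
      // Best effort: the process is about to exit, so only record the failure.
      System.err.println("ignored error during disconnect: " + e);
      return false;
    }
  }

  public static void main(String[] args) {
    // A sender whose disconnect hook always fails, to exercise the catch branch.
    Sender failing = () -> {
      throw new IllegalStateException("channel already closed");
    };
    try {
      System.out.println("benchmark finished");
    } finally {
      disconnectQuietly(failing);
      System.out.println("exiting");
    }
  }
}
```

Logging the swallowed exception keeps the shutdown path robust while still leaving a trace when the disconnect fails for an unexpected reason.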

[servicecomb-pack] 04/12: SCB-1321 Remove Actor replying methods

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit a93055e6bc413c4cb66d9ff30be0d5e71634b759
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:47:27 2019 +0800

    SCB-1321 Remove Actor replying methods
---
 .../servicecomb/pack/alpha/fsm/SagaActor.java      | 118 +++++++++--------
 .../pack/alpha/fsm/domain/SagaEndedDomain.java     |   5 +
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 139 ++++++++++-----------
 .../pack/alpha/fsm/SagaIntegrationTest.java        |  24 ++--
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  30 ++---
 5 files changed, 163 insertions(+), 153 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 6881148..a6e3d05 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -98,21 +98,21 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
               SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data);
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
@@ -120,7 +120,7 @@ public class SagaActor extends
         matchEvent(TxEndedEvent.class, SagaData.class,
             (event, data) -> {
               UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
-              if (data.getExpirationTime()  != null) {
+              if (data.getExpirationTime() != null) {
                 return goTo(SagaActorState.PARTIALLY_COMMITTED)
                     .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -132,7 +132,7 @@ public class SagaActor extends
         ).event(TxStartedEvent.class,
             (event, data) -> {
               AddTxEventDomain domainEvent = new AddTxEventDomain(event);
-              if (data.getExpirationTime()  != null) {
+              if (data.getExpirationTime() != null) {
                 return stay()
                     .applying(domainEvent)
                     .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -145,7 +145,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxAbortedEvent.class,
@@ -156,7 +155,8 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              return goTo(SagaActorState.SUSPENDED)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
@@ -189,7 +189,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaEndedEvent.class,
@@ -197,7 +196,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
               return goTo(SagaActorState.COMMITTED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(SagaAbortedEvent.class,
@@ -212,7 +210,8 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              return goTo(SagaActorState.SUSPENDED)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));//.replying(data);
             })
     );
 
@@ -222,7 +221,6 @@ public class SagaActor extends
               SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
               return goTo(SagaActorState.SUSPENDED)
                   .applying(domainEvent)
-                  .replying(data)
                   .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             }
         ).event(TxCompensatedEvent.class, SagaData.class,
@@ -235,12 +233,12 @@ public class SagaActor extends
         ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
             (event, data) -> {
               if (hasCompensationSentTx(data) || !data.isTerminated()) {
-                return stay().replying(data);
+                return stay();
               } else {
-                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
                     .applying(domainEvent)
-                    .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
@@ -249,15 +247,17 @@ public class SagaActor extends
               data.setTerminated(true);
               if (hasCommittedTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
-                return stay().replying(data).applying(domainEvent);
+                return stay()
+                    .applying(domainEvent);
               } else if (hasCompensationSentTx(data)) {
                 SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
-                return stay().replying(data).applying(domainEvent);
+                return stay()
+                    .applying(domainEvent);
               } else {
-                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+                    SagaActorState.COMPENSATED);
                 return goTo(SagaActorState.COMPENSATED)
                     .applying(domainEvent)
-                    .replying(data)
                     .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
               }
             }
@@ -277,21 +277,35 @@ public class SagaActor extends
             }
         ).event(Arrays.asList(StateTimeout()), SagaData.class,
             (event, data) -> {
-              return goTo(SagaActorState.SUSPENDED).replying(data);
+              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+              return goTo(SagaActorState.SUSPENDED)
+                  .applying(domainEvent)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
             })
     );
 
     when(SagaActorState.COMMITTED,
         matchAnyEvent(
             (event, data) -> {
-              /**
-               * deleteMessages only deletes the actor's data in Redis; it does not delete the actor's highestSequenceNr, see https://github.com/akka/akka/issues/21181
-               * The highestSequenceNr of a stopped actor must be cleaned up manually. For example, if the actor's persistence ID is 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
-               * and Redis contains key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr without a matching
-               * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1, the actor has stopped and can be cleaned up with the following commands:
-               * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
-               * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
-               * */
+              //  A stopped actor is cleaned up with the two calls below, but highestSequenceNr is not deleted and must be cleaned up manually
+              //  The following explanation is based on journal-redis:
+              //    Assuming globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, three keys are created in Redis:
+              //    journal:persistenceIds
+              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
+              //    journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
+              //
+              //    1. journal:persistenceIds is a set that records every globalTxId; view it with smembers journal:persistenceIds
+              //    2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 is a zset that records all events of this transaction;
+              //       view it with zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1
+              //    3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr is a string that holds the sequence number
+              //
+              //    How to clean up:
+              //      deleteMessages and deleteSnapshot clean up part of the data, but highestSequenceNr is still not deleted automatically and needs periodic manual cleanup
+              //      Iterate over the journal:persistenceIds set and, for each item, build the keys journal:persisted:item and journal:persisted:item:highestSequenceNr
+              //      If the two keys do not both exist, the actor has terminated, so the item can be removed from journal:persistenceIds
+              //      and journal:persisted:item:highestSequenceNr can be deleted
+              //
+              //  The explanation available so far is https://github.com/akka/akka/issues/21181
               deleteMessages(lastSequenceNr());
               deleteSnapshot(snapshotSequenceNr());
               return stop();
@@ -333,7 +347,7 @@ public class SagaActor extends
             SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
-          if(LOG.isDebugEnabled()){
+          if (LOG.isDebugEnabled()) {
             LOG.debug("transition {} {} -> {}", getSelf(), from, to);
           }
         })
@@ -342,14 +356,14 @@ public class SagaActor extends
     onTermination(
         matchStop(
             Normal(), (state, data) -> {
-              if(LOG.isDebugEnabled()){
-                LOG.info("stop {} {}", data.getGlobalTxId(), state);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("stop {} {}", data.getGlobalTxId(), state);
               }
               data.setTerminated(true);
               data.setLastState(state);
               data.setEndTime(new Date());
               SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
-                  .putSagaData(data.getGlobalTxId(), data);
+                  .stopSagaData(data.getGlobalTxId(), data);
             }
         )
     );
@@ -359,7 +373,8 @@ public class SagaActor extends
   @Override
   public SagaData applyEvent(DomainEvent event, SagaData data) {
     // log event to SagaData
-    if(event.getEvent() != null && !(event.getEvent() instanceof TxComponsitedCheckInternalEvent)){
+    if (event.getEvent() != null && !(event
+        .getEvent() instanceof TxComponsitedCheckInternalEvent)) {
       data.logEvent(event.getEvent());
     }
     if (event instanceof SagaStartedDomain) {
@@ -391,9 +406,6 @@ public class SagaActor extends
       TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
       txEntity.setEndTime(System.currentTimeMillis());
       if (domainEvent.getState() == TxState.COMMITTED) {
-        // stop
-        //data.setEndTime(System.currentTimeMillis());
-        //data.setTerminated(true);
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
         txEntity.setState(domainEvent.getState());
@@ -431,13 +443,17 @@ public class SagaActor extends
         data.setTerminated(true);
       }
     }
-    LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+    }
     return data;
   }
 
   @Override
   public void onRecoveryCompleted() {
-    LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+    }
   }
 
   @Override
@@ -468,37 +484,37 @@ public class SagaActor extends
     // increments the compensation running counter by one
     data.getCompensationRunningCounter().incrementAndGet();
     txEntity.setState(TxState.COMPENSATION_SENT);
-    try{
+    try {
       SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
       LOG.info("compensate {}", txEntity.getLocalTxId());
-    }catch (AlphaException ex){
-      LOG.error(ex.getMessage(),ex);
+    } catch (AlphaException ex) {
+      LOG.error(ex.getMessage(), ex);
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        LOG.error(e.getMessage(),e);
+        LOG.error(e.getMessage(), e);
       }
-      compensation(txEntity,data);
-    }catch (Exception ex){
-      LOG.error("compensation failed "+txEntity.getLocalTxId(), ex);
-      if(txEntity.getRetries() > 0){
+      compensation(txEntity, data);
+    } catch (Exception ex) {
+      LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
+      if (txEntity.getRetries() > 0) {
         // which means the retry number
-        if(txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()){
+        if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()) {
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
           }
-          compensation(txEntity,data);
+          compensation(txEntity, data);
         }
-      } else if(txEntity.getRetries() == -1){
+      } else if (txEntity.getRetries() == -1) {
         // which means retry it until succeed
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
-          LOG.error(e.getMessage(),e);
+          LOG.error(e.getMessage(), e);
         }
-        compensation(txEntity,data);
+        compensation(txEntity, data);
       }
     }
   }
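
The comment added to the COMMITTED state in SagaActor.java above describes a manual cleanup of leftover journal-redis keys for stopped actors. A rough sketch of that loop follows, under loud assumptions: Redis is simulated with in-memory Java collections (a Set for journal:persistenceIds and a Map for the remaining keys) so the example runs without a Redis client, and the class and method names are hypothetical. It treats an id as stopped when its journal:persisted:<id> events key is missing, which is one reading of "the two keys do not both exist".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class JournalCleanupSketch {

  static final String PERSISTED_PREFIX = "journal:persisted:";
  static final String SEQ_SUFFIX = ":highestSequenceNr";

  /**
   * Removes leftovers of stopped actors from an in-memory stand-in for Redis.
   *
   * @param persistenceIds stand-in for the set journal:persistenceIds
   * @param keys stand-in for the rest of the keyspace (key to value)
   * @return the ids that were cleaned up
   */
  static List<String> cleanStopped(Set<String> persistenceIds, Map<String, String> keys) {
    List<String> cleaned = new ArrayList<>();
    // Iterate over a copy so entries can be removed from the set inside the loop.
    for (String id : new ArrayList<>(persistenceIds)) {
      String eventsKey = PERSISTED_PREFIX + id;
      String seqKey = eventsKey + SEQ_SUFFIX;
      // Assumption: a missing events key means deleteMessages already ran,
      // so the actor has stopped and only its leftovers remain.
      if (!keys.containsKey(eventsKey)) {
        keys.remove(seqKey);        // corresponds to: del journal:persisted:<id>:highestSequenceNr
        persistenceIds.remove(id);  // corresponds to: srem journal:persistenceIds <id>
        cleaned.add(id);
      }
    }
    return cleaned;
  }

  public static void main(String[] args) {
    Set<String> ids = new HashSet<>();
    ids.add("running-tx");
    ids.add("stopped-tx");

    Map<String, String> keys = new HashMap<>();
    keys.put(PERSISTED_PREFIX + "running-tx", "events");
    keys.put(PERSISTED_PREFIX + "running-tx" + SEQ_SUFFIX, "3");
    // The stopped actor has only its sequence-number key left.
    keys.put(PERSISTED_PREFIX + "stopped-tx" + SEQ_SUFFIX, "7");

    System.out.println("cleaned: " + cleanStopped(ids, keys));
    System.out.println("remaining ids: " + ids);
  }
}
```

Against a real Redis instance the same loop would presumably be built from smembers, exists, del and srem, and would need care to avoid racing with actors that are still starting up.
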
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 1f4b216..f5bc708 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -32,6 +32,11 @@ public class SagaEndedDomain implements DomainEvent {
     this.state = state;
   }
 
+
+  public SagaEndedDomain(SagaActorState state) {
+    this.state = state;
+  }
+
   public SagaActorState getState() {
     return state;
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 926ae84..1b4d84b 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -132,19 +132,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
       system.stop(saga);
     }};
@@ -220,13 +219,6 @@ public class SagaActorTest {
       assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE,
           SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED,
           SagaActorState.COMMITTED);
@@ -234,6 +226,12 @@ public class SagaActorTest {
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), recoveredSaga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       eventListFirst.addAll(eventListSecond);
       assertThat(eventListFirst, is(sagaData.getEvents()));
 
@@ -274,18 +272,17 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 1);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 1);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -337,19 +334,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 2);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 2);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -410,20 +407,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -484,15 +480,13 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
-      sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
       assertEquals(sagaData.getGlobalTxId(), globalTxId);
       assertEquals(sagaData.getTxEntityMap().size(), 3);
       assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -548,20 +542,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -700,19 +693,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -824,19 +816,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -888,19 +879,18 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      sagaData.getTxEntityMap().forEach((k, v) -> {
-        assertEquals(v.getState(), TxState.COMMITTED);
-      });
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      sagaData.getTxEntityMap().forEach((k, v) -> {
+        assertEquals(v.getState(), TxState.COMMITTED);
+      });
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
@@ -951,20 +941,19 @@ public class SagaActorTest {
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
 
-      SagaData sagaData = expectMsgClass(SagaData.class);
-      assertEquals(sagaData.getGlobalTxId(), globalTxId);
-      assertEquals(sagaData.getTxEntityMap().size(), 3);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
-      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
-      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
       transition = expectMsgClass(PersistentFSM.Transition.class);
       assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
 
       Terminated terminated = expectMsgClass(Terminated.class);
       assertEquals(terminated.getActor(), saga);
 
+      SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+      assertEquals(sagaData.getGlobalTxId(), globalTxId);
+      assertEquals(sagaData.getTxEntityMap().size(), 3);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+      assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+      assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
       assertThat(eventList, is(sagaData.getEvents()));
 
       system.stop(saga);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index f408c09..c1a61a6 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -63,7 +63,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -84,7 +84,7 @@ public class SagaIntegrationTest {
 
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -103,7 +103,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -124,7 +124,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -146,7 +146,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -168,7 +168,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -190,7 +190,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -212,7 +212,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -235,7 +235,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -257,7 +257,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -279,7 +279,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
@@ -301,7 +301,7 @@ public class SagaIntegrationTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertNotNull(sagaData.getBeginTime());
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index ecac3b8..d44d728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -99,7 +99,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -121,7 +121,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -143,7 +143,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -167,7 +167,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -193,7 +193,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -225,7 +225,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(5, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -250,7 +250,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(20, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -275,7 +275,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -300,7 +300,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(timeout + 1, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -324,7 +324,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -348,7 +348,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -372,7 +372,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -396,7 +396,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -436,7 +436,7 @@ public class AlphaIntegrationFsmTest {
 
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -462,7 +462,7 @@ public class AlphaIntegrationFsmTest {
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
-      return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+      return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
     });
     SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
     assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
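
Note on the wait conditions changed above: every await() predicate now also requires sagaData.isTerminated(). This presumably ensures the test reads the cached SagaData only after the actor has finished, not as soon as the final state becomes visible. The block below is a minimal, self-contained Java sketch of that polling pattern. It uses neither Awaitility nor any ServiceComb class; Record, awaitUntil, startWriter and CACHE are hypothetical stand-ins chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

class TerminatedWaitSketch {

  // Hypothetical stand-in for the cached saga record; not the real SagaData.
  static final class Record {
    volatile String lastState;
    volatile boolean terminated;
  }

  // Hypothetical cache keyed by global transaction id.
  static final Map<String, Record> CACHE = new ConcurrentHashMap<>();

  // Polls the condition every 10 ms until it holds or the timeout elapses.
  static boolean awaitUntil(long timeoutMillis, BooleanSupplier condition)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (System.nanoTime() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(10);
    }
    return condition.getAsBoolean();
  }

  // Simulates a writer that publishes the final state first and
  // sets the terminated flag slightly later.
  static Thread startWriter(String id) {
    Thread writer = new Thread(() -> {
      Record record = new Record();
      record.lastState = "COMMITTED";
      CACHE.put(id, record);
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      record.terminated = true;
    });
    writer.start();
    return writer;
  }

  public static void main(String[] args) throws Exception {
    startWriter("tx-1");
    // Checking only lastState could succeed during the 50 ms window above;
    // also requiring the terminated flag waits for the writer to finish.
    boolean done = awaitUntil(2000, () -> {
      Record record = CACHE.get("tx-1");
      return record != null && record.terminated && "COMMITTED".equals(record.lastState);
    });
    System.out.println("terminated and committed: " + done);
  }
}
```

In this sketch a predicate that tested only lastState could return while the writer is still running, which is the kind of window the added isTerminated() check appears intended to close.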


[servicecomb-pack] 02/12: SCB-1321 Delete unused methods

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0f507d555407f7afbf2ef3ccd57f72683bc91349
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 08:41:09 2019 +0800

    SCB-1321 Delete unused methods
---
 .../pack/alpha/fsm/event/consumer/SagaEventConsumer.java     | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index c14fac7..709cc40 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -70,16 +70,4 @@ public class SagaEventConsumer {
       throw ex;
     }
   }
-
-  public Optional<ActorRef> getActorRefFromPath(String path) throws Exception {
-    try {
-      ActorSelection selection = system.actorSelection(path);
-      Future<ActorRef> future = selection.resolveOne(TIMEOUT);
-      ActorRef ref = Await.result(future, TIMEOUT.duration());
-      return Optional.of(ref);
-    } catch (ActorNotFound e) {
-      return Optional.absent();
-    }
-  }
-
 }


[servicecomb-pack] 11/12: SCB-1321 Rename RESTful /saga to /saga/akka for acceptance tests

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit fd843246e42746f6acff073f2af3a121f4af65df
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jul 11 14:38:19 2019 +0800

    SCB-1321 Rename RESTful /saga to /saga/akka for acceptance tests
---
 .../src/test/java/org/apache/servicecomb/pack/PackStepdefs.java       | 4 ++--
 .../servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java      | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
index ce93b6b..9c6a194 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
@@ -102,7 +102,7 @@ public class PackStepdefs implements En {
           map.keySet().retainAll(dataTable.topCells());
       };
 
-      dataMatches(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/events/last", dataTable, columnStrippingConsumer);
+      dataMatches(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/akka/events/last", dataTable, columnStrippingConsumer);
     });
 
     And("^Car Service contains the following booking orders$", (DataTable dataTable) -> {
@@ -127,7 +127,7 @@ public class PackStepdefs implements En {
 
     given()
         .when()
-        .delete(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/events")
+        .delete(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/akka/events")
         .then()
         .statusCode(is(200));
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
index d2257da..c1976a5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
@@ -44,7 +44,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 
 @EnableKamon
 @Controller
-@RequestMapping("/saga")
+@RequestMapping("/saga/akka")
 @Profile("test")
 @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
 // Only export this Controller for test


[servicecomb-pack] 12/12: SCB-1321 Optimize alpha throughput

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit ac55eec256a85f1d6adabb6a8a08e6ffb0c1613b
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jul 11 14:44:19 2019 +0800

    SCB-1321 Optimize alpha throughput
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       | 46 ++++++++++++--
 .../fsm/channel/ActiveMQActorEventChannel.java     | 47 +++++++++++++++
 .../pack/alpha/fsm/channel/ActorEventChannel.java  | 24 ++++++++
 .../alpha/fsm/channel/KafkaActorEventChannel.java  | 43 +++++++++++++
 .../alpha/fsm/channel/MemoryActorEventChannel.java | 70 ++++++++++++++++++++++
 .../alpha/fsm/channel/RedisActorEventChannel.java  | 47 +++++++++++++++
 .../servicecomb/pack/alpha/fsm/model/SagaData.java |  3 +-
 .../pack/alpha/fsm/sink/ActorEventSink.java        | 25 ++++++++
 .../SagaActorEventSender.java}                     | 47 ++++++++-------
 .../spring/integration/akka/SagaDataExtension.java | 33 +++++-----
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  |  1 +
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 37 +++++++-----
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  8 +--
 .../pack/alpha/server/AlphaEventController.java    |  1 -
 .../alpha/server/fsm/GrpcSagaEventService.java     | 10 ++--
 .../src/main/resources/application.yaml            |  6 ++
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  |  3 +-
 17 files changed, 379 insertions(+), 72 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 922e40b..fcf5cec 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -23,9 +23,17 @@ import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -36,6 +44,9 @@ import org.springframework.core.env.ConfigurableEnvironment;
 @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
 public class FsmAutoConfiguration {
 
+  @Value("${alpha.feature.akka.channel.memory.size:-1}")
+  int memoryEventChannelMemorySize;
+
   @Bean
   public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
     ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
@@ -50,13 +61,38 @@ public class FsmAutoConfiguration {
   }
 
   @Bean
-  public SagaEventActorEventSender sagaEventConsumer(){
-    return new SagaEventActorEventSender();
+  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+    return new EventSubscribeBeanPostProcessor();
   }
 
   @Bean
-  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
-    return new EventSubscribeBeanPostProcessor();
+  public ActorEventSink actorEventSink(){
+    return new SagaActorEventSender();
+  }
+
+  @Bean
+  @ConditionalOnMissingBean(ActorEventChannel.class)
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){
+    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize);
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
+  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){
+    return new ActiveMQActorEventChannel(actorEventSink);
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){
+    return new KafkaActorEventChannel(actorEventSink);
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){
+    return new RedisActorEventChannel(actorEventSink);
   }
 
 }
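The channel beans above are chosen by the value of alpha.feature.akka.channel.type, with memory used when the property is absent (matchIfMissing = true). A minimal plain-Java sketch of that selection rule, without Spring: ChannelTypeSelector and its select method are hypothetical names, and the exception for an unknown value is only a stand-in for what would presumably be a missing ActorEventChannel bean at startup.

```java
// Hypothetical helper, not part of the patch: mirrors the havingValue strings
// used by the @ConditionalOnProperty annotations in FsmAutoConfiguration.
final class ChannelTypeSelector {
  private ChannelTypeSelector() {}

  /** Returns the channel class name for a configured type; null means "property not set". */
  static String select(String configuredType) {
    // matchIfMissing = true on the memory bean: an absent property selects memory.
    String type = configuredType == null ? "memory" : configuredType;
    switch (type) {
      case "memory":
        return "MemoryActorEventChannel";
      case "activemq":
        return "ActiveMQActorEventChannel";
      case "kafka":
        return "KafkaActorEventChannel";
      case "redis":
        return "RedisActorEventChannel";
      default:
        // Assumption: stands in for "no channel bean matches this value".
        throw new IllegalArgumentException("no channel registered for type: " + type);
    }
  }
}
```

Note that in this revision only the memory channel delivers events; the other three throw UnsupportedOperationException from send().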
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
new file mode 100644
index 0000000..515f29c
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder for a channel backed by an ActiveMQ queue; send() is not implemented yet.
+ */
+
+public class ActiveMQActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public ActiveMQActorEventChannel(
+      ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  @Override
+  public void send(BaseEvent event){
+    try{
+      throw new UnsupportedOperationException();
+    }catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
new file mode 100644
index 0000000..f026d91
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventChannel {
+  void send(BaseEvent event);
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
new file mode 100644
index 0000000..7539069
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public KafkaActorEventChannel(
+      ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  @Override
+  public void send(BaseEvent event){
+    try{
+      throw new UnsupportedOperationException();
+    }catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
new file mode 100644
index 0000000..1af2432
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryActorEventChannel implements ActorEventChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+  private final LinkedBlockingQueue<BaseEvent> eventQueue;
+  private int size;
+
+  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+    this.size = size > 0 ? size : Integer.MAX_VALUE;
+    eventQueue = new LinkedBlockingQueue<>(this.size);
+    this.actorEventSink = actorEventSink;
+    new Thread(new EventConsumer(),"MemoryActorEventChannel").start();
+  }
+
+  @Override
+  public void send(BaseEvent event){
+    try{
+      eventQueue.put(event);
+    }catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  class EventConsumer implements Runnable {
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          BaseEvent event = eventQueue.peek();
+          if (event != null) {
+            actorEventSink.send(event);
+            eventQueue.poll();
+          } else {
+            Thread.sleep(10);
+          }
+        } catch (Exception ex) {
+          LOG.error(ex.getMessage(), ex);
+        }
+      }
+    }
+  }
+}
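The consumer above polls with peek() and sleeps 10 ms when the queue is empty. One possible alternative, sketched here under assumptions, parks the consumer thread on take() instead. Sink and BlockingMemoryChannel are hypothetical stand-ins for ActorEventSink and MemoryActorEventChannel so that the example compiles on its own. The trade-off is noted in the comments: peek-then-poll leaves a failed event at the head of the queue for another attempt, whereas take() removes it before delivery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for ActorEventSink.
interface Sink<E> {
  void send(E event) throws Exception;
}

// Hypothetical variant of the in-memory channel that blocks instead of polling.
final class BlockingMemoryChannel<E> {
  private final BlockingQueue<E> queue;
  private final Sink<E> sink;
  private final Thread consumer;

  BlockingMemoryChannel(Sink<E> sink, int size) {
    // Same capacity rule as the patch: a non-positive size means Integer.MAX_VALUE.
    this.queue = new LinkedBlockingQueue<>(size > 0 ? size : Integer.MAX_VALUE);
    this.sink = sink;
    this.consumer = new Thread(this::drain, "BlockingMemoryChannel");
    this.consumer.setDaemon(true);
    this.consumer.start();
  }

  void send(E event) {
    try {
      queue.put(event); // blocks the producer while the queue is full
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      throw new IllegalStateException("interrupted while enqueueing event", e);
    }
  }

  void close() {
    consumer.interrupt(); // ends the drain loop
  }

  private void drain() {
    try {
      while (true) {
        E event = queue.take(); // parks until an event is available, no sleep loop
        try {
          sink.send(event);
        } catch (Exception ex) {
          // Unlike peek-then-poll, the event is already removed, so it is not retried.
          System.err.println("event delivery failed: " + ex);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // interrupted by close(): exit quietly
    }
  }
}
```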
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
new file mode 100644
index 0000000..f055eec
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder for a channel backed by Redis Pub/Sub; send() is not implemented yet.
+ */
+
+public class RedisActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public RedisActorEventChannel(
+      ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  @Override
+  public void send(BaseEvent event){
+    try{
+      throw new UnsupportedOperationException();
+    }catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 59c1e4e..e9a5f60 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
@@ -38,7 +39,7 @@ public class SagaData implements Serializable {
   private boolean terminated;
   private SagaActorState lastState;
   private AtomicLong compensationRunningCounter = new AtomicLong();
-  private Map<String,TxEntity> txEntityMap = new HashMap<>();
+  private Map<String,TxEntity> txEntityMap = new ConcurrentHashMap<>();
   private List<BaseEvent> events = new LinkedList<>();
 
   public String getServiceName() {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
new file mode 100644
index 0000000..73ba220
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventSink {
+
+  void send(BaseEvent event) throws Exception;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
similarity index 52%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
index 84d7914..cdc0828 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -15,47 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
+package org.apache.servicecomb.pack.alpha.fsm.sink;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.util.Timeout;
 import java.lang.invoke.MethodHandles;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class SagaActorEventSender implements ActorEventSink {
 
-@Component
-public class SagaEventActorEventSender {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Autowired
   ActorSystem system;
 
-  private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
+  private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
 
   public void send(BaseEvent event) {
-    if(LOG.isDebugEnabled()){
-      LOG.debug("send {} ", event.toString());
-    }
     try{
-      ActorRef saga;
-      if(sagaCache.containsKey(event.getGlobalTxId())){
-        saga = sagaCache.get(event.getGlobalTxId());
-      }else{
-        saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
-        sagaCache.put(event.getGlobalTxId(), saga);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("send {} ", event.toString());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("send {} ", event.toString());
       }
-      saga.tell(event, ActorRef.noSender());
-      if(LOG.isDebugEnabled()){
-        LOG.debug("tell {} to {}", event.toString(),saga);
+      if (event instanceof SagaStartedEvent) {
+        final ActorRef saga = system
+            .actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
+        saga.tell(event, ActorRef.noSender());
+      } else {
+        ActorSelection actorSelection = system
+            .actorSelection("/user/" + event.getGlobalTxId());
+        final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout);
+        final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration());
+        saga.tell(event, ActorRef.noSender());
       }
     }catch (Exception ex){
-      throw ex;
+      throw new RuntimeException(ex);
     }
   }
 }
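The rewritten send() no longer creates an actor for whichever event arrives first: only a SagaStartedEvent creates one, and every other event must resolve an existing actor at /user/<globalTxId> or fail. A rough plain-Java analogy of that routing rule follows. It is not Akka code. SagaRouter is a hypothetical class, a map entry stands in for an actor, and "TxStartedEvent" is used only as an illustrative event name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry: globalTxId -> events delivered to that saga.
final class SagaRouter {
  private final Map<String, List<String>> sagas = new ConcurrentHashMap<>();

  void send(String globalTxId, String eventType) {
    if ("SagaStartedEvent".equals(eventType)) {
      // Start event: create the saga; a second start for the same id is rejected,
      // much as actorOf rejects a duplicate actor name.
      List<String> mailbox = new CopyOnWriteArrayList<>();
      if (sagas.putIfAbsent(globalTxId, mailbox) != null) {
        throw new IllegalStateException("saga already exists: " + globalTxId);
      }
      mailbox.add(eventType);
    } else {
      // Any other event: look the saga up and fail if it was never started.
      List<String> mailbox = sagas.get(globalTxId);
      if (mailbox == null) {
        throw new IllegalStateException("no saga found for: " + globalTxId);
      }
      mailbox.add(eventType);
    }
  }

  List<String> delivered(String globalTxId) {
    List<String> mailbox = sagas.get(globalTxId);
    return mailbox == null ? Collections.<String>emptyList() : new ArrayList<>(mailbox);
  }
}
```

One consequence worth keeping in mind: an event that arrives before its SagaStartedEvent is now an error, where the removed sagaCache code would have created the actor on demand.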
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index c1690c6..ae8d43d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
+  public static boolean autoCleanSagaDataMap = true; // Only for Test
 
   @Override
   public SagaDataExt createExtension(ExtendedActorSystem system) {
@@ -38,22 +39,24 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
   public static class SagaDataExt implements Extension {
-    private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+    //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
     private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
     private String lastGlobalTxId;
-    private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+    private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
 
     public SagaDataExt() {
       // Just to avoid the overflow of the OldGen for stress testing
       // Delete after SagaData persistence
-      new Thread(cleanMemForTest).start();
+      if(autoCleanSagaDataMap){
+        new Thread(cleanMemForTest).start();
+      }
     }
 
     public void putSagaData(String globalTxId, SagaData sagaData) {
-      if(!globalTxIds.contains(globalTxId)){
+      //if(!globalTxIds.contains(globalTxId)){
         lastGlobalTxId = globalTxId;
-        globalTxIds.add(globalTxId);
-      }
+      //  globalTxIds.add(globalTxId);
+      //}
       sagaDataMap.put(globalTxId, sagaData);
     }
 
@@ -71,7 +74,8 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
     // Only test
     public void clearSagaData() {
-      globalTxIds.clear();
+      //globalTxIds.clear();
+      lastGlobalTxId = null;
       sagaDataMap.clear();
     }
 
@@ -81,11 +85,9 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
   static class CleanMemForTest implements Runnable {
-    final ConcurrentLinkedQueue<String> globalTxIds;
     final ConcurrentHashMap<String, SagaData> sagaDataMap;
 
-    public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
-      this.globalTxIds = globalTxIds;
+    public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
       this.sagaDataMap = sagaDataMap;
     }
 
@@ -93,19 +95,12 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
     public void run() {
       while (true){
         try{
-          if(!globalTxIds.isEmpty()){
-            int cache_size = globalTxIds.size()-5000;
-            while(cache_size>0){
-              sagaDataMap.remove(globalTxIds.poll());
-              cache_size--;
-            }
-          }
+          sagaDataMap.clear();
         }catch (Exception e){
           LOG.error(e.getMessage(),e);
         }finally {
-          LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
           try {
-            Thread.sleep(60000);
+            Thread.sleep(10000);
           } catch (InterruptedException e) {
             LOG.error(e.getMessage(),e);
           }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 1b4d84b..505d923 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -67,6 +67,7 @@ public class SagaActorTest {
 
   @BeforeClass
   public static void setup() {
+    SagaDataExtension.autoCleanSagaDataMap=false;
     system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
   }
 
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 82ae48a..69d2870 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -23,15 +23,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
 import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
@@ -39,6 +38,7 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
         "alpha.feature.akka.enabled=true",
+        "alpha.feature.akka.channel.type=memory",
         "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
         "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
         "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
@@ -50,7 +50,12 @@ public class SagaIntegrationTest {
   ActorSystem system;
   
   @Autowired
-  SagaEventActorEventSender sagaEventActorEventSender;
+  SagaActorEventSender sagaActorEventSender;
+
+  @BeforeClass
+  public static void setup(){
+    SagaDataExtension.autoCleanSagaDataMap=false;
+  }
 
   @Test
   public void successfulTest() {
@@ -59,7 +64,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +84,7 @@ public class SagaIntegrationTest {
     final String globalTxId = UUID.randomUUID().toString();
     final String localTxId_1 = UUID.randomUUID().toString();
     SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
 
     await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +104,7 @@ public class SagaIntegrationTest {
     final String localTxId_1 = UUID.randomUUID().toString();
     final String localTxId_2 = UUID.randomUUID().toString();
     SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +125,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +147,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +169,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +191,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +213,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +236,7 @@ public class SagaIntegrationTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final int timeout = 5; // second
     SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +258,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +280,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +302,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 17589e9..d2e94b1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,7 +29,7 @@ import javax.annotation.PreDestroy;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -168,11 +168,11 @@ public class AlphaConfig {
 
   @Bean
   @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
-  ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+  ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
-      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
     ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
-        new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
+        new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService);
     new Thread(bootstrap::start).start();
     tccPendingTaskRunner.start();
     tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
index 2e6b8e0..b7a344a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
@@ -43,7 +43,6 @@ import kamon.annotation.Trace;
 @Controller
 @RequestMapping("/saga")
 @Profile("test")
-@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true)
 // Only export this Controller for test
 class AlphaEventController {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 3cfb931..dcf5cf3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import kamon.annotation.Trace;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-  private final SagaEventActorEventSender sagaEventActorEventSender;
+  private final ActorEventChannel actorEventChannel;
 
-  public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
+  public GrpcSagaEventService(ActorEventChannel actorEventChannel,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    this.sagaEventActorEventSender = sagaEventActorEventSender;
+    this.actorEventChannel = actorEventChannel;
     this.omegaCallbacks = omegaCallbacks;
   }
 
@@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
     }
     if (event != null) {
       event.setCreateTime(new Date());
-      sagaEventActorEventSender.send(event);
+      actorEventChannel.send(event);
     }
     responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 1aa7a68..fa1b35a 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,11 @@ alpha:
   server:
     host: 0.0.0.0
     port: 8080
+  feature:
+    akka:
+      enabled: false
+      channel:
+        type: memory
 
 spring:
   datasource:
@@ -45,6 +50,7 @@ eureka:
     metadataMap:
       servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
 
+
 akkaConfig:
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
   akka.persistence.journal.leveldb.dir: target/example/journal
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d44d728..d0902e5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -68,7 +68,8 @@ public class AlphaIntegrationFsmTest {
 
   @BeforeClass
   public static void beforeClass() {
-    omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build());
+    omegaEventSender.configClient(NettyChannelBuilder.forAddress("0.0.0.0", port).usePlaintext().build());
+    SagaDataExtension.autoCleanSagaDataMap=false;
   }
 
   @AfterClass
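
For readers following the switch from SagaEventActorEventSender to ActorEventChannel and the new alpha.feature.akka.channel.type: memory setting, here is a minimal sketch of what an in-memory event channel could look like. It is an illustration only: the interface and class names are hypothetical, the queue-plus-drain design is an assumption, and none of it is taken from the project's actual ActorEventChannel code, which is not shown in this part of the diff.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical abstraction: callers only see send(), so the transport behind it can change.
interface EventChannelSketch<E> {
  void send(E event);
}

// Hypothetical "memory" channel: buffers events in a queue and hands them to a sink later.
class MemoryEventChannelSketch<E> implements EventChannelSketch<E> {
  private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
  private final Consumer<E> sink;

  MemoryEventChannelSketch(Consumer<E> sink) {
    this.sink = sink;
  }

  @Override
  public void send(E event) {
    // Unbounded queue, so offer() always succeeds and the caller never blocks.
    queue.offer(event);
  }

  // Delivers everything queued so far to the sink, in FIFO order; returns the number delivered.
  int drain() {
    int delivered = 0;
    E event;
    while ((event = queue.poll()) != null) {
      sink.accept(event);
      delivered++;
    }
    return delivered;
  }
}
```

In this sketch the gRPC side would call send() and return immediately, while a separate worker would call drain() and forward each event to the actor. A real channel would likely need a bounded queue or back-pressure, which is left out here.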


[servicecomb-pack] 07/12: SCB-1321 Modify RESTful API /saga/events to /saga/events/last

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 07815c224fe25568658aa72657417d1103ee668e
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 20:05:40 2019 +0800

    SCB-1321 Modify RESTful API /saga/events to /saga/events/last
---
 .../src/test/java/org/apache/servicecomb/pack/PackStepdefs.java        | 2 +-
 .../pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java      | 3 ++-
 .../servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java       | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
index 711edd2..ce93b6b 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/java/org/apache/servicecomb/pack/PackStepdefs.java
@@ -102,7 +102,7 @@ public class PackStepdefs implements En {
           map.keySet().retainAll(dataTable.topCells());
       };
 
-      dataMatches(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/events", dataTable, columnStrippingConsumer);
+      dataMatches(System.getProperty(ALPHA_REST_ADDRESS) + "/saga/events/last", dataTable, columnStrippingConsumer);
     });
 
     And("^Car Service contains the following booking orders$", (DataTable dataTable) -> {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 1a49527..c1690c6 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -51,6 +51,7 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
     public void putSagaData(String globalTxId, SagaData sagaData) {
       if(!globalTxIds.contains(globalTxId)){
+        lastGlobalTxId = globalTxId;
         globalTxIds.add(globalTxId);
       }
       sagaDataMap.put(globalTxId, sagaData);
@@ -75,7 +76,7 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
     }
 
     public SagaData getLastSagaData() {
-      return sagaDataMap.get(lastGlobalTxId);
+      return getSagaData(lastGlobalTxId);
     }
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
index 7790286..d2257da 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/FsmSagaDataController.java
@@ -55,7 +55,7 @@ class FsmSagaDataController {
   ActorSystem system;
 
   @Trace("getEvents")
-  @GetMapping(value = "/events")
+  @GetMapping(value = "/events/last")
   ResponseEntity<Collection<Map>> events() {
     LOG.info("Get the events request");
     List<Map> eventVos = new LinkedList<>();
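
The /saga/events/last endpoint depends on SagaDataExtension remembering the most recently registered globalTxId. The sketch below shows that bookkeeping in isolation, under a few assumptions: the class name is hypothetical, the last ID is updated only when a transaction is first seen, and a null check is added because ConcurrentHashMap.get(null) throws NullPointerException when no transaction has been recorded yet.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; a simplified stand-in for the lastGlobalTxId bookkeeping.
class LastSagaTrackerSketch<V> {
  private final ConcurrentHashMap<String, V> values = new ConcurrentHashMap<>();
  // volatile so a reader thread sees the latest ID written by another thread.
  private volatile String lastId;

  void put(String id, V value) {
    // put() returns null only when the key was not present before,
    // so lastId moves forward on first sight and stays put on updates.
    if (values.put(id, value) == null) {
      lastId = id;
    }
  }

  // Returns the value of the most recently registered ID, or null if nothing was stored yet.
  V getLast() {
    String id = lastId;
    return id == null ? null : values.get(id);
  }
}
```

With this shape, a controller can return an empty result instead of failing when it is called before the first saga has been recorded.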


[servicecomb-pack] 01/12: SCB-1321 Modify SagaActor log level to DEBUG

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit ed4d6921516171caa963c73f17fa590777c6045f
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 08:40:10 2019 +0800

    SCB-1321 Modify SagaActor log level to DEBUG
---
 .../java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 188f156..6881148 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -333,14 +333,18 @@ public class SagaActor extends
             SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                 .putSagaData(stateData().getGlobalTxId(), stateData());
           }
-          LOG.info("transition {} {} -> {}", getSelf(), from, to);
+          if(LOG.isDebugEnabled()){
+            LOG.debug("transition {} {} -> {}", getSelf(), from, to);
+          }
         })
     );
 
     onTermination(
         matchStop(
             Normal(), (state, data) -> {
-              LOG.info("stop {} {}", data.getGlobalTxId(), state);
+              if(LOG.isDebugEnabled()){
+                LOG.info("stop {} {}", data.getGlobalTxId(), state);
+              }
               data.setTerminated(true);
               data.setLastState(state);
               data.setEndTime(new Date());
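
The SagaActor change wraps its transition and stop messages in LOG.isDebugEnabled() checks. A small sketch of the same pattern follows, with the guard and the call kept at the same level (in the diff above, the stop message is guarded by isDebugEnabled() but still calls LOG.info). It swaps in java.util.logging so that it runs without extra dependencies; the class, handler and method names are hypothetical, and Level.FINE stands in for DEBUG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical helper; not part of servicecomb-pack.
class TransitionLogSketch {

  // Collects messages in memory so the effect of the level check can be observed.
  static final class CapturingHandler extends Handler {
    final List<String> messages = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      messages.add(record.getMessage());
    }

    @Override
    public void flush() { }

    @Override
    public void close() { }
  }

  // Logs one transition at FINE and returns whatever reached the handler.
  static List<String> logTransition(Level configured, String from, String to) {
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false);
    log.setLevel(configured);
    CapturingHandler handler = new CapturingHandler();
    log.addHandler(handler);

    // Guard and call use the same level, so the message is built only when it will be emitted.
    if (log.isLoggable(Level.FINE)) {
      log.fine("transition " + from + " -> " + to);
    }
    return handler.messages;
  }
}
```

With the logger configured at INFO the call produces no output and skips the string concatenation; at FINE it emits exactly one message.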


[servicecomb-pack] 03/12: SCB-1321 Delete unused methods

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 50698eb2931938b338515c33604787319c9af50d
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:44:56 2019 +0800

    SCB-1321 Delete unused methods
---
 .../pack/alpha/fsm/event/consumer/SagaEventConsumer.java      | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index 709cc40..758880b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -17,30 +17,21 @@
 
 package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
 
-import akka.actor.ActorNotFound;
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
 import com.google.common.eventbus.Subscribe;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 
 public class SagaEventConsumer {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final Timeout TIMEOUT = new Timeout(5, TimeUnit.SECONDS);
-
   @Autowired
   ActorSystem system;
 
@@ -50,7 +41,7 @@ public class SagaEventConsumer {
    * Receive fsm message
    * */
   @Subscribe
-  public void receiveSagaEvent(BaseEvent event) throws Exception {
+  public void receiveSagaEvent(BaseEvent event) {
     if(LOG.isDebugEnabled()){
       LOG.debug("receive {} ", event.toString());
     }


[servicecomb-pack] 05/12: SCB-1321 Optimize termination of SagaData cache for stress test

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 9eaf13b3e7e7444ad18f28198941df905c177e4a
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:49:18 2019 +0800

    SCB-1321 Optimize termination of SagaData cache for stress test
---
 .../spring/integration/akka/SagaDataExtension.java | 76 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 8 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index f0d8f58..1a49527 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -20,12 +20,16 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
 import akka.actor.AbstractExtensionId;
 import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
-
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
 
   @Override
@@ -34,22 +38,78 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   }
 
   public static class SagaDataExt implements Extension {
-    private ConcurrentSkipListMap<String, SagaData> sagaDataMap = new ConcurrentSkipListMap();
+    private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+    private String lastGlobalTxId;
+    private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+
+    public SagaDataExt() {
+      // Just to avoid the overflow of the OldGen for stress testing
+      // Delete after SagaData persistence
+      new Thread(cleanMemForTest).start();
+    }
 
-    public void putSagaData(String globalTxId, SagaData sagaData){
+    public void putSagaData(String globalTxId, SagaData sagaData) {
+      if(!globalTxIds.contains(globalTxId)){
+        globalTxIds.add(globalTxId);
+      }
       sagaDataMap.put(globalTxId, sagaData);
     }
 
-    public SagaData getSagaData(String globalTxId){
+    public void stopSagaData(String globalTxId, SagaData sagaData) {
+      // TODO save SagaDate to database and clean sagaDataMap
+      this.putSagaData(globalTxId, sagaData);
+      lastGlobalTxId = globalTxId;
+    }
+
+    public SagaData getSagaData(String globalTxId) {
+      // TODO If globalTxId does not exist in sagaDataMap then
+      //  load from the database
       return sagaDataMap.get(globalTxId);
     }
 
-    public void clearSagaData(){
+    // Only test
+    public void clearSagaData() {
+      globalTxIds.clear();
       sagaDataMap.clear();
     }
 
-    public SagaData getLastSagaData(){
-      return sagaDataMap.lastEntry().getValue();
+    public SagaData getLastSagaData() {
+      return sagaDataMap.get(lastGlobalTxId);
+    }
+  }
+
+  static class CleanMemForTest implements Runnable {
+    final ConcurrentLinkedQueue<String> globalTxIds;
+    final ConcurrentHashMap<String, SagaData> sagaDataMap;
+
+    public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
+      this.globalTxIds = globalTxIds;
+      this.sagaDataMap = sagaDataMap;
+    }
+
+    @Override
+    public void run() {
+      while (true){
+        try{
+          if(!globalTxIds.isEmpty()){
+            int cache_size = globalTxIds.size()-5000;
+            while(cache_size>0){
+              sagaDataMap.remove(globalTxIds.poll());
+              cache_size--;
+            }
+          }
+        }catch (Exception e){
+          LOG.error(e.getMessage(),e);
+        }finally {
+          LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
+          try {
+            Thread.sleep(60000);
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(),e);
+          }
+        }
+      }
     }
   }
 }
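
The SagaDataExtension change pairs a queue of globalTxIds with a map of SagaData and periodically drops the oldest entries once more than 5000 are cached. The sketch below isolates that eviction idea. It is not the committed implementation: the class name is hypothetical, eviction is triggered by an explicit method call instead of a background thread, and first insertion is detected through the return value of map.put() so that the O(n) ConcurrentLinkedQueue.contains() call is not needed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper; a simplified stand-in for the queue-plus-map cache in the diff.
class BoundedSagaCacheSketch<V> {
  private final ConcurrentLinkedQueue<String> insertionOrder = new ConcurrentLinkedQueue<>();
  private final ConcurrentHashMap<String, V> values = new ConcurrentHashMap<>();
  private final int limit;

  BoundedSagaCacheSketch(int limit) {
    this.limit = limit;
  }

  void put(String id, V value) {
    // put() returns null only for a new key, so each ID is queued exactly once.
    if (values.put(id, value) == null) {
      insertionOrder.add(id);
    }
  }

  V get(String id) {
    return values.get(id);
  }

  int size() {
    return values.size();
  }

  // Removes the oldest entries until at most 'limit' remain; returns how many were removed.
  int evictOverflow() {
    int removed = 0;
    while (values.size() > limit) {
      String oldest = insertionOrder.poll();
      if (oldest == null) {
        break; // nothing left to evict
      }
      if (values.remove(oldest) != null) {
        removed++;
      }
    }
    return removed;
  }
}
```

A scheduler could call evictOverflow() once a minute to get behaviour close to the cleanup loop above. Under concurrent writes the size check is only approximate, which seems acceptable for a cache whose stated purpose is to keep stress tests from exhausting the heap.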


[servicecomb-pack] 09/12: SCB-1321 Remove EventBus between gRPC and Akka

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 89f652b030f39dec13d32af6d375565e16c3c5ee
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 23:45:10 2019 +0800

    SCB-1321 Remove EventBus between gRPC and Akka
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       | 12 ++------
 ...onsumer.java => SagaEventActorEventSender.java} | 17 +++++-------
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 32 +++++++++++-----------
 .../servicecomb/pack/alpha/server/AlphaConfig.java |  5 ++--
 .../alpha/server/fsm/GrpcSagaEventService.java     | 10 +++----
 5 files changed, 34 insertions(+), 42 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 5371159..922e40b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -20,11 +20,10 @@ package org.apache.servicecomb.pack.alpha.fsm;
 import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
 
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -50,14 +49,9 @@ public class FsmAutoConfiguration {
     return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
   }
 
-  @Bean(name = "sagaEventBus")
-  public EventBus sagaEventBus() {
-    return new EventBus();
-  }
-
   @Bean
-  public SagaEventConsumer sagaEventConsumer(){
-    return new SagaEventConsumer();
+  public SagaEventActorEventSender sagaEventConsumer(){
+    return new SagaEventActorEventSender();
   }
 
   @Bean
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
similarity index 85%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
index 758880b..84d7914 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
@@ -19,31 +19,28 @@ package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.Subscribe;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
-public class SagaEventConsumer {
+@Component
+public class SagaEventActorEventSender {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Autowired
   ActorSystem system;
 
-  private Map<String,ActorRef> sagaCache = new HashMap<>();
+  private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
 
-  /**
-   * Receive fsm message
-   * */
-  @Subscribe
-  public void receiveSagaEvent(BaseEvent event) {
+  public void send(BaseEvent event) {
     if(LOG.isDebugEnabled()){
-      LOG.debug("receive {} ", event.toString());
+      LOG.debug("send {} ", event.toString());
     }
     try{
       ActorRef saga;
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index c1a61a6..82ae48a 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorSystem;
 import com.google.common.eventbus.EventBus;
 import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.Test;
@@ -46,11 +47,10 @@ import org.springframework.test.context.junit4.SpringRunner;
 public class SagaIntegrationTest {
 
   @Autowired
-  @Qualifier("sagaEventBus")
-  EventBus sagaEventBus;
-
-  @Autowired
   ActorSystem system;
+  
+  @Autowired
+  SagaEventActorEventSender sagaEventActorEventSender;
 
   @Test
   public void successfulTest() {
@@ -59,7 +59,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +79,7 @@ public class SagaIntegrationTest {
     final String globalTxId = UUID.randomUUID().toString();
     final String localTxId_1 = UUID.randomUUID().toString();
     SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
 
     await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +99,7 @@ public class SagaIntegrationTest {
     final String localTxId_1 = UUID.randomUUID().toString();
     final String localTxId_2 = UUID.randomUUID().toString();
     SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +120,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +142,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +164,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +186,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +208,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +231,7 @@ public class SagaIntegrationTest {
     final String localTxId_3 = UUID.randomUUID().toString();
     final int timeout = 5; // second
     SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +253,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +275,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +297,7 @@ public class SagaIntegrationTest {
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index e6fa86a..17589e9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,6 +29,7 @@ import javax.annotation.PreDestroy;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -169,9 +170,9 @@ public class AlphaConfig {
   @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
   ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
-      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, @Qualifier("sagaEventBus") EventBus sagaEventBus) throws IOException {
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
     ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
-        new GrpcSagaEventService(sagaEventBus, omegaCallbacks), grpcTccEventService);
+        new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
     new Thread(bootstrap::start).start();
     tccPendingTaskRunner.start();
     tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 99b26bf..3cfb931 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -19,7 +19,6 @@ package org.apache.servicecomb.pack.alpha.server.fsm;
 
 import static java.util.Collections.emptyMap;
 
-import com.google.common.eventbus.EventBus;
 import io.grpc.stub.StreamObserver;
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
@@ -28,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import kamon.annotation.Trace;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-  private final EventBus sagaEventBus;
+  private final SagaEventActorEventSender sagaEventActorEventSender;
 
-  public GrpcSagaEventService(EventBus sagaEventBus,
+  public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    this.sagaEventBus = sagaEventBus;
+    this.sagaEventActorEventSender = sagaEventActorEventSender;
     this.omegaCallbacks = omegaCallbacks;
   }
 
@@ -142,7 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
     }
     if (event != null) {
       event.setCreateTime(new Date());
-      this.sagaEventBus.post(event);
+      sagaEventActorEventSender.send(event);
     }
     responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();
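
Note on the change above: GrpcSagaEventService no longer posts to a Guava EventBus and instead calls an injected SagaEventActorEventSender directly. A minimal, dependency-free sketch of that constructor-injection pattern follows. `EventSender`, `RecordingSender`, and `EventService` are hypothetical stand-ins introduced only for illustration, not classes from servicecomb-pack. The real sender's behaviour is not shown in this diff; judging by the commit title it hands events to Akka, whereas this sketch only records them.

```java
import java.util.ArrayList;
import java.util.List;

public class DirectSenderSketch {

  /** Hypothetical stand-in for the role SagaEventActorEventSender plays in the diff. */
  interface EventSender {
    void send(Object event);
  }

  /** Test double that records events instead of forwarding them anywhere. */
  static final class RecordingSender implements EventSender {
    final List<Object> received = new ArrayList<>();

    @Override
    public void send(Object event) {
      received.add(event);
    }
  }

  /** Hypothetical service: the sender arrives through the constructor, as in the diff. */
  static final class EventService {
    private final EventSender sender;

    EventService(EventSender sender) {
      this.sender = sender;
    }

    /** Mirrors the null check in the diff: only non-null events are sent. Returns true if sent. */
    boolean onEvent(Object event) {
      if (event == null) {
        return false;
      }
      sender.send(event);
      return true;
    }
  }

  public static void main(String[] args) {
    RecordingSender sender = new RecordingSender();
    EventService service = new EventService(sender);
    service.onEvent("SagaStartedEvent");
    service.onEvent(null); // ignored, nothing is sent
    System.out.println(sender.received); // prints [SagaStartedEvent]
  }
}
```

Depending on a narrow sender type rather than a shared bus lets a test substitute a recording implementation, which is roughly what the updated SagaIntegrationTest does when it calls `send(event)` on the sender directly.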


[servicecomb-pack] 06/12: SCB-1321 Optimize Benchmark JVM Parameters

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f2ac338c953a71602676f5e64c7dd2c6a9cf9707
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 19:00:25 2019 +0800

    SCB-1321 Optimize Benchmark JVM Parameters
---
 alpha/alpha-fsm/Benchmark.md | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm/Benchmark.md b/alpha/alpha-fsm/Benchmark.md
index 9f24686..e6df3ff 100644
--- a/alpha/alpha-fsm/Benchmark.md
+++ b/alpha/alpha-fsm/Benchmark.md
@@ -52,7 +52,17 @@
 ### Alpha configuration
 
 ```bash
-java -Xmx3g -Xms3g -Xss256k \
+java \
+  -Xmx8g -Xms8g -Xmn4g \
+  -Xss256k \
+  -XX:PermSize=128m -XX:MaxPermSize=512m \
+  -XX:+UseConcMarkSweepGC \
+  -XX:+UseParNewGC \
+  -XX:MaxTenuringThreshold=15 \
+  -XX:+ExplicitGCInvokesConcurrent \
+  -XX:+CMSParallelRemarkEnabled \
+  -XX:SurvivorRatio=8 \
+  -XX:+UseCompressedOops \
   -Dcom.sun.management.jmxremote \
   -Dcom.sun.management.jmxremote.port=9090 \
   -Dcom.sun.management.jmxremote.ssl=false \


[servicecomb-pack] 08/12: SCB-1321 Modify Scenario Name

Posted by zh...@apache.org.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit e19d72f08de4d2548e26d26cb0ade9f6a4fe9f4e
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 20:07:29 2019 +0800

    SCB-1321 Modify Scenario Name
---
 ...ion_after_post_car_compensated_scenario.feature |  2 +-
 ...n_after_post_hotel_compensated_scenario.feature |  2 +-
 ...pack_booking_timeout_suspended_scenario.feature |  2 +-
 ...ck_booking_timeout_suspended_scenario.feature 2 | 44 ----------------------
 .../pack_car_fail_compensated_scenario.feature     |  2 +-
 .../pack_hotel_fail_compensated_scenario.feature   |  2 +-
 6 files changed, 5 insertions(+), 49 deletions(-)

diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_car_compensated_scenario.feature b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_car_compensated_scenario.feature
index 1aae5ee..dd441dd 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_car_compensated_scenario.feature
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_car_compensated_scenario.feature
@@ -15,7 +15,7 @@
 
 Feature: Alpha records transaction events
 
-  Scenario: Booking Exception After Post Hotel Secenario
+  Scenario: Booking Exception After Car Hotel Secenario
     Given Car Service is up and running
     And Hotel Service is up and running
     And Booking Service is up and running
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_hotel_compensated_scenario.feature b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_hotel_compensated_scenario.feature
index aa7f155..15aa8eb 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_hotel_compensated_scenario.feature
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_exception_after_post_hotel_compensated_scenario.feature
@@ -15,7 +15,7 @@
 
 Feature: Alpha records transaction events
 
-  Scenario: Booking Exception After Post Car Secenario
+  Scenario: Booking Exception After Post Hotel Secenario
     Given Car Service is up and running
     And Hotel Service is up and running
     And Booking Service is up and running
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature
index 1cfce0e..8b4a707 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature
@@ -15,7 +15,7 @@
 
 Feature: Alpha records transaction events
 
-  Scenario: A transaction timeout and will be compensated
+  Scenario: Booking transaction timeout and will be suspended
     Given Car Service is up and running
     And Hotel Service is up and running
     And Booking Service is up and running
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature 2 b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature 2
deleted file mode 100644
index 1cfce0e..0000000
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_booking_timeout_suspended_scenario.feature 2	
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-Feature: Alpha records transaction events
-
-  Scenario: A transaction timeout and will be compensated
-    Given Car Service is up and running
-    And Hotel Service is up and running
-    And Booking Service is up and running
-    And Alpha is up and running
-
-    Given Install the byteman script booking_timeout.btm to Booking Service
-
-    When User Sean requests to book 1 cars and 1 rooms fail
-
-    Then Alpha records the following events
-      | serviceName  | type          |
-      | booking | SagaStartedEvent   |
-      | car     | TxStartedEvent     |
-      | car     | TxEndedEvent       |
-      | hotel   | TxStartedEvent     |
-      | hotel   | TxEndedEvent       |
-
-
-    Then Car Service contains the following booking orders
-      | id | name | amount | confirmed | cancelled |
-      | 1  | Sean | 1      | true     | false      |
-
-    Then Hotel Service contains the following booking orders
-      | id | name | amount | confirmed | cancelled |
-      | 1  | Sean | 1      | true     | false      |
-
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_car_fail_compensated_scenario.feature b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_car_fail_compensated_scenario.feature
index 01a073e..3abf2d0 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_car_fail_compensated_scenario.feature
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_car_fail_compensated_scenario.feature
@@ -15,7 +15,7 @@
 
 Feature: Alpha records transaction events
 
-  Scenario: A sub-transaction failed and global transaction compensated
+  Scenario: Car sub-transaction failed and global transaction compensated
     Given Car Service is up and running
     And Hotel Service is up and running
     And Booking Service is up and running
diff --git a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_hotel_fail_compensated_scenario.feature b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_hotel_fail_compensated_scenario.feature
index f8f0e64..0e11ae9 100644
--- a/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_hotel_fail_compensated_scenario.feature
+++ b/acceptance-tests/acceptance-pack-akka-spring-demo/src/test/resources/pack_hotel_fail_compensated_scenario.feature
@@ -15,7 +15,7 @@
 
 Feature: Alpha records transaction events
 
-  Scenario: A sub-transaction failed and global transaction compensated
+  Scenario: Hotel sub-transaction failed and global transaction compensated
     Given Car Service is up and running
     And Hotel Service is up and running
     And Booking Service is up and running