You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/05 14:47:14 UTC

[servicecomb-pack] 03/38: SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 9d5a712edad07f41d61cf6430222c44d1caa9dfe
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Thu Jul 18 14:36:34 2019 +0800

    SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
---
 .../pack/alpha/fsm/FsmAutoConfiguration.java       | 85 ++++++++++++++++++----
 .../servicecomb/pack/alpha/fsm/SagaActor.java      |  2 +-
 .../pack/alpha/fsm/event/base/BaseEvent.java       |  4 +
 .../pack/alpha/fsm/metrics/MetricsBean.java        | 46 ++++++++++++
 .../servicecomb/pack/alpha/fsm/model/TxEntity.java | 17 +++--
 .../spring/integration/akka/SagaDataExtension.java | 43 ++++++++---
 .../servicecomb/pack/alpha/fsm/SagaActorTest.java  | 22 ++++++
 .../pack/alpha/fsm/SagaIntegrationTest.java        | 14 +++-
 .../alpha/server/fsm/AlphaIntegrationFsmTest.java  | 14 +++-
 9 files changed, 213 insertions(+), 34 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 9101686..28e517b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -24,13 +24,21 @@ import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
+<<<<<<< HEAD
 
 import org.apache.servicecomb.pack.alpha.core.NodeStatus;
+=======
+import javax.annotation.PostConstruct;
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
 import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.channel.redis.MessagePublisher;
 import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
 import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessageSubscriber;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
 import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
 import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
@@ -50,6 +58,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.core.env.ConfigurableEnvironment;
+<<<<<<< HEAD
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -59,64 +68,92 @@ import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 import org.springframework.data.redis.serializer.GenericToStringSerializer;
 import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
+=======
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
 
 @Configuration
 @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
 public class FsmAutoConfiguration {
 
-  @Value("${alpha.feature.akka.channel.memory.size:-1}")
+  @Value("${alpha.feature.akka.elasticsearch.memory.size:-1}")
   int memoryEventChannelMemorySize;
 
+  @Value("${alpha.feature.akka.transaction.repository.elasticsearch.batchSize:100}")
+  int repositoryElasticsearchBatchSize;
+
+  @Value("${alpha.feature.akka.transaction.repository.elasticsearch.refreshTime:5000}")
+  int repositoryElasticsearchRefreshTime;
+
+  @Value("${alpha.feature.akka.transaction.repository.elasticsearch.memory.size:-1}")
+  int memoryTransactionRepositoryChannelSize;
+
+  @PostConstruct
+  void init() {
+    System.setProperty("es.set.netty.runtime.available.processors", "false");
+  }
+
   @Bean
-  public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
-    ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
+  public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext,
+      ConfigurableEnvironment environment, MetricsService metricsService,
+      TransactionRepositoryChannel repositoryChannel) {
+    ActorSystem system = ActorSystem
+        .create("alpha-akka", akkaConfiguration(applicationContext, environment));
     SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
-    SAGA_DATA_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
+    SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
+    SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
     return system;
   }
 
   @Bean
-  public Config akkaConfiguration(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
+  public Config akkaConfiguration(ConfigurableApplicationContext applicationContext,
+      ConfigurableEnvironment environment) {
     final Map<String, Object> converted = AkkaConfigPropertyAdapter.getPropertyMap(environment);
-    return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
+    return ConfigFactory.parseMap(converted)
+        .withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
   }
 
   @Bean
-  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor() {
     return new EventSubscribeBeanPostProcessor();
   }
 
   @Bean
-  public MetricsService metricsService(){
+  public MetricsService metricsService() {
     return new MetricsService();
   }
 
   @Bean
-  public ActorEventSink actorEventSink(MetricsService metricsService){
+  public ActorEventSink actorEventSink(MetricsService metricsService) {
     return new SagaActorEventSender(metricsService);
   }
 
   @Bean
   @ConditionalOnMissingBean(ActorEventChannel.class)
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
-  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
-    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,metricsService);
+  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink,
+      MetricsService metricsService) {
+    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,
+        metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
-  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink,
+      MetricsService metricsService) {
     return new ActiveMQActorEventChannel(actorEventSink, metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
-  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink,
+      MetricsService metricsService) {
     return new KafkaActorEventChannel(actorEventSink, metricsService);
   }
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+<<<<<<< HEAD
   public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService, @Lazy RedisMessagePublisher redisMessagePublisher){
     return new RedisActorEventChannel(actorEventSink, metricsService, redisMessagePublisher);
   }
@@ -174,6 +211,28 @@ public class FsmAutoConfiguration {
       }
       return new ChannelTopic(topic);
     }
+=======
+  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink,
+      MetricsService metricsService) {
+    return new RedisActorEventChannel(actorEventSink, metricsService);
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.transcation.repository.type", havingValue = "elasticsearch", matchIfMissing = true)
+  public TransactionRepository transcationRepository(MetricsService metricsService,
+      ElasticsearchTemplate template) {
+    return new ElasticsearchTransactionRepository(template, metricsService,
+        repositoryElasticsearchBatchSize, repositoryElasticsearchRefreshTime);
+  }
+
+  @Bean
+  @ConditionalOnMissingBean(TransactionRepositoryChannel.class)
+  @ConditionalOnProperty(value = "alpha.feature.akka.transcation.repository.channel.type", havingValue = "memory", matchIfMissing = true)
+  TransactionRepositoryChannel memoryTransactionRepositoryChannel(TransactionRepository repository,
+      MetricsService metricsService) {
+    return new MemoryTransactionRepositoryChannel(repository, memoryTransactionRepositoryChannelSize,
+        metricsService);
   }
 
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index f173a6f..b998dcf 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -406,7 +406,7 @@ public class SagaActor extends
     } else if (event instanceof UpdateTxEventDomain) {
       UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
       TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
-      txEntity.setEndTime(System.currentTimeMillis());
+      txEntity.setEndTime(new Date());
       if (domainEvent.getState() == TxState.COMMITTED) {
         txEntity.setState(domainEvent.getState());
       } else if (domainEvent.getState() == TxState.FAILED) {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
index f3ad9df..ef27594 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -33,6 +33,10 @@ public abstract class BaseEvent implements Serializable {
 
   }
 
+  public String getType() {
+    return this.getClass().getSimpleName();
+  }
+
   public String getServiceName() {
     return serviceName;
   }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
index b1b8ad3..55e8af2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
@@ -36,6 +36,10 @@ public class MetricsBean {
   private AtomicLong committed = new AtomicLong();
   private AtomicLong compensated = new AtomicLong();
   private AtomicLong suspended = new AtomicLong();
+  private AtomicLong repositoryReceived = new AtomicLong();
+  private AtomicLong repositoryAccepted = new AtomicLong();
+  private AtomicLong repositoryRejected = new AtomicLong();
+  private AtomicDouble repositoryAvgTime = new AtomicDouble();//milliseconds moving average
 
   public void doEventReceived() {
     eventReceived.incrementAndGet();
@@ -107,6 +111,31 @@ public class MetricsBean {
     suspended.incrementAndGet();
   }
 
+  public void doRepositoryReceived() {
+    repositoryReceived.incrementAndGet();
+  }
+
+  public void doRepositoryAccepted() {
+    repositoryAccepted.incrementAndGet();
+  }
+
+  public void doRepositoryAccepted(int size) {
+    repositoryAccepted.addAndGet(size);
+  }
+
+  public void doRepositoryRejected() {
+    repositoryReceived.decrementAndGet();
+    repositoryRejected.incrementAndGet();
+  }
+
+  public void doRepositoryAvgTime(long time) {
+    if (repositoryAvgTime.get() == 0) {
+      repositoryAvgTime.set(time);
+    } else {
+      repositoryAvgTime.set((repositoryAvgTime.get() + time) / 2);
+    }
+  }
+
   public long getEventReceived() {
     return eventReceived.get();
   }
@@ -151,6 +180,22 @@ public class MetricsBean {
     return (double) Math.round(sagaAvgTime.get() * 100) / 100;
   }
 
+  public long getRepositoryReceived() {
+    return repositoryReceived.get();
+  }
+
+  public long getRepositoryAccepted() {
+    return repositoryAccepted.get();
+  }
+
+  public AtomicLong getRepositoryRejected() {
+    return repositoryRejected;
+  }
+
+  public double getRepositoryAvgTime() {
+    return (double) Math.round(repositoryAvgTime.get() * 100) / 100;
+  }
+
   public long getCommitted() {
     return committed.get();
   }
@@ -162,4 +207,5 @@ public class MetricsBean {
   public long getSuspended() {
     return suspended.get();
   }
+
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index ce0fb5b..9928283 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.pack.alpha.fsm.model;
 
 import java.io.Serializable;
+import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.servicecomb.pack.alpha.fsm.TxState;
 
@@ -25,8 +26,8 @@ public class TxEntity implements Serializable {
   private String serviceName;
   private String instanceId;
   private String globalTxId;
-  private long beginTime = System.currentTimeMillis();
-  private long endTime;
+  private Date beginTime = new Date();
+  private Date endTime;
   private String parentTxId;
   private String localTxId;
   private TxState state;
@@ -52,19 +53,19 @@ public class TxEntity implements Serializable {
     this.instanceId = instanceId;
   }
 
-  public long getBeginTime() {
+  public Date getBeginTime() {
     return beginTime;
   }
 
-  public void setBeginTime(long beginTime) {
+  public void setBeginTime(Date beginTime) {
     this.beginTime = beginTime;
   }
 
-  public long getEndTime() {
+  public Date getEndTime() {
     return endTime;
   }
 
-  public void setEndTime(long endTime) {
+  public void setEndTime(Date endTime) {
     this.endTime = endTime;
   }
 
@@ -148,12 +149,12 @@ public class TxEntity implements Serializable {
       txEntity = new TxEntity();
     }
 
-    public Builder beginTime(long beginTime) {
+    public Builder beginTime(Date beginTime) {
       txEntity.setBeginTime(beginTime);
       return this;
     }
 
-    public Builder endTime(long endTime) {
+    public Builder endTime(Date endTime) {
       txEntity.setEndTime(endTime);
       return this;
     }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 7f8eaac..5354f21 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -21,14 +21,19 @@ import akka.actor.AbstractExtensionId;
 import akka.actor.ExtendedActorSystem;
 import akka.actor.Extension;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.TransactionType;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.repository.model.GloablTransaction;
+import org.apache.servicecomb.pack.alpha.fsm.repository.model.SagaSubTransaction;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
 
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
@@ -44,12 +49,11 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
   public static class SagaDataExt implements Extension {
 
-    //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
     private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
     private String lastGlobalTxId;
     private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
-    private volatile ApplicationContext applicationContext;
     private MetricsService metricsService;
+    private TransactionRepositoryChannel repositoryChannel;
 
     public SagaDataExt() {
       // Just to avoid the overflow of the OldGen for stress testing
@@ -60,10 +64,7 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
     }
 
     public void putSagaData(String globalTxId, SagaData sagaData) {
-      //if(!globalTxIds.contains(globalTxId)){
       lastGlobalTxId = globalTxId;
-      //  globalTxIds.add(globalTxId);
-      //}
       sagaDataMap.put(globalTxId, sagaData);
     }
 
@@ -78,6 +79,29 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
       } else if (sagaData.getLastState() == SagaActorState.SUSPENDED) {
         this.metricsService.metrics().doSuspended();
       }
+      List<SagaSubTransaction> subTransactions = new ArrayList();
+      sagaData.getTxEntityMap().forEach((k,v)->{
+        subTransactions.add(SagaSubTransaction.builder()
+            .parentTxId(v.getParentTxId())
+            .localTxId(v.getLocalTxId())
+            .beginTime(v.getBeginTime())
+            .endTime(v.getEndTime())
+            .state(v.getState())
+            .build());
+      });
+      GloablTransaction record = GloablTransaction.builder()
+          .type(TransactionType.SAGA)
+          .serviceName(sagaData.getServiceName())
+          .instanceId(sagaData.getInstanceId())
+          .globalTxId(sagaData.getGlobalTxId())
+          .beginTime(sagaData.getBeginTime())
+          .endTime(sagaData.getEndTime())
+          .state(sagaData.getLastState())
+          .subTxSize(sagaData.getTxEntityMap().size())
+          .subTransactions(subTransactions)
+          .events(sagaData.getEvents())
+          .build();
+      repositoryChannel.send(record);
     }
 
     public SagaData getSagaData(String globalTxId) {
@@ -88,7 +112,6 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
 
     // Only test
     public void clearSagaData() {
-      //globalTxIds.clear();
       lastGlobalTxId = null;
       sagaDataMap.clear();
     }
@@ -114,9 +137,9 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
       this.metricsService = metricsService;
     }
 
-    public void initialize(ApplicationContext applicationContext) {
-      this.applicationContext = applicationContext;
-      this.setMetricsService(this.applicationContext.getBean(MetricsService.class));
+    public void setRepositoryChannel(
+        TransactionRepositoryChannel repositoryChannel) {
+      this.repositoryChannel = repositoryChannel;
     }
   }
 
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 783142c..198ebf4 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -36,16 +36,30 @@ import java.util.UUID;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
+
 import static org.hamcrest.CoreMatchers.*;
 
+@RunWith(MockitoJUnitRunner.class)
 public class SagaActorTest {
 
   static ActorSystem system;
 
+  @Mock
+  ElasticsearchTemplate template;
+
   static MetricsService metricsService = new MetricsService();
 
   private static Map<String,Object> getPersistenceMemConfig(){
@@ -82,6 +96,14 @@ public class SagaActorTest {
     system = null;
   }
 
+  @Before
+  public void before(){
+    TransactionRepository repository = new ElasticsearchTransactionRepository(template, metricsService, 0,0);
+    TransactionRepositoryChannel repositoryChannel = new MemoryTransactionRepositoryChannel(repository, -1,
+        metricsService);
+    SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
+  }
+
   public String genPersistenceId() {
     return UUID.randomUUID().toString();
   }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index d17de0e..c696031 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -31,19 +31,28 @@ import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExt
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
+        //akka
         "alpha.feature.akka.enabled=true",
         "alpha.feature.akka.channel.type=memory",
         "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
         "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
         "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
-        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
+        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots",
+        //elasticsearch
+        "alpha.feature.akka.transcation.repository.channel.type=memory",
+        "alpha.feature.akka.transcation.repository.type=elasticsearch",
+        "spring.data.elasticsearch.cluster-name=alpha-cluster",
+        "spring.data.elasticsearch.cluster-nodes=localhost:9300",
+        "spring.elasticsearch.rest.uris=http://localhost:9200"
     })
 public class SagaIntegrationTest {
 
@@ -56,6 +65,9 @@ public class SagaIntegrationTest {
   @Autowired
   MetricsService metricsService;
 
+  @Mock
+  ElasticsearchTemplate elasticsearchTemplate;
+
   @BeforeClass
   public static void setup(){
     SagaDataExtension.autoCleanSagaDataMap=false;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d0902e5..1865de2 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -53,8 +53,20 @@ import org.springframework.test.context.junit4.SpringRunner;
         "alpha.server.port=8090",
         "alpha.event.pollingInterval=1",
         "spring.main.allow-bean-definition-overriding=true",
+        "spring.profiles.active=akka-persistence-mem",
+        //akka
         "alpha.feature.akka.enabled=true",
-        "spring.profiles.active=akka-persistence-mem"
+        "alpha.feature.akka.channel.type=memory",
+        "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
+        "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
+        "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
+        "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots",
+        //elasticsearch
+        "alpha.feature.akka.transcation.repository.channel.type=memory",
+        "alpha.feature.akka.transcation.repository.type=elasticsearch",
+        "spring.data.elasticsearch.cluster-name=alpha-cluster",
+        "spring.data.elasticsearch.cluster-nodes=localhost:9300",
+        "spring.elasticsearch.rest.uris=http://localhost:9200"
        })
 public class AlphaIntegrationFsmTest {
   private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();