You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/05 14:47:14 UTC
[servicecomb-pack] 03/38: SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 9d5a712edad07f41d61cf6430222c44d1caa9dfe
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Thu Jul 18 14:36:34 2019 +0800
SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
---
.../pack/alpha/fsm/FsmAutoConfiguration.java | 85 ++++++++++++++++++----
.../servicecomb/pack/alpha/fsm/SagaActor.java | 2 +-
.../pack/alpha/fsm/event/base/BaseEvent.java | 4 +
.../pack/alpha/fsm/metrics/MetricsBean.java | 46 ++++++++++++
.../servicecomb/pack/alpha/fsm/model/TxEntity.java | 17 +++--
.../spring/integration/akka/SagaDataExtension.java | 43 ++++++++---
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 22 ++++++
.../pack/alpha/fsm/SagaIntegrationTest.java | 14 +++-
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 14 +++-
9 files changed, 213 insertions(+), 34 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 9101686..28e517b 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -24,13 +24,21 @@ import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;
+<<<<<<< HEAD
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
+=======
+import javax.annotation.PostConstruct;
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
import org.apache.servicecomb.pack.alpha.fsm.channel.redis.MessagePublisher;
import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessageSubscriber;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
@@ -50,6 +58,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.ConfigurableEnvironment;
+<<<<<<< HEAD
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
@@ -59,64 +68,92 @@ import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
+=======
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
@Configuration
@ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
public class FsmAutoConfiguration {
- @Value("${alpha.feature.akka.channel.memory.size:-1}")
+ @Value("${alpha.feature.akka.elasticsearch.memory.size:-1}")
int memoryEventChannelMemorySize;
+ @Value("${alpha.feature.akka.transaction.repository.elasticsearch.batchSize:100}")
+ int repositoryElasticsearchBatchSize;
+
+ @Value("${alpha.feature.akka.transaction.repository.elasticsearch.refreshTime:5000}")
+ int repositoryElasticsearchRefreshTime;
+
+ @Value("${alpha.feature.akka.transaction.repository.elasticsearch.memory.size:-1}")
+ int memoryTransactionRepositoryChannelSize;
+
+ @PostConstruct
+ void init() {
+ System.setProperty("es.set.netty.runtime.available.processors", "false");
+ }
+
@Bean
- public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
- ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
+ public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext,
+ ConfigurableEnvironment environment, MetricsService metricsService,
+ TransactionRepositoryChannel repositoryChannel) {
+ ActorSystem system = ActorSystem
+ .create("alpha-akka", akkaConfiguration(applicationContext, environment));
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
- SAGA_DATA_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
+ SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
+ SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
return system;
}
@Bean
- public Config akkaConfiguration(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
+ public Config akkaConfiguration(ConfigurableApplicationContext applicationContext,
+ ConfigurableEnvironment environment) {
final Map<String, Object> converted = AkkaConfigPropertyAdapter.getPropertyMap(environment);
- return ConfigFactory.parseMap(converted).withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
+ return ConfigFactory.parseMap(converted)
+ .withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
}
@Bean
- public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+ public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor() {
return new EventSubscribeBeanPostProcessor();
}
@Bean
- public MetricsService metricsService(){
+ public MetricsService metricsService() {
return new MetricsService();
}
@Bean
- public ActorEventSink actorEventSink(MetricsService metricsService){
+ public ActorEventSink actorEventSink(MetricsService metricsService) {
return new SagaActorEventSender(metricsService);
}
@Bean
@ConditionalOnMissingBean(ActorEventChannel.class)
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
- public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
- return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,metricsService);
+ public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink,
+ MetricsService metricsService) {
+ return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize,
+ metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
- public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink,
+ MetricsService metricsService) {
return new ActiveMQActorEventChannel(actorEventSink, metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
- public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink, MetricsService metricsService){
+ public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink,
+ MetricsService metricsService) {
return new KafkaActorEventChannel(actorEventSink, metricsService);
}
@Bean
@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+<<<<<<< HEAD
public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink, MetricsService metricsService, @Lazy RedisMessagePublisher redisMessagePublisher){
return new RedisActorEventChannel(actorEventSink, metricsService, redisMessagePublisher);
}
@@ -174,6 +211,28 @@ public class FsmAutoConfiguration {
}
return new ChannelTopic(topic);
}
+=======
+ public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink,
+ MetricsService metricsService) {
+ return new RedisActorEventChannel(actorEventSink, metricsService);
+>>>>>>> SCB-1369 Save transaction data to Elasticsearch after SagaActor stopped
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.transcation.repository.type", havingValue = "elasticsearch", matchIfMissing = true)
+ public TransactionRepository transcationRepository(MetricsService metricsService,
+ ElasticsearchTemplate template) {
+ return new ElasticsearchTransactionRepository(template, metricsService,
+ repositoryElasticsearchBatchSize, repositoryElasticsearchRefreshTime);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(TransactionRepositoryChannel.class)
+ @ConditionalOnProperty(value = "alpha.feature.akka.transcation.repository.channel.type", havingValue = "memory", matchIfMissing = true)
+ TransactionRepositoryChannel memoryTransactionRepositoryChannel(TransactionRepository repository,
+ MetricsService metricsService) {
+ return new MemoryTransactionRepositoryChannel(repository, memoryTransactionRepositoryChannelSize,
+ metricsService);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index f173a6f..b998dcf 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -406,7 +406,7 @@ public class SagaActor extends
} else if (event instanceof UpdateTxEventDomain) {
UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
- txEntity.setEndTime(System.currentTimeMillis());
+ txEntity.setEndTime(new Date());
if (domainEvent.getState() == TxState.COMMITTED) {
txEntity.setState(domainEvent.getState());
} else if (domainEvent.getState() == TxState.FAILED) {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
index f3ad9df..ef27594 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -33,6 +33,10 @@ public abstract class BaseEvent implements Serializable {
}
+ public String getType() {
+ return this.getClass().getSimpleName();
+ }
+
public String getServiceName() {
return serviceName;
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
index b1b8ad3..55e8af2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsBean.java
@@ -36,6 +36,10 @@ public class MetricsBean {
private AtomicLong committed = new AtomicLong();
private AtomicLong compensated = new AtomicLong();
private AtomicLong suspended = new AtomicLong();
+ private AtomicLong repositoryReceived = new AtomicLong();
+ private AtomicLong repositoryAccepted = new AtomicLong();
+ private AtomicLong repositoryRejected = new AtomicLong();
+ private AtomicDouble repositoryAvgTime = new AtomicDouble();//milliseconds moving average
public void doEventReceived() {
eventReceived.incrementAndGet();
@@ -107,6 +111,31 @@ public class MetricsBean {
suspended.incrementAndGet();
}
+ public void doRepositoryReceived() {
+ repositoryReceived.incrementAndGet();
+ }
+
+ public void doRepositoryAccepted() {
+ repositoryAccepted.incrementAndGet();
+ }
+
+ public void doRepositoryAccepted(int size) {
+ repositoryAccepted.addAndGet(size);
+ }
+
+ public void doRepositoryRejected() {
+ repositoryReceived.decrementAndGet();
+ repositoryRejected.incrementAndGet();
+ }
+
+ public void doRepositoryAvgTime(long time) {
+ if (repositoryAvgTime.get() == 0) {
+ repositoryAvgTime.set(time);
+ } else {
+ repositoryAvgTime.set((repositoryAvgTime.get() + time) / 2);
+ }
+ }
+
public long getEventReceived() {
return eventReceived.get();
}
@@ -151,6 +180,22 @@ public class MetricsBean {
return (double) Math.round(sagaAvgTime.get() * 100) / 100;
}
+ public long getRepositoryReceived() {
+ return repositoryReceived.get();
+ }
+
+ public long getRepositoryAccepted() {
+ return repositoryAccepted.get();
+ }
+
+ public AtomicLong getRepositoryRejected() {
+ return repositoryRejected;
+ }
+
+ public double getRepositoryAvgTime() {
+ return (double) Math.round(repositoryAvgTime.get() * 100) / 100;
+ }
+
public long getCommitted() {
return committed.get();
}
@@ -162,4 +207,5 @@ public class MetricsBean {
public long getSuspended() {
return suspended.get();
}
+
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
index ce0fb5b..9928283 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.pack.alpha.fsm.model;
import java.io.Serializable;
+import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.servicecomb.pack.alpha.fsm.TxState;
@@ -25,8 +26,8 @@ public class TxEntity implements Serializable {
private String serviceName;
private String instanceId;
private String globalTxId;
- private long beginTime = System.currentTimeMillis();
- private long endTime;
+ private Date beginTime = new Date();
+ private Date endTime;
private String parentTxId;
private String localTxId;
private TxState state;
@@ -52,19 +53,19 @@ public class TxEntity implements Serializable {
this.instanceId = instanceId;
}
- public long getBeginTime() {
+ public Date getBeginTime() {
return beginTime;
}
- public void setBeginTime(long beginTime) {
+ public void setBeginTime(Date beginTime) {
this.beginTime = beginTime;
}
- public long getEndTime() {
+ public Date getEndTime() {
return endTime;
}
- public void setEndTime(long endTime) {
+ public void setEndTime(Date endTime) {
this.endTime = endTime;
}
@@ -148,12 +149,12 @@ public class TxEntity implements Serializable {
txEntity = new TxEntity();
}
- public Builder beginTime(long beginTime) {
+ public Builder beginTime(Date beginTime) {
txEntity.setBeginTime(beginTime);
return this;
}
- public Builder endTime(long endTime) {
+ public Builder endTime(Date endTime) {
txEntity.setEndTime(endTime);
return this;
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index 7f8eaac..5354f21 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -21,14 +21,19 @@ import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.fsm.TransactionType;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.repository.model.GloablTransaction;
+import org.apache.servicecomb.pack.alpha.fsm.repository.model.SagaSubTransaction;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
@@ -44,12 +49,11 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
public static class SagaDataExt implements Extension {
- //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
private String lastGlobalTxId;
private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
- private volatile ApplicationContext applicationContext;
private MetricsService metricsService;
+ private TransactionRepositoryChannel repositoryChannel;
public SagaDataExt() {
// Just to avoid the overflow of the OldGen for stress testing
@@ -60,10 +64,7 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
}
public void putSagaData(String globalTxId, SagaData sagaData) {
- //if(!globalTxIds.contains(globalTxId)){
lastGlobalTxId = globalTxId;
- // globalTxIds.add(globalTxId);
- //}
sagaDataMap.put(globalTxId, sagaData);
}
@@ -78,6 +79,29 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
} else if (sagaData.getLastState() == SagaActorState.SUSPENDED) {
this.metricsService.metrics().doSuspended();
}
+ List<SagaSubTransaction> subTransactions = new ArrayList();
+ sagaData.getTxEntityMap().forEach((k,v)->{
+ subTransactions.add(SagaSubTransaction.builder()
+ .parentTxId(v.getParentTxId())
+ .localTxId(v.getLocalTxId())
+ .beginTime(v.getBeginTime())
+ .endTime(v.getEndTime())
+ .state(v.getState())
+ .build());
+ });
+ GloablTransaction record = GloablTransaction.builder()
+ .type(TransactionType.SAGA)
+ .serviceName(sagaData.getServiceName())
+ .instanceId(sagaData.getInstanceId())
+ .globalTxId(sagaData.getGlobalTxId())
+ .beginTime(sagaData.getBeginTime())
+ .endTime(sagaData.getEndTime())
+ .state(sagaData.getLastState())
+ .subTxSize(sagaData.getTxEntityMap().size())
+ .subTransactions(subTransactions)
+ .events(sagaData.getEvents())
+ .build();
+ repositoryChannel.send(record);
}
public SagaData getSagaData(String globalTxId) {
@@ -88,7 +112,6 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
// Only test
public void clearSagaData() {
- //globalTxIds.clear();
lastGlobalTxId = null;
sagaDataMap.clear();
}
@@ -114,9 +137,9 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
this.metricsService = metricsService;
}
- public void initialize(ApplicationContext applicationContext) {
- this.applicationContext = applicationContext;
- this.setMetricsService(this.applicationContext.getBean(MetricsService.class));
+ public void setRepositoryChannel(
+ TransactionRepositoryChannel repositoryChannel) {
+ this.repositoryChannel = repositoryChannel;
}
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 783142c..198ebf4 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -36,16 +36,30 @@ import java.util.UUID;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.channel.MemoryTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
+
import static org.hamcrest.CoreMatchers.*;
+@RunWith(MockitoJUnitRunner.class)
public class SagaActorTest {
static ActorSystem system;
+ @Mock
+ ElasticsearchTemplate template;
+
static MetricsService metricsService = new MetricsService();
private static Map<String,Object> getPersistenceMemConfig(){
@@ -82,6 +96,14 @@ public class SagaActorTest {
system = null;
}
+ @Before
+ public void before(){
+ TransactionRepository repository = new ElasticsearchTransactionRepository(template, metricsService, 0,0);
+ TransactionRepositoryChannel repositoryChannel = new MemoryTransactionRepositoryChannel(repository, -1,
+ metricsService);
+ SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
+ }
+
public String genPersistenceId() {
return UUID.randomUUID().toString();
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index d17de0e..c696031 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -31,19 +31,28 @@ import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExt
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {SagaApplication.class},
properties = {
+ //akka
"alpha.feature.akka.enabled=true",
"alpha.feature.akka.channel.type=memory",
"akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
"akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
"akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
- "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
+ "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots",
+ //elasticsearch
+ "alpha.feature.akka.transcation.repository.channel.type=memory",
+ "alpha.feature.akka.transcation.repository.type=elasticsearch",
+ "spring.data.elasticsearch.cluster-name=alpha-cluster",
+ "spring.data.elasticsearch.cluster-nodes=localhost:9300",
+ "spring.elasticsearch.rest.uris=http://localhost:9200"
})
public class SagaIntegrationTest {
@@ -56,6 +65,9 @@ public class SagaIntegrationTest {
@Autowired
MetricsService metricsService;
+ @Mock
+ ElasticsearchTemplate elasticsearchTemplate;
+
@BeforeClass
public static void setup(){
SagaDataExtension.autoCleanSagaDataMap=false;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d0902e5..1865de2 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -53,8 +53,20 @@ import org.springframework.test.context.junit4.SpringRunner;
"alpha.server.port=8090",
"alpha.event.pollingInterval=1",
"spring.main.allow-bean-definition-overriding=true",
+ "spring.profiles.active=akka-persistence-mem",
+ //akka
"alpha.feature.akka.enabled=true",
- "spring.profiles.active=akka-persistence-mem"
+ "alpha.feature.akka.channel.type=memory",
+ "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
+ "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
+ "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
+ "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots",
+ //elasticsearch
+ "alpha.feature.akka.transcation.repository.channel.type=memory",
+ "alpha.feature.akka.transcation.repository.type=elasticsearch",
+ "spring.data.elasticsearch.cluster-name=alpha-cluster",
+ "spring.data.elasticsearch.cluster-nodes=localhost:9300",
+ "spring.elasticsearch.rest.uris=http://localhost:9200"
})
public class AlphaIntegrationFsmTest {
private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();