You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2022/04/08 14:34:08 UTC
[servicecomb-pack] 02/03: Rename module alpha-spec-saga-fsm to alpha-spec-saga-akka
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit e59635833ce36f7e8a13b269a8e1bba4f47afd2f
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Sun Apr 3 01:17:52 2022 +0800
Rename module alpha-spec-saga-fsm to alpha-spec-saga-akka
---
alpha/alpha-benchmark/pom.xml | 2 +-
.../jpa/EclipseLinkJpaConfiguration.java | 54 ++++++++
alpha/alpha-server/pom.xml | 2 +-
.../src/main/resources/application.yaml | 3 +
...a => AlphaIntegrationWithSpecSagaAkkaTest.java} | 7 +-
.../README.md | 0
.../pom.xml | 4 +-
.../akka/AlphaSpecSagaAkkaAutoConfiguration.java} | 81 +++++------
.../saga/akka}/CompensateAckCountDownLatch.java | 2 +-
.../alpha/spec/saga/akka}/GrpcOmegaCallback.java | 2 +-
.../spec/saga/akka}/GrpcSagaEventService.java | 2 +-
.../pack/alpha/spec/saga/akka}/SagaActor.java | 20 +--
.../pack/alpha/spec/saga/akka}/SagaActorState.java | 2 +-
.../spec/saga/akka}/SagaShardRegionActor.java | 2 +-
.../saga/akka/api/SagaAkkaAPIv1Controller.java} | 4 +-
.../spec/saga/akka/api/SagaAkkaAPIv1Impl.java} | 6 +-
.../akka}/channel/AbstractActorEventChannel.java | 5 +-
.../saga/akka}/channel/AbstractEventConsumer.java | 4 +-
.../akka/channel/kafka/KafkaActorEventChannel.java | 84 +++++++++++
.../akka}/channel/kafka/KafkaMessagePublisher.java | 17 ++-
.../channel/kafka/KafkaSagaEventConsumer.java | 10 +-
.../channel/memory/MemoryActorEventChannel.java | 6 +-
.../channel/memory/MemorySagaEventConsumer.java | 6 +-
.../channel/rabbit/RabbitActorEventChannel.java | 6 +-
.../rabbit/RabbitChannelAutoConfiguration.java | 8 +-
.../akka}/channel/rabbit/RabbitMessageChannel.java | 2 +-
.../channel/rabbit/RabbitMessagePublisher.java | 2 +-
.../channel/rabbit/RabbitSagaEventConsumer.java | 8 +-
.../akka}/channel/redis/MessageSerializer.java | 2 +-
.../channel/redis/RedisActorEventChannel.java | 6 +-
.../redis/RedisChannelAutoConfiguration.java | 6 +-
.../akka}/channel/redis/RedisMessagePublisher.java | 2 +-
.../channel/redis/RedisSagaEventConsumer.java | 6 +-
.../spec/saga/akka}/domain/AddTxEventDomain.java | 2 +-
.../alpha/spec/saga/akka}/domain/DomainEvent.java | 2 +-
.../spec/saga/akka}/domain/SagaEndedDomain.java | 4 +-
.../spec/saga/akka}/domain/SagaStartedDomain.java | 2 +-
.../saga/akka}/domain/UpdateTxEventDomain.java | 2 +-
.../akka}/metrics/AlphaMetricsEndpointImpl.java | 2 +-
.../spec/saga/akka}/metrics/MetricsService.java | 2 +-
.../pack/alpha/spec/saga/akka}/model/SagaData.java | 4 +-
.../alpha/spec/saga/akka}/model/TxEntities.java | 2 +-
.../pack/alpha/spec/saga/akka}/model/TxEntity.java | 2 +-
.../akka/properties/ChannelKafkaProperties.java | 76 ++++++++++
.../akka}/properties/ChannelMemoryProperties.java | 2 +-
.../saga/akka}/properties/ChannelProperties.java | 12 +-
.../akka}/properties/ElasticsearchProperties.java | 2 +-
.../akka}/properties/RepositoryProperties.java | 2 +-
.../akka}/properties/SpecSagaAkkaProperties.java | 2 +-
.../AbstractTransactionRepositoryChannel.java | 4 +-
.../repository/NoneTransactionRepository.java | 2 +-
.../akka}/repository/TransactionRepository.java | 2 +-
.../repository/TransactionRepositoryChannel.java | 2 +-
.../DefaultTransactionRepositoryChannel.java | 11 +-
.../ElasticsearchTransactionRepository.java | 8 +-
.../elasticsearch/GlobalTransactionDocument.java | 2 +-
.../integration/akka/AkkaClusterListener.java | 2 +-
.../akka/AkkaConfigPropertyAdapter.java | 2 +-
.../spring/integration/akka/SagaDataExtension.java | 12 +-
.../integration/akka/SpringAkkaExtension.java | 6 +-
.../saga/akka}/test/FsmSagaDataController.java | 6 +-
.../src/main/resources/META-INF/spring.factories | 2 +-
.../alpha/spec/saga/akka/it/SagaApplication.java | 0
.../spec/saga/akka/it/SagaIntegrationTest.java | 10 +-
.../alpha/spec/saga/akka/ut/SagaActorTest.java | 22 +--
.../spec/saga/akka/ut/api/APIv1ControllerTest.java | 10 +-
.../spec/saga/akka/ut/api/TestConfiguration.java | 18 +--
.../spec/saga/akka/ut/model/TxEntitiesTest.java | 4 +-
.../ElasticsearchTransactionRepositoryTest.java | 10 +-
.../alpha/spec/saga/akka/util/SagaEventSender.java | 0
.../src/test/resources/application.yaml | 0
.../src/test/resources/log4j2.xml | 0
.../fsm/channel/kafka/KafkaActorEventChannel.java | 37 -----
.../kafka/KafkaChannelAutoConfiguration.java | 153 ---------------------
.../fsm/properties/ChannelKafkaProperties.java | 21 ---
alpha/pom.xml | 2 +-
.../pack/omega/spring/OmegaSagaSpringConfig.java | 87 ++++++++++++
pack-dependencies/pom.xml | 2 +-
78 files changed, 524 insertions(+), 406 deletions(-)
diff --git a/alpha/alpha-benchmark/pom.xml b/alpha/alpha-benchmark/pom.xml
index 7162cef9..877745d8 100644
--- a/alpha/alpha-benchmark/pom.xml
+++ b/alpha/alpha-benchmark/pom.xml
@@ -36,7 +36,7 @@
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
- <artifactId>alpha-spec-saga-fsm</artifactId>
+ <artifactId>alpha-spec-saga-akka</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
diff --git a/alpha/alpha-persistence-jpa/src/main/java/org/apache/servicecomb/pack/persistence/jpa/EclipseLinkJpaConfiguration.java b/alpha/alpha-persistence-jpa/src/main/java/org/apache/servicecomb/pack/persistence/jpa/EclipseLinkJpaConfiguration.java
new file mode 100644
index 00000000..572667b4
--- /dev/null
+++ b/alpha/alpha-persistence-jpa/src/main/java/org/apache/servicecomb/pack/persistence/jpa/EclipseLinkJpaConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.persistence.jpa;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.orm.jpa.JpaBaseConfiguration;
+import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.orm.jpa.vendor.AbstractJpaVendorAdapter;
+import org.springframework.orm.jpa.vendor.EclipseLinkJpaVendorAdapter;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+@Configuration
+public class EclipseLinkJpaConfiguration extends JpaBaseConfiguration {
+
+ EclipseLinkJpaConfiguration(DataSource dataSource,
+ JpaProperties properties,
+ ObjectProvider<JtaTransactionManager> jtaTransactionManagerProvider) {
+ super(dataSource, properties, jtaTransactionManagerProvider);
+ }
+
+ @Override
+ protected AbstractJpaVendorAdapter createJpaVendorAdapter() {
+ return new EclipseLinkJpaVendorAdapter();
+ }
+
+ @Override
+ protected Map<String, Object> getVendorProperties() {
+ Map<String, Object> props = new HashMap<>();
+ props.put("eclipselink.weaving", "false");
+ props.put("eclipselink.logging.logger", "JavaLogger");
+ return props;
+ }
+}
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 663a56a2..b194a698 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -48,7 +48,7 @@
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
- <artifactId>alpha-spec-saga-fsm</artifactId>
+ <artifactId>alpha-spec-saga-akka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 059a5964..59ed23f8 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -33,9 +33,12 @@ alpha:
kafka:
topic: servicecomb-pack-actor-event
bootstrap-servers: 127.0.0.1:9092
+ numPartitions: 6
+ replicationFactor: 1
consumer:
group-id: servicecomb-pack
auto.offset.reset: earliest
+ enable.auto.commit: false
producer:
batch-size: 16384
retries: 0
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationWithSpecSagaAkkaTest.java
similarity index 99%
rename from alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationFsmTest.java
rename to alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationWithSpecSagaAkkaTest.java
index 484bf7a6..e56714aa 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaIntegrationWithSpecSagaAkkaTest.java
@@ -31,9 +31,8 @@ import java.util.Map;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
import org.apache.servicecomb.pack.common.AlphaMetaKeys;
@@ -71,7 +70,7 @@ import org.springframework.test.context.junit4.SpringRunner;
"akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
"akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
})
-public class AlphaIntegrationFsmTest {
+public class AlphaIntegrationWithSpecSagaAkkaTest {
private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();
private static final int port = 8090;
diff --git a/alpha/alpha-spec-saga-fsm/README.md b/alpha/alpha-spec-saga-akka/README.md
similarity index 100%
rename from alpha/alpha-spec-saga-fsm/README.md
rename to alpha/alpha-spec-saga-akka/README.md
diff --git a/alpha/alpha-spec-saga-fsm/pom.xml b/alpha/alpha-spec-saga-akka/pom.xml
similarity index 98%
rename from alpha/alpha-spec-saga-fsm/pom.xml
rename to alpha/alpha-spec-saga-akka/pom.xml
index 97da3121..a7617b92 100644
--- a/alpha/alpha-spec-saga-fsm/pom.xml
+++ b/alpha/alpha-spec-saga-akka/pom.xml
@@ -26,8 +26,8 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>alpha-spec-saga-fsm</artifactId>
- <name>Pack::Alpha::Specification::Saga::FSM</name>
+ <artifactId>alpha-spec-saga-akka</artifactId>
+ <name>Pack::Alpha::Specification::Saga::Akka</name>
<dependencies>
<!-- spring boot -->
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/AlphaSpecSagaFsmAutoConfiguration.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaSpecSagaAkkaAutoConfiguration.java
similarity index 66%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/AlphaSpecSagaFsmAutoConfiguration.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaSpecSagaAkkaAutoConfiguration.java
index d4ca777b..d02f7d83 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/AlphaSpecSagaFsmAutoConfiguration.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/AlphaSpecSagaAkkaAutoConfiguration.java
@@ -15,10 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
-
-import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
-import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -32,21 +29,25 @@ import org.apache.http.HttpHost;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetricsEndpoint;
-import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Controller;
-import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Impl;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemorySagaEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.AlphaMetricsEndpointImpl;
-import org.apache.servicecomb.pack.alpha.fsm.properties.SpecSagaAkkaProperties;
-import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
-import org.apache.servicecomb.pack.alpha.fsm.test.FsmSagaDataController;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.api.SagaAkkaAPIv1Controller;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.api.SagaAkkaAPIv1Impl;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka.KafkaActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka.KafkaSagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.memory.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.memory.MemorySagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.AlphaMetricsEndpointImpl;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.SpecSagaAkkaProperties;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.NoneTransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.channel.DefaultTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.AkkaConfigPropertyAdapter;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SpringAkkaExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.test.FsmSagaDataController;
import org.apache.servicecomb.pack.common.AlphaMetaKeys;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.elasticsearch.client.RestClient;
@@ -57,7 +58,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -68,11 +68,11 @@ import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
@Configuration
@ImportAutoConfiguration({SpecSagaAkkaProperties.class})
@ConditionalOnExpression("'${alpha.spec.names}'.contains('saga-akka')")
-public class AlphaSpecSagaFsmAutoConfiguration {
+public class AlphaSpecSagaAkkaAutoConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public AlphaSpecSagaFsmAutoConfiguration() {
+ public AlphaSpecSagaAkkaAutoConfiguration() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
LOG.info("Alpha Specification Saga Akka");
}
@@ -89,9 +89,10 @@ public class AlphaSpecSagaFsmAutoConfiguration {
ActorSystem system = ActorSystem
.create("alpha-cluster", akkaConfiguration(applicationContext, environment));
- SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
- SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
- SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
+ SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system)
+ .setRepositoryChannel(repositoryChannel);
+ SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
return system;
}
@@ -141,12 +142,10 @@ public class AlphaSpecSagaFsmAutoConfiguration {
}
@Bean
- @ConditionalOnMissingBean(ActorEventChannel.class)
public ActorEventChannel eventChannel(SpecSagaAkkaProperties specSagaAkkaProperties,
MetricsService metricsService) {
- if (specSagaAkkaProperties.getChannel().getName().equals("memory")) {
- return new MemoryActorEventChannel(metricsService,
- specSagaAkkaProperties.getChannel().getMemory().getMaxLength());
+ if (specSagaAkkaProperties.getChannel().getName().equals("kafka")) {
+ return new KafkaActorEventChannel(specSagaAkkaProperties, metricsService);
} else {
return new MemoryActorEventChannel(metricsService,
specSagaAkkaProperties.getChannel().getMemory().getMaxLength());
@@ -159,9 +158,12 @@ public class AlphaSpecSagaFsmAutoConfiguration {
@Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
MetricsService metricsService,
ActorEventChannel actorEventChannel) {
- if (specSagaAkkaProperties.getChannel().getName().equals("memory")) {
- return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
- (MemoryActorEventChannel) actorEventChannel);
+ if (specSagaAkkaProperties.getChannel().getName().equals("kafka")) {
+ return new KafkaSagaEventConsumer(actorSystem,
+ sagaShardRegionActor, metricsService,
+ specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers(),
+ specSagaAkkaProperties.getChannel().getKafka().getTopic(),
+ specSagaAkkaProperties.getChannel().getKafka().getConsumer());
} else {
return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
(MemoryActorEventChannel) actorEventChannel);
@@ -169,30 +171,31 @@ public class AlphaSpecSagaFsmAutoConfiguration {
}
@Bean
- GrpcSagaEventService grpcSagaEventService(ActorEventChannel actorEventChannel, Map<String, Map<String, OmegaCallback>> omegaCallbacks){
+ GrpcSagaEventService grpcSagaEventService(ActorEventChannel actorEventChannel,
+ Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
ServerMeta serverMeta = ServerMeta.newBuilder()
.putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(true)).build();
return new GrpcSagaEventService(actorEventChannel, omegaCallbacks, serverMeta);
}
@Bean
- APIv1Impl apIv1(){
- return new APIv1Impl();
+ SagaAkkaAPIv1Impl apIv1() {
+ return new SagaAkkaAPIv1Impl();
}
@Bean
- APIv1Controller apIv1Controller(){
- return new APIv1Controller();
+ SagaAkkaAPIv1Controller apIv1Controller() {
+ return new SagaAkkaAPIv1Controller();
}
@Bean
- AlphaMetricsEndpoint alphaMetricsEndpoint(){
+ AlphaMetricsEndpoint alphaMetricsEndpoint() {
return new AlphaMetricsEndpointImpl();
}
@Bean
@Profile("test")
- FsmSagaDataController fsmSagaDataController(){
+ FsmSagaDataController fsmSagaDataController() {
return new FsmSagaDataController();
}
}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/CompensateAckCountDownLatch.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/CompensateAckCountDownLatch.java
similarity index 95%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/CompensateAckCountDownLatch.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/CompensateAckCountDownLatch.java
index cdaf29eb..87da93c9 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/CompensateAckCountDownLatch.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/CompensateAckCountDownLatch.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import java.util.concurrent.CountDownLatch;
import org.apache.servicecomb.pack.alpha.core.fsm.CompensateAckType;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcOmegaCallback.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcOmegaCallback.java
similarity index 98%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcOmegaCallback.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcOmegaCallback.java
index 9f27f95c..d21d333a 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcOmegaCallback.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcOmegaCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcSagaEventService.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcSagaEventService.java
similarity index 99%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcSagaEventService.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcSagaEventService.java
index 82552d04..21ddf0a2 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcSagaEventService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import static java.util.Collections.emptyMap;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java
index 90336830..124d2050 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import akka.actor.PoisonPill;
import akka.actor.Props;
@@ -44,15 +44,15 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
-import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
-import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
-import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
-import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
-import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.AddTxEventDomain;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.DomainEvent;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.SagaEndedDomain;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.SagaStartedDomain;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.UpdateTxEventDomain;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SpringAkkaExtension;
import org.apache.servicecomb.pack.common.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActorState.java
similarity index 94%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActorState.java
index 7c7c6adc..70827610 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActorState.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import akka.persistence.fsm.PersistentFSM;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaShardRegionActor.java
similarity index 98%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaShardRegionActor.java
index 6298cbed..03886da5 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaShardRegionActor.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaShardRegionActor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Controller.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Controller.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Controller.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Controller.java
index a6dd52e0..30a8dfdc 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Controller.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Controller.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.api;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.api;
import java.util.List;
import java.util.Map;
@@ -31,7 +31,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@RequestMapping("/alpha/api/v1")
-public class APIv1Controller {
+public class SagaAkkaAPIv1Controller {
@Autowired
APIv1 apIv1;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Impl.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Impl.java
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Impl.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Impl.java
index 8a030867..2124289b 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/api/APIv1Impl.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/api/SagaAkkaAPIv1Impl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.api;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.api;
import java.util.List;
import java.util.Map;
@@ -24,10 +24,10 @@ import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransac
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.PagingGlobalTransactions;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetrics;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetricsEndpoint;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
import org.springframework.beans.factory.annotation.Autowired;
-public class APIv1Impl implements APIv1 {
+public class SagaAkkaAPIv1Impl implements APIv1 {
@Autowired
AlphaMetricsEndpoint alphaMetricsEndpoint;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractActorEventChannel.java
similarity index 90%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractActorEventChannel.java
index 61bbde5a..be1df180 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractActorEventChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractActorEventChannel.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractEventConsumer.java
similarity index 78%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractEventConsumer.java
index 6869add3..2e13b500 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/AbstractEventConsumer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/AbstractEventConsumer.java
@@ -1,8 +1,8 @@
-package org.apache.servicecomb.pack.alpha.fsm.channel;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
public abstract class AbstractEventConsumer {
diff --git a/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.java
new file mode 100644
index 00000000..3916a0b5
--- /dev/null
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.SpecSagaAkkaProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaActorEventChannel extends AbstractActorEventChannel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final KafkaMessagePublisher kafkaMessagePublisher;
+
+ public KafkaActorEventChannel(SpecSagaAkkaProperties specSagaAkkaProperties, MetricsService metricsService) {
+ super(metricsService);
+ // init topic
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers());
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
+ try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+ try {
+ final NewTopic newTopic = new NewTopic(
+ specSagaAkkaProperties.getChannel().getKafka().getTopic(),
+ specSagaAkkaProperties.getChannel().getKafka().getNumPartitions(),
+ specSagaAkkaProperties.getChannel().getKafka().getReplicationFactor());
+ final CreateTopicsResult createTopicsResult = adminClient
+ .createTopics(Collections.singleton(newTopic));
+ createTopicsResult.values().get(specSagaAkkaProperties.getChannel().getKafka().getTopic())
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ if (!(e.getCause() instanceof TopicExistsException)) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+
+ // create producer
+ this.kafkaMessagePublisher = new KafkaMessagePublisher(
+ specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers(),
+ specSagaAkkaProperties.getChannel().getKafka().getTopic(),
+ specSagaAkkaProperties.getChannel().getKafka().getProducer());
+ LOG.info("Kafka Channel Init");
+ }
+
+ @Override
+ public void sendTo(BaseEvent event) {
+ kafkaMessagePublisher.publish(event);
+ }
+}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaMessagePublisher.java
similarity index 67%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaMessagePublisher.java
index f156bc2c..a5c88d70 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaMessagePublisher.java
@@ -15,14 +15,20 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka;
+import com.google.common.collect.Maps;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.serializer.JsonSerializer;
public class KafkaMessagePublisher implements MessagePublisher<BaseEvent> {
@@ -31,9 +37,14 @@ public class KafkaMessagePublisher implements MessagePublisher<BaseEvent> {
private String topic;
private KafkaTemplate<String, Object> kafkaTemplate;
- public KafkaMessagePublisher(String topic, KafkaTemplate<String, Object> kafkaTemplate) {
+ public KafkaMessagePublisher(String bootstrap_servers, String topic, Map<String,String> producerConfigMap) {
this.topic = topic;
- this.kafkaTemplate = kafkaTemplate;
+ Map<String, Object> map = Maps.newHashMap();
+ producerConfigMap.forEach((k, v) -> map.put(k, v));
+ map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers);
+ map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ this.kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(map));
}
@Override
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaSagaEventConsumer.java
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaSagaEventConsumer.java
index 8ee2d401..c39d98bb 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaSagaEventConsumer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaSagaEventConsumer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -31,14 +31,15 @@ import akka.util.Timeout;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import java.lang.invoke.MethodHandles;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -52,7 +53,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
final ObjectMapper jsonMapper = new ObjectMapper();
public KafkaSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
- MetricsService metricsService, String bootstrap_servers, String topic) {
+ MetricsService metricsService, String bootstrap_servers, String topic, Map<String,String> consumerConfigMap) {
super(actorSystem, sagaShardRegionActor, metricsService);
@@ -68,6 +69,7 @@ public class KafkaSagaEventConsumer extends AbstractEventConsumer {
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class")
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class");
+ consumerConfigMap.forEach((k,v) -> consumerSettings.withProperty(k,v));
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(20, event -> {
BaseEvent bean = jsonMapper.readValue(event.record().value(), BaseEvent.class);
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemoryActorEventChannel.java
similarity index 87%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemoryActorEventChannel.java
index dfdd7441..519465c1 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryActorEventChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemoryActorEventChannel.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.memory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemorySagaEventConsumer.java
similarity index 91%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemorySagaEventConsumer.java
index f2af56bb..99f202bc 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/memory/MemorySagaEventConsumer.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.memory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitActorEventChannel.java
similarity index 84%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitActorEventChannel.java
index a6623efb..8e10d60a 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitActorEventChannel.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.rabbit;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
public class RabbitActorEventChannel extends AbstractActorEventChannel {
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitChannelAutoConfiguration.java
similarity index 89%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitChannelAutoConfiguration.java
index 1e304155..e760f6fb 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitChannelAutoConfiguration.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.rabbit;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.util.Map;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -36,7 +36,7 @@ import org.springframework.context.annotation.Lazy;
@EnableBinding({RabbitMessageChannel.class})
@Configuration
@EnableConfigurationProperties(BindingServiceProperties.class)
-@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "rabbit")
+@ConditionalOnProperty(value = "alpha.spec.saga.akka.channel.name", havingValue = "rabbit")
public class RabbitChannelAutoConfiguration {
@@ -61,7 +61,7 @@ public class RabbitChannelAutoConfiguration {
@ConditionalOnMissingBean(ActorEventChannel.class)
public ActorEventChannel rabbitEventChannel(MetricsService metricsService,
@Lazy RabbitMessagePublisher rabbitMessagePublisher) {
- return new org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitActorEventChannel(metricsService, rabbitMessagePublisher);
+ return new RabbitActorEventChannel(metricsService, rabbitMessagePublisher);
}
}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessageChannel.java
similarity index 94%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessageChannel.java
index 3b040608..dbf50127 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessageChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.rabbit;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessagePublisher.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessagePublisher.java
index 8a2c82a2..3ca10a26 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitMessagePublisher.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.rabbit;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitSagaEventConsumer.java
similarity index 89%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitSagaEventConsumer.java
index 9d72b948..b2abc4c1 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/rabbit/RabbitSagaEventConsumer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.rabbit;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -26,8 +26,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
@@ -43,7 +43,7 @@ public class RabbitSagaEventConsumer extends AbstractEventConsumer {
}
- @StreamListener(org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitMessageChannel.SERVICE_COMB_PACK_CONSUMER)
+ @StreamListener(RabbitMessageChannel.SERVICE_COMB_PACK_CONSUMER)
public void receive(BaseEvent baseEvent) {
sendSagaActor(baseEvent);
}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/MessageSerializer.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/MessageSerializer.java
index 5665eeed..641542af 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/MessageSerializer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/MessageSerializer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisActorEventChannel.java
similarity index 86%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisActorEventChannel.java
index e023bb06..4a053a07 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisActorEventChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisActorEventChannel.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisChannelAutoConfiguration.java
similarity index 95%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisChannelAutoConfiguration.java
index 7d159677..6c9ab547 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisChannelAutoConfiguration.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisChannelAutoConfiguration.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -23,7 +23,7 @@ import javax.annotation.PostConstruct;
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -45,7 +45,7 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@ConditionalOnClass(RedisConnection.class)
-@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+@ConditionalOnProperty(value = "alpha.spec.saga.akka.channel.name", havingValue = "redis")
public class RedisChannelAutoConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisMessagePublisher.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisMessagePublisher.java
index eca2af44..434aa4a7 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisMessagePublisher.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisMessagePublisher.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;
import java.lang.invoke.MethodHandles;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisSagaEventConsumer.java
similarity index 91%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisSagaEventConsumer.java
index b9cb31e9..340f0f58 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/redis/RedisSagaEventConsumer.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisSagaEventConsumer.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.channel.redis;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/AddTxEventDomain.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/AddTxEventDomain.java
index f7ea6387..5f69bcd4 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/AddTxEventDomain.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.domain;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.domain;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/DomainEvent.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/DomainEvent.java
index b6bbaebc..0229b859 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/DomainEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.domain;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.domain;
import java.io.Serializable;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaEndedDomain.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaEndedDomain.java
index 9cbe70be..b194609a 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaEndedDomain.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.domain;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.domain;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
public class SagaEndedDomain implements DomainEvent {
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaStartedDomain.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaStartedDomain.java
index ef599a00..a3c4df10 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/SagaStartedDomain.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.domain;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.domain;
import java.util.Calendar;
import java.util.Date;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/UpdateTxEventDomain.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/UpdateTxEventDomain.java
index a60a438d..c726988e 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/domain/UpdateTxEventDomain.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.domain;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.domain;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/AlphaMetricsEndpointImpl.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/AlphaMetricsEndpointImpl.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/AlphaMetricsEndpointImpl.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/AlphaMetricsEndpointImpl.java
index 18969804..91375e32 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/AlphaMetricsEndpointImpl.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/AlphaMetricsEndpointImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.metrics;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics;
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
import org.apache.servicecomb.pack.alpha.core.NodeStatus.TypeEnum;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/MetricsService.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/MetricsService.java
index aa085fc6..b2f26456 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/metrics/MetricsService.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/metrics/MetricsService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.metrics;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics;
import org.apache.servicecomb.pack.alpha.core.metrics.MetricsBean;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/SagaData.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/SagaData.java
index 3295d670..69c3bfdd 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/SagaData.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.model;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.model;
import java.io.Serializable;
import java.util.Date;
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
public class SagaData implements Serializable {
private String serviceName;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntities.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntities.java
index 8ba72fd9..04ce5615 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntities.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntities.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.model;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.model;
import java.util.ArrayList;
import java.util.LinkedHashMap;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntity.java
similarity index 98%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntity.java
index 8c2ea04a..79e833cc 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/model/TxEntity.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.model;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.model;
import java.io.Serializable;
import java.util.Date;
diff --git a/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelKafkaProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelKafkaProperties.java
new file mode 100644
index 00000000..d426ee19
--- /dev/null
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelKafkaProperties.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
+
+import java.util.Map;
+
+public class ChannelKafkaProperties {
+ private String topic;
+ private String bootstrapServers;
+ private int numPartitions;
+ private short replicationFactor;
+ private Map<String,String> consumer;
+ private Map<String,String> producer;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public short getReplicationFactor() {
+ return replicationFactor;
+ }
+
+ public void setReplicationFactor(short replicationFactor) {
+ this.replicationFactor = replicationFactor;
+ }
+
+ public Map<String, String> getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(Map<String, String> consumer) {
+ this.consumer = consumer;
+ }
+
+ public Map<String, String> getProducer() {
+ return producer;
+ }
+
+ public void setProducer(Map<String, String> producer) {
+ this.producer = producer;
+ }
+}
\ No newline at end of file
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelMemoryProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelMemoryProperties.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelMemoryProperties.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelMemoryProperties.java
index 157e3488..a013a8ed 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelMemoryProperties.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelMemoryProperties.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.properties;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
public class ChannelMemoryProperties {
private int maxLength;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelProperties.java
similarity index 81%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelProperties.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelProperties.java
index 3afd6535..36e1a47b 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelProperties.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ChannelProperties.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.properties;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
public class ChannelProperties {
private String name;
private ChannelMemoryProperties memory;
+ private ChannelKafkaProperties kafka;
public String getName() {
return name;
@@ -36,4 +37,13 @@ public class ChannelProperties {
public void setMemory(ChannelMemoryProperties memory) {
this.memory = memory;
}
+
+ public ChannelKafkaProperties getKafka() {
+ return kafka;
+ }
+
+ public void setKafka(
+ ChannelKafkaProperties kafka) {
+ this.kafka = kafka;
+ }
}
\ No newline at end of file
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ElasticsearchProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ElasticsearchProperties.java
similarity index 95%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ElasticsearchProperties.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ElasticsearchProperties.java
index 317356f6..34996e1b 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ElasticsearchProperties.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/ElasticsearchProperties.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.properties;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
public class ElasticsearchProperties {
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/RepositoryProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/RepositoryProperties.java
similarity index 94%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/RepositoryProperties.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/RepositoryProperties.java
index 7b235a87..9283d9d5 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/RepositoryProperties.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/RepositoryProperties.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.properties;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
public class RepositoryProperties {
private String name;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/SpecSagaAkkaProperties.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/SpecSagaAkkaProperties.java
similarity index 95%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/SpecSagaAkkaProperties.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/SpecSagaAkkaProperties.java
index dc327e42..bee30ec0 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/SpecSagaAkkaProperties.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/properties/SpecSagaAkkaProperties.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.properties;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/AbstractTransactionRepositoryChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/AbstractTransactionRepositoryChannel.java
similarity index 90%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/AbstractTransactionRepositoryChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/AbstractTransactionRepositoryChannel.java
index 2b5eea34..d03a2952 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/AbstractTransactionRepositoryChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/AbstractTransactionRepositoryChannel.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
public abstract class AbstractTransactionRepositoryChannel implements TransactionRepositoryChannel {
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/NoneTransactionRepository.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/NoneTransactionRepository.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/NoneTransactionRepository.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/NoneTransactionRepository.java
index 900256e9..5da7d5aa 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/NoneTransactionRepository.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/NoneTransactionRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository;
import java.lang.invoke.MethodHandles;
import java.util.List;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepository.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepository.java
similarity index 95%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepository.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepository.java
index 825f1fc2..1704d481 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepository.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository;
import java.util.List;
import java.util.Map;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepositoryChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepositoryChannel.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepositoryChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepositoryChannel.java
index b5719115..e3d370a7 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/TransactionRepositoryChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/TransactionRepositoryChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/channel/DefaultTransactionRepositoryChannel.java
similarity index 72%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/channel/DefaultTransactionRepositoryChannel.java
index 25e31d83..596c5d48 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/channel/DefaultTransactionRepositoryChannel.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/channel/DefaultTransactionRepositoryChannel.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository.channel;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.channel;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.repository.AbstractTransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.AbstractTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
public class DefaultTransactionRepositoryChannel extends AbstractTransactionRepositoryChannel {
- public DefaultTransactionRepositoryChannel(TransactionRepository repository, MetricsService metricsService) {
+ public DefaultTransactionRepositoryChannel(
+ TransactionRepository repository, MetricsService metricsService) {
super(repository, metricsService);
}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/ElasticsearchTransactionRepository.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/ElasticsearchTransactionRepository.java
similarity index 96%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/ElasticsearchTransactionRepository.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/ElasticsearchTransactionRepository.java
index 1c9e2421..aebe703e 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/ElasticsearchTransactionRepository.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/ElasticsearchTransactionRepository.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,9 +28,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.PagingGlobalTransactions;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.properties.ElasticsearchProperties;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.ElasticsearchProperties;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/GlobalTransactionDocument.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/GlobalTransactionDocument.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/GlobalTransactionDocument.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/GlobalTransactionDocument.java
index dd23ef7c..efe8a0cd 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/repository/elasticsearch/GlobalTransactionDocument.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/repository/elasticsearch/GlobalTransactionDocument.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.elasticsearch;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
import org.springframework.data.elasticsearch.annotations.Document;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaClusterListener.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaClusterListener.java
index 418cd2e3..7bfff881 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaClusterListener.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaClusterListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka;
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaConfigPropertyAdapter.java
similarity index 98%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaConfigPropertyAdapter.java
index 31c12a57..909d2042 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/AkkaConfigPropertyAdapter.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/AkkaConfigPropertyAdapter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SagaDataExtension.java
similarity index 89%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SagaDataExtension.java
index b91c27ad..c7676dd5 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SagaDataExtension.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka;
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
@@ -27,11 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.pack.alpha.core.fsm.TransactionType;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.SagaSubTransaction;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension.SagaDataExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SpringAkkaExtension.java
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SpringAkkaExtension.java
index 6892a20f..019f5c5b 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SpringAkkaExtension.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/spring/integration/akka/SpringAkkaExtension.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka;
import static org.apache.servicecomb.pack.common.EventType.TxCompensateEvent;
@@ -29,8 +29,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.TxEvent;
-import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SpringExt;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SpringAkkaExtension.SpringExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/test/FsmSagaDataController.java b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/test/FsmSagaDataController.java
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/test/FsmSagaDataController.java
rename to alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/test/FsmSagaDataController.java
index a430355c..a0e98f75 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/test/FsmSagaDataController.java
+++ b/alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/test/FsmSagaDataController.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.test;
+package org.apache.servicecomb.pack.alpha.spec.saga.akka.test;
import akka.actor.ActorSystem;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -27,8 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.servicecomb.pack.alpha.core.TxEvent;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/alpha/alpha-spec-saga-fsm/src/main/resources/META-INF/spring.factories b/alpha/alpha-spec-saga-akka/src/main/resources/META-INF/spring.factories
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/main/resources/META-INF/spring.factories
rename to alpha/alpha-spec-saga-akka/src/main/resources/META-INF/spring.factories
index e43b6111..6763e655 100644
--- a/alpha/alpha-spec-saga-fsm/src/main/resources/META-INF/spring.factories
+++ b/alpha/alpha-spec-saga-akka/src/main/resources/META-INF/spring.factories
@@ -15,4 +15,4 @@
## limitations under the License.
## ---------------------------------------------------------------------------
org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
- org.apache.servicecomb.pack.alpha.fsm.AlphaSpecSagaFsmAutoConfiguration
+ org.apache.servicecomb.pack.alpha.spec.saga.akka.AlphaSpecSagaAkkaAutoConfiguration
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaApplication.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaApplication.java
similarity index 100%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaApplication.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaApplication.java
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java
index 3d53fc00..11479dfe 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/it/SagaIntegrationTest.java
@@ -27,12 +27,12 @@ import static org.mockito.Mockito.when;
import akka.actor.ActorSystem;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.util.SagaEventSender;
-import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.memory.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java
index 76745f92..55edace1 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/SagaActorTest.java
@@ -17,7 +17,7 @@
package org.apache.servicecomb.pack.alpha.spec.saga.akka.ut;
-import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
+import static org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -38,17 +38,17 @@ import java.util.Map;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActor;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.util.SagaEventSender;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
-import org.apache.servicecomb.pack.alpha.fsm.properties.ElasticsearchProperties;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel;
-import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.ElasticsearchProperties;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.channel.DefaultTransactionRepositoryChannel;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java
similarity index 97%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java
index 887f4877..ff589cfe 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/APIv1ControllerTest.java
@@ -47,10 +47,10 @@ import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.PagingGlobalT
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.SagaSubTransaction;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetricsEndpoint;
import org.apache.servicecomb.pack.alpha.core.metrics.MetricsBean;
-import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Controller;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.api.SagaAkkaAPIv1Controller;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,7 +64,7 @@ import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {TestConfiguration.class})
-@WebMvcTest(APIv1Controller.class)
+@WebMvcTest(SagaAkkaAPIv1Controller.class)
public class APIv1ControllerTest {
@Autowired
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java
similarity index 73%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java
index 1d5dc6b1..1f2b75d3 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/api/TestConfiguration.java
@@ -19,11 +19,11 @@ package org.apache.servicecomb.pack.alpha.spec.saga.akka.ut.api;
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetricsEndpoint;
-import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Controller;
-import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Impl;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.AlphaMetricsEndpointImpl;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.api.SagaAkkaAPIv1Controller;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.api.SagaAkkaAPIv1Impl;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.AlphaMetricsEndpointImpl;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -49,12 +49,12 @@ public class TestConfiguration {
}
@Bean
- APIv1Controller apIv1Controller(){
- return new APIv1Controller();
+ SagaAkkaAPIv1Controller apIv1Controller(){
+ return new SagaAkkaAPIv1Controller();
}
@Bean
- APIv1Impl apIv1(){
- return new APIv1Impl();
+ SagaAkkaAPIv1Impl apIv1(){
+ return new SagaAkkaAPIv1Impl();
}
}
\ No newline at end of file
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java
similarity index 93%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java
index c647dd6d..bcd80595 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/model/TxEntitiesTest.java
@@ -26,8 +26,8 @@ import java.util.Date;
import java.util.List;
import java.util.ListIterator;
import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.model.TxEntities;
-import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.TxEntities;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.TxEntity;
import org.junit.Test;
public class TxEntitiesTest {
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java
similarity index 92%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java
index 4a99444c..0161b17d 100644
--- a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java
+++ b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/ut/repository/ElasticsearchTransactionRepositoryTest.java
@@ -28,11 +28,11 @@ import java.util.Date;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.core.fsm.TransactionType;
import org.apache.servicecomb.pack.alpha.core.fsm.repository.model.GlobalTransaction;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.apache.servicecomb.pack.alpha.fsm.properties.ElasticsearchProperties;
-import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
-import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.SagaActorState;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.ElasticsearchProperties;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.TransactionRepository;
+import org.apache.servicecomb.pack.alpha.spec.saga.akka.repository.elasticsearch.ElasticsearchTransactionRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/util/SagaEventSender.java b/alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/util/SagaEventSender.java
similarity index 100%
rename from alpha/alpha-spec-saga-fsm/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/util/SagaEventSender.java
rename to alpha/alpha-spec-saga-akka/src/test/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/util/SagaEventSender.java
diff --git a/alpha/alpha-spec-saga-fsm/src/test/resources/application.yaml b/alpha/alpha-spec-saga-akka/src/test/resources/application.yaml
similarity index 100%
rename from alpha/alpha-spec-saga-fsm/src/test/resources/application.yaml
rename to alpha/alpha-spec-saga-akka/src/test/resources/application.yaml
diff --git a/alpha/alpha-spec-saga-fsm/src/test/resources/log4j2.xml b/alpha/alpha-spec-saga-akka/src/test/resources/log4j2.xml
similarity index 100%
rename from alpha/alpha-spec-saga-fsm/src/test/resources/log4j2.xml
rename to alpha/alpha-spec-saga-akka/src/test/resources/log4j2.xml
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java b/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java
deleted file mode 100644
index 6e3cfe7d..00000000
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaActorEventChannel.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
-
-import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-
-public class KafkaActorEventChannel extends AbstractActorEventChannel {
-
- private KafkaMessagePublisher kafkaMessagePublisher;
-
- public KafkaActorEventChannel(MetricsService metricsService, KafkaMessagePublisher kafkaMessagePublisher) {
- super(metricsService);
- this.kafkaMessagePublisher = kafkaMessagePublisher;
- }
-
- @Override
- public void sendTo(BaseEvent event){
- kafkaMessagePublisher.publish(event);
- }
-}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
deleted file mode 100644
index a2a32b4a..00000000
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import com.google.common.collect.Maps;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.PostConstruct;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
-import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-@Configuration
-@ConditionalOnClass(KafkaProperties.class)
-@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
-public class KafkaChannelAutoConfiguration {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
- private String topic;
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrap_servers;
-
- @Value("${spring.kafka.consumer.group-id:servicecomb-pack}")
- private String groupId;
-
- @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal")
- private String trusted_packages;
-
- @Value("${spring.kafka.producer.batch-size:16384}")
- private int batchSize;
-
- @Value("${spring.kafka.producer.retries:0}")
- private int retries;
-
- @Value("${spring.kafka.producer.buffer.memory:33554432}")
- private long bufferMemory;
-
- @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")
- private String autoOffsetReset;
-
- @Value("${spring.kafka.consumer.enable.auto.commit:false}")
- private boolean enableAutoCommit;
-
- @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
- private int autoCommitIntervalMs;
-
- @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}")
- private String ackMode;
-
- @Value("${spring.kafka.listener.pollTimeout:1500}")
- private long poolTimeout;
-
- @Value("${kafka.numPartitions:6}")
- private int numPartitions;
-
- @Value("${kafka.replicationFactor:1}")
- private short replicationFactor;
-
- @PostConstruct
- public void init() {
- Map props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
- try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
- try {
- final NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
- final CreateTopicsResult createTopicsResult = adminClient
- .createTopics(Collections.singleton(newTopic));
- createTopicsResult.values().get(topic).get();
- } catch (InterruptedException | ExecutionException e) {
- if (e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- if (!(e.getCause() instanceof TopicExistsException)) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
- }
- LOG.info("Kafka Channel Init");
- }
-
- @Bean
- @ConditionalOnMissingBean
- public KafkaMessagePublisher kafkaMessagePublisher() {
- Map<String, Object> map = Maps.newHashMap();
- map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
- map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
- map.put(ProducerConfig.RETRIES_CONFIG, retries);
- map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
- map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
- return new KafkaMessagePublisher(topic,
- new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(map)));
- }
-
- @Bean
- @ConditionalOnMissingBean(ActorEventChannel.class)
- public ActorEventChannel kafkaEventChannel(MetricsService metricsService,
- @Lazy KafkaMessagePublisher kafkaMessagePublisher) {
- return new KafkaActorEventChannel(metricsService, kafkaMessagePublisher);
- }
-
- @Bean
- KafkaSagaEventConsumer sagaEventKafkaConsumer(ActorSystem actorSystem,
- @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
- MetricsService metricsService) {
- return new KafkaSagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
- bootstrap_servers, topic);
- }
-}
diff --git a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelKafkaProperties.java b/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelKafkaProperties.java
deleted file mode 100644
index 2f76334b..00000000
--- a/alpha/alpha-spec-saga-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/properties/ChannelKafkaProperties.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.pack.alpha.fsm.properties;
-
-public class ChannelKafkaProperties {
-
-}
\ No newline at end of file
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 7f13aeff..1f37b85d 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -34,7 +34,7 @@
<modules>
<module>alpha-core</module>
<module>alpha-persistence-jpa</module>
- <module>alpha-spec-saga-fsm</module>
+ <module>alpha-spec-saga-akka</module>
<module>alpha-spec-saga-db</module>
<module>alpha-spec-tcc-db</module>
<module>alpha-benchmark</module>
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSagaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSagaSpringConfig.java
new file mode 100644
index 00000000..7d0b0ac0
--- /dev/null
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/pack/omega/spring/OmegaSagaSpringConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.spring;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.servicecomb.pack.common.AlphaMetaKeys;
+import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
+import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterDiscovery;
+import org.apache.servicecomb.pack.omega.connector.grpc.AlphaClusterConfig;
+import org.apache.servicecomb.pack.omega.connector.grpc.core.FastestSender;
+import org.apache.servicecomb.pack.omega.connector.grpc.core.LoadBalanceContext;
+import org.apache.servicecomb.pack.omega.connector.grpc.core.LoadBalanceContextBuilder;
+import org.apache.servicecomb.pack.omega.connector.grpc.core.TransactionType;
+import org.apache.servicecomb.pack.omega.connector.grpc.saga.SagaLoadBalanceSender;
+import org.apache.servicecomb.pack.omega.transaction.CallbackContext;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.ServiceConfig;
+import org.apache.servicecomb.pack.omega.transaction.SagaMessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.lang.invoke.MethodHandles;
+
+@Configuration
+@ConditionalOnExpression("'${omega.spec.names}'.contains('saga')")
+class OmegaSagaSpringConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public OmegaSagaSpringConfig() {
+ LOG.info("Omega Specification Saga");
+ }
+
+ @Bean(name = {"compensationContext"})
+ CallbackContext compensationContext(OmegaContext omegaContext, SagaMessageSender sender) {
+ return new CallbackContext(omegaContext, sender);
+ }
+
+ @Bean(name = "sagaLoadContext")
+ LoadBalanceContext sagaLoadBalanceSenderContext(
+ AlphaClusterConfig alphaClusterConfig,
+ ServiceConfig serviceConfig,
+ @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
+ @Value("${omega.connection.sending.timeout:8}") int timeoutSeconds) {
+ LoadBalanceContext loadBalanceSenderContext = new LoadBalanceContextBuilder(
+ TransactionType.SAGA,
+ alphaClusterConfig,
+ serviceConfig,
+ reconnectDelay,
+ timeoutSeconds).build();
+ return loadBalanceSenderContext;
+ }
+
+ @Bean
+ SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
+ final SagaMessageSender sagaMessageSender = new SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
+ sagaMessageSender.onConnected();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ sagaMessageSender.onDisconnected();
+ sagaMessageSender.close();
+ }
+ }));
+ return sagaMessageSender;
+ }
+}
diff --git a/pack-dependencies/pom.xml b/pack-dependencies/pom.xml
index 85848772..0222fec2 100644
--- a/pack-dependencies/pom.xml
+++ b/pack-dependencies/pom.xml
@@ -279,7 +279,7 @@
</dependency>
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
- <artifactId>alpha-spec-saga-fsm</artifactId>
+ <artifactId>alpha-spec-saga-akka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>