You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/27 05:49:46 UTC
[servicecomb-pack] branch SCB-1321 updated: SCB-1321 Add Saga Event Message Bus
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
The following commit(s) were added to refs/heads/SCB-1321 by this push:
new 281e34c SCB-1321 Add Saga Event Message Bus
281e34c is described below
commit 281e34c10fcdfca66e6b659c95b2296d8a2573ce
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Thu Jun 27 13:49:02 2019 +0800
SCB-1321 Add Saga Event Message Bus
---
alpha/alpha-fsm/pom.xml | 39 ++-
.../pack/alpha/fsm/FsmAutoConfiguration.java | 60 ++++
.../servicecomb/pack/alpha/fsm/SagaActor.java | 17 +-
.../fsm/event/consumer/SagaEventConsumer.java | 81 ++++++
.../servicecomb/pack/alpha/fsm/model/SagaData.java | 10 +
.../fsm/spring/integration/akka/LogExtension.java | 31 +++
.../spring/integration/akka/LogExtensionImpl.java | 35 +++
.../eventbus/EventSubscribeBeanPostProcessor.java | 62 +++++
.../src/main/resources/META-INF/spring.factories | 17 ++
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 165 +++--------
.../pack/alpha/fsm/SagaApplication.java | 28 ++
.../pack/alpha/fsm/SagaEventSender.java | 297 ++++++++++++++++++++
.../pack/alpha/fsm/SagaIntegrationTest.java | 308 +++++++++++++++++++++
13 files changed, 1014 insertions(+), 136 deletions(-)
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index faa4e05..b1897d9 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -36,6 +36,13 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring.boot.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.12</artifactId>
<version>${akka.version}</version>
@@ -44,6 +51,12 @@
</dependencyManagement>
<dependencies>
+ <!-- spring boot -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-autoconfigure</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
<artifactId>pack-common</artifactId>
@@ -52,21 +65,19 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
-
- <!-- log -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>test</scope>
- </dependency>
+<!-- <dependency>-->
+<!-- <groupId>org.apache.logging.log4j</groupId>-->
+<!-- <artifactId>log4j-slf4j-impl</artifactId>-->
+<!-- <scope>test</scope>-->
+<!-- </dependency>-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
@@ -95,6 +106,16 @@
<!-- For testing the artifacts scope are test-->
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
new file mode 100644
index 0000000..252d6ae
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.actor.ActorSystem;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnProperty(value = {"alpha.model.actor.enabled"})
+public class FsmAutoConfiguration {
+
+ @Bean
+ public ActorSystem actorSystem() {
+ ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration());
+ return system;
+ }
+
+ @Bean
+ public Config akkaConfiguration() {
+ return ConfigFactory.load();
+ }
+
+ @Bean(name = "sagaEventBus")
+ public EventBus sagaEventBus() {
+ return new EventBus();
+ }
+
+ @Bean
+ public SagaEventConsumer sagaEventConsumer(){
+ return new SagaEventConsumer();
+ }
+
+ @Bean
+ public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+ return new EventSubscribeBeanPostProcessor();
+ }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 613a23b..dca006e 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -37,6 +37,7 @@ import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
@@ -46,6 +47,7 @@ public class SagaActor extends
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public static Props props(String persistenceId) {
return Props.create(SagaActor.class, persistenceId);
}
@@ -218,7 +220,7 @@ public class SagaActor extends
if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
|| hasCommittedTx(data)) {
return stay();
- }else{
+ } else {
return goTo(SagaActorState.COMPENSATED)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
@@ -280,6 +282,17 @@ public class SagaActor extends
})
);
+ onTermination(
+ matchStop(
+ Normal(), (state, data) -> {
+ LOG.info("stop {} {}", data.getGlobalTxId(), state);
+ data.setTerminated(true);
+ data.setLastState(state);
+ LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data);
+ }
+ )
+ );
+
}
@Override
@@ -337,7 +350,7 @@ public class SagaActor extends
// decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(TxState.COMPENSATED);
- LOG.info("compensation is completed {}",txEntity.getLocalTxId());
+ LOG.info("compensation is completed {}", txEntity.getLocalTxId());
}
}
} else if (event instanceof SagaEvent) {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
new file mode 100644
index 0000000..aae9419
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class SagaEventConsumer {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final Timeout TIMEOUT = new Timeout(1000, TimeUnit.MILLISECONDS);
+
+ @Autowired
+ ActorSystem system;
+
+ /**
+ * Receives a saga event and forwards it to the SagaActor at /user/{globalTxId}, creating the actor if absent.
+ */
+ @Subscribe
+ public void receiveSagaEvent(BaseEvent event) throws Exception {
+ LOG.info("receive {} ", event.toString());
+ try{
+ //TODO Write-Ahead Logging
+ ActorRef saga;
+ String actorPath = "/user/" + event.getGlobalTxId();
+ Optional<ActorRef> optional = this.getActorRefFromPath(actorPath);
+ if (!optional.isPresent()) {
+ saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
+ } else {
+ saga = optional.get();
+ }
+ saga.tell(event, ActorRef.noSender());
+ LOG.info("tell {} to {}", event.toString(),saga);
+ //TODO WAL commit
+ }catch (Exception ex){
+ //TODO
+ throw ex;
+ }
+ }
+
+ public Optional<ActorRef> getActorRefFromPath(String path) throws Exception {
+ try {
+ ActorSelection selection = system.actorSelection(path);
+ Future<ActorRef> future = selection.resolveOne(TIMEOUT);
+ ActorRef ref = Await.result(future, TIMEOUT.duration());
+ return Optional.of(ref);
+ } catch (ActorNotFound e) {
+ return Optional.absent();
+ }
+ }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 3c794fa..4908029 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
public class SagaData implements Serializable {
private long beginTime = System.currentTimeMillis();
@@ -28,6 +29,7 @@ public class SagaData implements Serializable {
private String globalTxId;
private long expirationTime;
private boolean terminated;
+ private SagaActorState lastState;
private AtomicLong compensationRunningCounter = new AtomicLong();
private Map<String,TxEntity> txEntityMap = new HashMap<>();
@@ -89,6 +91,14 @@ public class SagaData implements Serializable {
this.txEntityMap = txEntityMap;
}
+ public SagaActorState getLastState() {
+ return lastState;
+ }
+
+ public void setLastState(SagaActorState lastState) {
+ this.lastState = lastState;
+ }
+
public long getTimeout(){
return expirationTime-System.currentTimeMillis();
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
new file mode 100644
index 0000000..9c4d27d
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtension.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import akka.actor.AbstractExtensionId;
+import akka.actor.ExtendedActorSystem;
+
+public class LogExtension extends AbstractExtensionId<LogExtensionImpl> {
+
+ public static final LogExtension LogExtensionProvider = new LogExtension();
+
+ @Override
+ public LogExtensionImpl createExtension(ExtendedActorSystem system) {
+ return new LogExtensionImpl();
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
new file mode 100644
index 0000000..4bf00b4
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/LogExtensionImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka;
+
+import akka.actor.Extension;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+
+public class LogExtensionImpl implements Extension {
+ private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap();
+
+ public void putSagaData(String globalTxId, SagaData sagaData){
+ sagaDataMap.put(globalTxId, sagaData);
+ }
+
+ public SagaData getSagaData(String globalTxId){
+ return sagaDataMap.get(globalTxId);
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java
new file mode 100644
index 0000000..11f2cb0
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/eventbus/EventSubscribeBeanPostProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+public class EventSubscribeBeanPostProcessor implements BeanPostProcessor {
+
+ @Autowired(required = false)
+ @Qualifier("sagaEventBus")
+ EventBus eventBus;
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName)
+ throws BeansException {
+ return bean;
+ }
+
+ /**
+ * If any public method of the Spring bean is annotated with @Subscribe, register the bean with the
+ * Guava EventBus (skipped when no sagaEventBus bean was injected).
+ */
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName)
+ throws BeansException {
+ if(eventBus !=null){
+ Method[] methods = bean.getClass().getMethods();
+ for (Method method : methods) {
+ Annotation[] annotations = method.getAnnotations();
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(Subscribe.class)) {
+ eventBus.register(bean);
+ return bean;
+ }
+ }
+ }
+ }
+ return bean;
+ }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..ff42cdd
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.FsmAutoConfiguration
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 21c9402..5644c1b 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -25,28 +25,15 @@ import akka.actor.Terminated;
import akka.persistence.fsm.PersistentFSM;
import akka.persistence.fsm.PersistentFSM.CurrentState;
import akka.testkit.javadsl.TestKit;
-import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SagaActorTest {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
static ActorSystem system;
@BeforeClass
@@ -85,14 +72,10 @@ public class SagaActorTest {
ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -152,10 +135,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -207,13 +189,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -275,16 +253,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -353,16 +324,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -375,26 +339,14 @@ public class SagaActorTest {
assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
transition = expectMsgClass(PersistentFSM.Transition.class);
- assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
-
- transition = expectMsgClass(PersistentFSM.Transition.class);
- assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
-
- transition = expectMsgClass(PersistentFSM.Transition.class);
- assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
-
- transition = expectMsgClass(PersistentFSM.Transition.class);
- assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
-
- transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
SagaData sagaData = expectMsgClass(SagaData.class);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
transition = expectMsgClass(PersistentFSM.Transition.class);
@@ -433,17 +385,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -514,14 +458,9 @@ public class SagaActorTest {
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(SagaTimeoutEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -573,10 +512,9 @@ public class SagaActorTest {
* 5. TxEndedEvent-12
* 4. TxStartedEvent-13
* 5. TxEndedEvent-13
- * 6. SagaEndedEvent-1
*/
@Test
- public void sagaActorTriggerTimeoutTest() throws InterruptedException {
+ public void sagaActorTriggerTimeoutTest() {
new TestKit(system) {{
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
@@ -587,15 +525,10 @@ public class SagaActorTest {
ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- Thread.sleep(timeout*2000);
- saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -644,7 +577,7 @@ public class SagaActorTest {
* 8. SagaEndedEvent-1
*/
@Test
- public void successfulTestWithTxConcurrent() throws InterruptedException {
+ public void successfulWithTxConcurrentTest() {
new TestKit(system) {{
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
@@ -654,14 +587,9 @@ public class SagaActorTest {
ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -704,7 +632,7 @@ public class SagaActorTest {
* 8. SagaEndedEvent-1
*/
@Test
- public void successfulTestWithTxConcurrentCross() throws InterruptedException {
+ public void successfulWithTxConcurrentCrossTest() throws InterruptedException {
new TestKit(system) {{
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
@@ -714,14 +642,9 @@ public class SagaActorTest {
ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
@@ -782,17 +705,9 @@ public class SagaActorTest {
ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
-
- saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
- saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
- saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
//expect
CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java
new file mode 100644
index 0000000..7c14000
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaApplication.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SagaApplication {
+ public static void main(String[] args){
+ SpringApplication.run(SagaApplication.class, args);
+ }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
new file mode 100644
index 0000000..a3ddd0d
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+/**
+ * Event simulator: builds ordered lists of saga events for the test scenarios
+ * */
+
+public class SagaEventSender {
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ public static List<BaseEvent> successfulEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxAbortedEvent-11
+ * 4. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> firstTxAbortedEvents(String globalTxId, String localTxId_1){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxAbortedEvent-12
+ * 6. TxComponsitedEvent-11
+ * 7. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxAbortedEvent-13
+ * 8. TxComponsitedEvent-11
+ * 9. TxComponsitedEvent-12
+ * 10. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxAbortedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. TxComponsitedEvent-12
+ * 9. TxComponsitedEvent-13
+ * 10. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaAbortedEvent-1
+ * 9. TxComponsitedEvent-11
+ * 10. TxComponsitedEvent-12
+ * 11. TxComponsitedEvent-13
+ */
+ public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaTimeoutEvent-1
+ */
+ public static List<BaseEvent> omegaSendSagaTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaTimeoutEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent(timeout)-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ */
+ public static List<BaseEvent> sagaActorTriggerTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3, int timeout){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxStartedEvent-12
+ * 4. TxStartedEvent-13
+ * 5. TxEndedEvent-11
+ * 6. TxEndedEvent-12
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ public static List<BaseEvent> successfulWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxStartedEvent-12
+ * 5. TxEndedEvent-11
+ * 4. TxStartedEvent-13
+ * 6. TxEndedEvent-12
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ public static List<BaseEvent> successfulWithTxConcurrentCrossEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 4. TxStartedEvent-12
+ * 6. TxStartedEvent-13
+ * 3. TxEndedEvent-11
+ * 5. TxEndedEvent-12
+ * 7. TxAbortedEvent-13
+ * 8. TxComponsitedEvent-11
+ * 9. TxComponsitedEvent-12
+ * 10. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
new file mode 100644
index 0000000..c13c6af
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.ActorSystem;
+import com.google.common.eventbus.EventBus;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/**
+ * Integration tests that post saga event sequences (built by {@link SagaEventSender})
+ * to the {@code sagaEventBus} bean and then poll the {@link LogExtension} of the
+ * actor system until the {@link SagaData} recorded for the saga shows the expected
+ * final saga state and per-sub-transaction states.
+ *
+ * Each test uses freshly generated random ids, so the tests do not share saga data.
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {SagaApplication.class},
+    properties = {
+        "alpha.model.actor.enabled=true"
+    })
+public class SagaIntegrationTest {
+
+  @Autowired
+  @Qualifier("sagaEventBus")
+  EventBus sagaEventBus;
+
+  @Autowired
+  ActorSystem system;
+
+  /** All three sub-transactions end normally and the saga is committed. */
+  @Test
+  public void successfulTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMMITTED));
+  }
+
+  /** The only sub-transaction fails; the saga ends up compensated. */
+  @Test
+  public void firstTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 1
+            && hasTxState(sagaData, localTxId_1, TxState.FAILED));
+  }
+
+  /** The second of two sub-transactions fails; the first one is compensated. */
+  @Test
+  public void middleTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 2
+            && hasTxState(sagaData, localTxId_1, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_2, TxState.FAILED));
+  }
+
+  /** The last of three sub-transactions fails; the first two are compensated. */
+  @Test
+  public void lastTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_3, TxState.FAILED));
+  }
+
+  /** The first sub-transaction fails; the other two are expected to end up compensated. */
+  @Test
+  public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.FAILED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMPENSATED));
+  }
+
+  /** The saga is aborted after all sub-transactions ended; all three are compensated. */
+  @Test
+  public void sagaAbortedEventAfterAllTxEndedTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMPENSATED));
+  }
+
+  /** A timeout event sequence from the sender leaves the saga suspended with committed sub-transactions. */
+  @Test
+  public void omegaSendSagaTimeoutEventTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMMITTED));
+  }
+
+  /**
+   * The saga is created with a timeout and is expected to become suspended;
+   * the wait is therefore one second longer than that timeout.
+   */
+  @Test
+  public void sagaActorTriggerTimeoutTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    final int timeout = 5; // second
+    postAll(SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout));
+
+    awaitSagaData(globalTxId, timeout + 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.SUSPENDED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMMITTED));
+  }
+
+  /** Successful saga using the "concurrent" event ordering produced by the sender. */
+  @Test
+  public void successfulWithTxConcurrentTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMMITTED));
+  }
+
+  /** Successful saga using the "concurrent cross" event ordering produced by the sender. */
+  @Test
+  public void successfulWithTxConcurrentCrossTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMMITTED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMMITTED)
+            && hasTxState(sagaData, localTxId_3, TxState.COMMITTED));
+  }
+
+  /** The third sub-transaction fails in the "concurrent" ordering; the first two are compensated. */
+  @Test
+  public void lastTxAbortedEventWithTxConcurrentTest() {
+    final String globalTxId = UUID.randomUUID().toString();
+    final String localTxId_1 = UUID.randomUUID().toString();
+    final String localTxId_2 = UUID.randomUUID().toString();
+    final String localTxId_3 = UUID.randomUUID().toString();
+    postAll(SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3));
+
+    awaitSagaData(globalTxId, 1, sagaData ->
+        sagaData.getLastState() == SagaActorState.COMPENSATED
+            && sagaData.getTxEntityMap().size() == 3
+            && hasTxState(sagaData, localTxId_1, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_2, TxState.COMPENSATED)
+            && hasTxState(sagaData, localTxId_3, TxState.FAILED));
+  }
+
+  /**
+   * Posts every event to the saga event bus, in iteration order.
+   *
+   * @param events the events to post
+   */
+  private void postAll(Iterable<?> events) {
+    events.forEach(sagaEventBus::post);
+  }
+
+  /**
+   * Polls the saga data recorded for {@code globalTxId} until it exists and satisfies
+   * {@code expectation}, or until the timeout elapses (in which case Awaitility fails the test).
+   *
+   * @param globalTxId     id of the saga to look up
+   * @param timeoutSeconds maximum time to wait, in seconds
+   * @param expectation    condition the recorded saga data must satisfy; only evaluated
+   *                       once saga data is available (never called with {@code null})
+   */
+  private void awaitSagaData(String globalTxId, long timeoutSeconds,
+      java.util.function.Predicate<SagaData> expectation) {
+    await().atMost(timeoutSeconds, SECONDS).until(() -> {
+      SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+      // No data yet means the events have not been processed so far; keep polling.
+      return sagaData != null && expectation.test(sagaData);
+    });
+  }
+
+  /**
+   * Tells whether the sub-transaction {@code localTxId} recorded in {@code sagaData}
+   * is in the {@code expected} state.
+   *
+   * @param sagaData  recorded saga data, not {@code null}
+   * @param localTxId id of the sub-transaction; must be present in the tx entity map
+   * @param expected  the state the sub-transaction should be in
+   * @return {@code true} if the recorded state is {@code expected}
+   */
+  private static boolean hasTxState(SagaData sagaData, String localTxId, TxState expected) {
+    return sagaData.getTxEntityMap().get(localTxId).getState() == expected;
+  }
+
+}