You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/21 10:34:17 UTC
[servicecomb-pack] 01/05: SCB-1321 Add Alpha FSM prototype code
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 98bc27bba72437e10bdc74d850559058f6441d7a
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Fri Jun 21 17:07:56 2019 +0800
SCB-1321 Add Alpha FSM prototype code
---
alpha/alpha-fsm/pom.xml | 123 ++++
.../servicecomb/pack/alpha/fsm/SagaActor.java | 331 +++++++++++
.../servicecomb/pack/alpha/fsm/SagaActorState.java | 19 +
.../apache/servicecomb/pack/alpha/fsm/TxState.java | 8 +
.../pack/alpha/fsm/event/SagaAbortedEvent.java | 28 +
.../pack/alpha/fsm/event/SagaDomainEvent.java | 14 +
.../pack/alpha/fsm/event/SagaEndedEvent.java | 28 +
.../pack/alpha/fsm/event/SagaStartedEvent.java | 43 ++
.../pack/alpha/fsm/event/SagaTimeoutEvent.java | 27 +
.../pack/alpha/fsm/event/TxAbortedEvent.java | 39 ++
.../pack/alpha/fsm/event/TxComponsitedEvent.java | 38 ++
.../pack/alpha/fsm/event/TxEndedEvent.java | 39 ++
.../pack/alpha/fsm/event/TxStartedEvent.java | 38 ++
.../pack/alpha/fsm/event/base/BaseEvent.java | 26 +
.../pack/alpha/fsm/event/base/SagaEvent.java | 14 +
.../pack/alpha/fsm/event/base/TxEvent.java | 31 ++
.../servicecomb/pack/alpha/fsm/model/SagaData.java | 131 +++++
.../servicecomb/pack/alpha/fsm/model/TxEntity.java | 95 ++++
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 619 +++++++++++++++++++++
.../alpha-fsm/src/test/resources/application.conf | 3 +
alpha/alpha-fsm/src/test/resources/log4j2.xml | 30 +
21 files changed, 1724 insertions(+)
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
new file mode 100644
index 0000000..faa4e05
--- /dev/null
+++ b/alpha/alpha-fsm/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>alpha</artifactId>
+ <groupId>org.apache.servicecomb.pack</groupId>
+ <version>0.5.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>alpha-fsm</artifactId>
+ <name>Pack::Alpha::Fsm</name>
+
+ <properties>
+ <leveldbjni-all.version>1.8</leveldbjni-all.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence_2.12</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.servicecomb.pack</groupId>
+ <artifactId>pack-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <!-- log -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.persistence</groupId>
+ <artifactId>javax.persistence-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- akka -->
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>${leveldbjni-all.version}</version>
+ </dependency>
+
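+ <!-- Test-only dependencies: no scope is declared here, so the test scope is presumably inherited from the parent POM's dependencyManagement (TODO confirm) -->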
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.seanyinx</groupId>
+ <artifactId>unit-scaffolding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_2.12</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
new file mode 100644
index 0000000..492d187
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -0,0 +1,331 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+
+import akka.actor.Props;
+import akka.persistence.fsm.AbstractPersistentFSM;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Persistent FSM actor that tracks a single saga (global transaction) through its life cycle.
+ *
+ * <p>States and the events handled in each of them:
+ * <ul>
+ *   <li>{@code IDEL}: {@code SagaStartedEvent} moves to {@code READY}.</li>
+ *   <li>{@code READY}: {@code TxStartedEvent} moves to {@code PARTIALLY_ACTIVE};
+ *       {@code SagaEndedEvent}, {@code SagaAbortedEvent} and a state timeout move to
+ *       {@code SUSPENDED}.</li>
+ *   <li>{@code PARTIALLY_ACTIVE}: {@code TxEndedEvent} moves to {@code PARTIALLY_COMMITTED};
+ *       {@code TxAbortedEvent} moves to {@code FAILED}; {@code SagaTimeoutEvent} and a state
+ *       timeout move to {@code SUSPENDED}.</li>
+ *   <li>{@code PARTIALLY_COMMITTED}: {@code TxStartedEvent} moves to {@code PARTIALLY_ACTIVE};
+ *       {@code SagaEndedEvent} moves to {@code COMMITTED}; {@code SagaAbortedEvent} and
+ *       {@code TxAbortedEvent} move to {@code FAILED}; {@code SagaTimeoutEvent} and a state
+ *       timeout move to {@code SUSPENDED}.</li>
+ *   <li>{@code FAILED}: stays until no sub-transaction is left in {@code TxState.COMMITTED}
+ *       (and, unless terminated, no compensation is still counted as running), then moves to
+ *       {@code COMPENSATED}; {@code SagaTimeoutEvent} and a state timeout move to
+ *       {@code SUSPENDED}.</li>
+ *   <li>{@code COMMITTED}, {@code SUSPENDED}, {@code COMPENSATED}: any event stops the FSM.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): the handlers mutate the {@link SagaData} instance in place, none of them
+ * calls {@code applying(...)}, and {@link #applyEvent} returns its argument unchanged. This
+ * looks like prototype behaviour - confirm before relying on recovery.
+ */
+public class SagaActor extends
+    AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Creates the {@link Props} used to instantiate this actor.
+   *
+   * @param persistenceId value returned by {@link #persistenceId()}
+   * @return props that construct a {@code SagaActor} with the given id
+   */
+  public static Props props(String persistenceId) {
+    return Props.create(SagaActor.class, persistenceId);
+  }
+
+  private final String persistenceId;
+
+  /**
+   * Registers the state handlers described in the class documentation.
+   *
+   * @param persistenceId value returned by {@link #persistenceId()}
+   */
+  public SagaActor(String persistenceId) {
+    this.persistenceId = persistenceId;
+
+    startWith(SagaActorState.IDEL, SagaData.builder().build());
+
+    when(SagaActorState.IDEL,
+        matchEvent(SagaStartedEvent.class,
+            (event, data) -> {
+              data.setGlobalTxId(event.getGlobalTxId());
+              data.setBeginTime(System.currentTimeMillis());
+              if (event.getTimeout() > 0) {
+                // The event timeout is in seconds; multiply as long so that large values do
+                // not overflow int before being added to the begin time.
+                data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000L);
+                return goTo(SagaActorState.READY)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.READY);
+              }
+            }
+        )
+    );
+
+    when(SagaActorState.READY,
+        matchEvent(TxStartedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+              }
+            }
+        ).event(SagaEndedEvent.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        ).event(SagaAbortedEvent.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        )
+    );
+
+    when(SagaActorState.PARTIALLY_ACTIVE,
+        matchEvent(TxEndedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_COMMITTED)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_COMMITTED);
+              }
+            }
+        ).event(SagaTimeoutEvent.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED)
+                .replying(data)
+                .forMax(Duration.create(1, TimeUnit.MILLISECONDS))
+        ).event(TxAbortedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        )
+    );
+
+    when(SagaActorState.PARTIALLY_COMMITTED,
+        matchEvent(TxStartedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              if (data.getExpirationTime() > 0) {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE)
+                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+              } else {
+                return goTo(SagaActorState.PARTIALLY_ACTIVE);
+              }
+            }
+        ).event(SagaTimeoutEvent.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED)
+                .replying(data)
+                .forMax(Duration.create(1, TimeUnit.MILLISECONDS))
+        ).event(SagaEndedEvent.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              return goTo(SagaActorState.COMMITTED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(SagaAbortedEvent.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(TxAbortedEvent.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return goTo(SagaActorState.FAILED);
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        )
+    );
+
+    when(SagaActorState.FAILED,
+        matchEvent(SagaTimeoutEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              return goTo(SagaActorState.SUSPENDED)
+                  .replying(data)
+                  .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+            }
+        ).event(TxComponsitedEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              // Stay while compensations are still counted as running (unless the saga was
+              // terminated) or while a committed sub-transaction has not been compensated.
+              if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
+                  || hasCommittedTx(data)) {
+                return stay();
+              } else {
+                return goTo(SagaActorState.COMPENSATED)
+                    .replying(data)
+                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+              }
+            }
+        ).event(SagaAbortedEvent.class, SagaData.class,
+            (event, data) -> {
+              data.setEndTime(System.currentTimeMillis());
+              updateTxEntity(event, data);
+              data.setTerminated(true);
+              // The saga has just been marked terminated, so the running-compensation
+              // counter is not consulted here (a "!isTerminated() && counter > 0" clause
+              // would always be false); only committed sub-transactions keep it in FAILED.
+              if (hasCommittedTx(data)) {
+                return stay();
+              } else {
+                return goTo(SagaActorState.COMPENSATED)
+                    .replying(data)
+                    .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
+              }
+            }
+        ).event(TxStartedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              return stay();
+            }
+        ).event(TxEndedEvent.class, SagaData.class,
+            (event, data) -> {
+              updateTxEntity(event, data);
+              // TODO invoke the compensation method
+              TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
+              if (txEntity == null) {
+                // No TxStartedEvent was recorded for this local transaction, so there is no
+                // entity to compensate; skip instead of failing with a NullPointerException.
+                LOG.warn("ignore TxEndedEvent of unknown localTxId {}", event.getLocalTxId());
+              } else {
+                compensation(txEntity, data);
+              }
+              return stay();
+            }
+        ).event(Arrays.asList(StateTimeout()), SagaData.class,
+            (event, data) -> goTo(SagaActorState.SUSPENDED).replying(data)
+        )
+    );
+
+    when(SagaActorState.COMMITTED,
+        matchAnyEvent((event, data) -> stop())
+    );
+
+    when(SagaActorState.SUSPENDED,
+        matchAnyEvent((event, data) -> stop())
+    );
+
+    when(SagaActorState.COMPENSATED,
+        matchAnyEvent((event, data) -> stop())
+    );
+
+    whenUnhandled(
+        matchAnyEvent((event, data) -> {
+          LOG.error("unmatch event {}", event);
+          return stay();
+        })
+    );
+
+    onTransition(
+        matchState(null, null, (from, to) -> {
+          LOG.info("transition {} {} -> {}", getSelf(), from, to);
+        })
+    );
+
+  }
+
+  /** Logs the state name and state data once recovery has completed. */
+  @Override
+  public void onRecoveryCompleted() {
+    LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData());
+  }
+
+  /** Returns the marker interface used as this FSM's domain event type. */
+  @Override
+  public Class<DomainEvent> domainEventClass() {
+    return SagaDomainEvent.DomainEvent.class;
+  }
+
+
+  /** Returns the persistence id passed to the constructor. */
+  @Override
+  public String persistenceId() {
+    return persistenceId;
+  }
+
+  /**
+   * Returns {@code currentData} unchanged; the state handlers mutate the data object directly.
+   *
+   * @param domainEvent the domain event (ignored)
+   * @param currentData the current saga data
+   * @return {@code currentData}
+   */
+  @Override
+  public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) {
+    return currentData;
+  }
+
+  /**
+   * Applies an incoming event to the per-transaction bookkeeping held in {@code data}.
+   *
+   * <ul>
+   *   <li>{@code TxStartedEvent} for an unknown local tx id registers a new {@code ACTIVE}
+   *       entity; other tx events for an unknown id are ignored.</li>
+   *   <li>{@code TxEndedEvent} marks an {@code ACTIVE} entity {@code COMMITTED}.</li>
+   *   <li>{@code TxAbortedEvent} marks an {@code ACTIVE} entity {@code FAILED} and triggers
+   *       compensation of every {@code COMMITTED} entity.</li>
+   *   <li>{@code TxComponsitedEvent} decrements the running-compensation counter and marks
+   *       the entity {@code COMPENSATED}.</li>
+   *   <li>{@code SagaAbortedEvent} triggers compensation of every {@code COMMITTED} entity.</li>
+   * </ul>
+   *
+   * @param event the received event
+   * @param data the saga data to update in place
+   */
+  private void updateTxEntity(BaseEvent event, SagaData data) {
+    if (event instanceof TxEvent) {
+      TxEvent txEvent = (TxEvent) event;
+      TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId());
+      if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) {
+        if (event instanceof TxStartedEvent) {
+          TxEntity created = TxEntity.builder()
+              .localTxId(txEvent.getLocalTxId())
+              .parentTxId(txEvent.getParentTxId())
+              .state(TxState.ACTIVE)
+              .build();
+          data.getTxEntityMap().put(created.getLocalTxId(), created);
+        }
+      } else if (event instanceof TxEndedEvent) {
+        if (txEntity.getState() == TxState.ACTIVE) {
+          txEntity.setEndTime(System.currentTimeMillis());
+          txEntity.setState(TxState.COMMITTED);
+        }
+      } else if (event instanceof TxAbortedEvent) {
+        if (txEntity.getState() == TxState.ACTIVE) {
+          txEntity.setEndTime(System.currentTimeMillis());
+          txEntity.setState(TxState.FAILED);
+          compensateCommittedTxs(data);
+        }
+      } else if (event instanceof TxComponsitedEvent) {
+        // One compensation has finished: decrement the running-compensation counter.
+        data.getCompensationRunningCounter().decrementAndGet();
+        txEntity.setState(TxState.COMPENSATED);
+        LOG.info("完成补偿 {}",txEntity.getLocalTxId());
+      }
+    } else if (event instanceof SagaEvent) {
+      if (event instanceof SagaAbortedEvent) {
+        compensateCommittedTxs(data);
+      }
+    }
+  }
+
+  /** Triggers compensation for every sub-transaction currently in {@code COMMITTED} state. */
+  private void compensateCommittedTxs(SagaData data) {
+    data.getTxEntityMap().values().forEach(txEntity -> {
+      if (txEntity.getState() == TxState.COMMITTED) {
+        // TODO invoke the compensation method
+        compensation(txEntity, data);
+      }
+    });
+  }
+
+  /** Returns {@code true} when at least one sub-transaction is in {@code COMMITTED} state. */
+  private boolean hasCommittedTx(SagaData data) {
+    return data.getTxEntityMap().values().stream()
+        .anyMatch(txEntity -> txEntity.getState() == TxState.COMMITTED);
+  }
+
+  /**
+   * Records that a compensation has been requested for {@code txEntity}. At present this only
+   * increments the running-compensation counter and logs; no compensation call is made.
+   */
+  private void compensation(TxEntity txEntity, SagaData data) {
+    // One more compensation in flight: increment the running-compensation counter.
+    data.getCompensationRunningCounter().incrementAndGet();
+    LOG.info("调用补偿方法 {}", txEntity.getLocalTxId());
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
new file mode 100644
index 0000000..0c7be27
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java
@@ -0,0 +1,19 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import akka.persistence.fsm.PersistentFSM;
+
+/**
+ * States of the saga finite state machine driven by {@code SagaActor}.
+ *
+ * <p>NOTE(review): {@code IDEL} looks like a misspelling of "IDLE"; renaming it would change
+ * the value returned by {@link #identifier()}, so it is left as is.
+ */
+public enum SagaActorState implements PersistentFSM.FSMState {
+  /** Initial state the actor starts with. */
+  IDEL,
+  /** Entered from {@code IDEL} when the saga has been started. */
+  READY,
+  /** A sub-transaction has been started. */
+  PARTIALLY_ACTIVE,
+  /** A sub-transaction has ended. */
+  PARTIALLY_COMMITTED,
+  /** A sub-transaction or the saga was aborted. */
+  FAILED,
+  /** The saga ended normally; any further event stops the actor. */
+  COMMITTED,
+  /** Left {@code FAILED} after compensation; any further event stops the actor. */
+  COMPENSATED,
+  /** Timed out or ended in an unexpected state; any further event stops the actor. */
+  SUSPENDED;
+
+  /** Returns the enum constant name as the FSM state identifier. */
+  @Override
+  public String identifier() {
+    return this.name();
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
new file mode 100644
index 0000000..e9c54e4
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -0,0 +1,8 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+public enum TxState {
+ ACTIVE,
+ FAILED,
+ COMMITTED,
+ COMPENSATED;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java
new file mode 100644
index 0000000..351146b
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java
@@ -0,0 +1,28 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only a global transaction id; instances are created through
+ * {@link #builder()}.
+ */
+public class SagaAbortedEvent extends SagaEvent {
+
+  /** Returns a new builder wrapping a fresh {@code SagaAbortedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final SagaAbortedEvent event = new SagaAbortedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public SagaAbortedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
new file mode 100644
index 0000000..b421c73
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
@@ -0,0 +1,14 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+public class SagaDomainEvent {
+ public interface DomainEvent {}
+
+ public enum SagaStartedEvent implements DomainEvent {INSTANCE}
+ public enum SagaEndedEvent implements DomainEvent {INSTANCE}
+ public enum SagaAbortedEvent implements DomainEvent {INSTANCE}
+ public enum SagaTimeoutEvent implements DomainEvent {INSTANCE}
+ public enum TxStartedEvent implements DomainEvent {INSTANCE}
+ public enum TxEndedEvent implements DomainEvent {INSTANCE}
+ public enum TxAbortedEvent implements DomainEvent {INSTANCE}
+ public enum TxComponsitedEvent implements DomainEvent {INSTANCE}
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java
new file mode 100644
index 0000000..7a87d77
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java
@@ -0,0 +1,28 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only a global transaction id; instances are created through
+ * {@link #builder()}.
+ */
+public class SagaEndedEvent extends SagaEvent {
+
+  /** Returns a new builder wrapping a fresh {@code SagaEndedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final SagaEndedEvent event = new SagaEndedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public SagaEndedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java
new file mode 100644
index 0000000..4831be7
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java
@@ -0,0 +1,43 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying a global transaction id and an optional timeout; instances are
+ * created through {@link #builder()}.
+ */
+public class SagaStartedEvent extends SagaEvent {
+  /** Saga timeout in seconds. */
+  private int timeout;
+
+  /** Returns the timeout in seconds. */
+  public int getTimeout() {
+    return this.timeout;
+  }
+
+  /** Sets the timeout in seconds. */
+  public void setTimeout(int timeout) {
+    this.timeout = timeout;
+  }
+
+  /** Returns a new builder wrapping a fresh {@code SagaStartedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final SagaStartedEvent event = new SagaStartedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Sets the timeout (seconds) on the event being built. */
+    public Builder timeout(int timeout) {
+      this.event.setTimeout(timeout);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public SagaStartedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java
new file mode 100644
index 0000000..4de5c37
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java
@@ -0,0 +1,27 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
+
+/**
+ * Saga-level event carrying only a global transaction id; instances are created through
+ * {@link #builder()}.
+ */
+public class SagaTimeoutEvent extends SagaEvent {
+
+  /** Returns a new builder wrapping a fresh {@code SagaTimeoutEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final SagaTimeoutEvent event = new SagaTimeoutEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public SagaTimeoutEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
new file mode 100644
index 0000000..620366f
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java
@@ -0,0 +1,39 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Transaction-level event carrying global, parent and local transaction ids; instances are
+ * created through {@link #builder()}.
+ */
+public class TxAbortedEvent extends TxEvent {
+
+  /** Returns a new builder wrapping a fresh {@code TxAbortedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final TxAbortedEvent event = new TxAbortedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the parent transaction id on the event being built. */
+    public Builder parentTxId(String parentTxId) {
+      this.event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    /** Sets the local transaction id on the event being built. */
+    public Builder localTxId(String localTxId) {
+      this.event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public TxAbortedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
new file mode 100644
index 0000000..cccb69a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Transaction-level event carrying global, parent and local transaction ids; instances are
+ * created through {@link #builder()}.
+ *
+ * <p>NOTE(review): the class name is presumably a misspelling of "TxCompensatedEvent"; it is
+ * kept because other classes reference it by this name.
+ */
+public class TxComponsitedEvent extends TxEvent {
+
+  /** Returns a new builder wrapping a fresh {@code TxComponsitedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final TxComponsitedEvent event = new TxComponsitedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the parent transaction id on the event being built. */
+    public Builder parentTxId(String parentTxId) {
+      this.event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    /** Sets the local transaction id on the event being built. */
+    public Builder localTxId(String localTxId) {
+      this.event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public TxComponsitedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java
new file mode 100644
index 0000000..33ea3d1
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java
@@ -0,0 +1,39 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Transaction-level event carrying global, parent and local transaction ids; instances are
+ * created through {@link #builder()}.
+ */
+public class TxEndedEvent extends TxEvent {
+
+  /** Returns a new builder wrapping a fresh {@code TxEndedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final TxEndedEvent event = new TxEndedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the parent transaction id on the event being built. */
+    public Builder parentTxId(String parentTxId) {
+      this.event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    /** Sets the local transaction id on the event being built. */
+    public Builder localTxId(String localTxId) {
+      this.event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public TxEndedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
new file mode 100644
index 0000000..2d10462
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.event;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+/**
+ * Transaction-level event carrying global, parent and local transaction ids; instances are
+ * created through {@link #builder()}.
+ */
+public class TxStartedEvent extends TxEvent {
+
+  /** Returns a new builder wrapping a fresh {@code TxStartedEvent}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The event is created eagerly and mutated in place, so repeated calls to
+   * {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final TxStartedEvent event = new TxStartedEvent();
+
+    private Builder() {
+    }
+
+    /** Sets the parent transaction id on the event being built. */
+    public Builder parentTxId(String parentTxId) {
+      this.event.setParentTxId(parentTxId);
+      return this;
+    }
+
+    /** Sets the local transaction id on the event being built. */
+    public Builder localTxId(String localTxId) {
+      this.event.setLocalTxId(localTxId);
+      return this;
+    }
+
+    /** Sets the global transaction id on the event being built. */
+    public Builder globalTxId(String globalTxId) {
+      this.event.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    /** Returns the event held by this builder. */
+    public TxStartedEvent build() {
+      return this.event;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
new file mode 100644
index 0000000..ea8257d
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -0,0 +1,26 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+import java.io.Serializable;
+
+/** Serializable base class of all events; holds the global transaction id. */
+public abstract class BaseEvent implements Serializable {
+  private String globalTxId;
+
+  /** Creates an event whose global transaction id is initially {@code null}. */
+  public BaseEvent() {
+  }
+
+  /** Returns the global transaction id, or {@code null} when none has been set. */
+  public String getGlobalTxId() {
+    return this.globalTxId;
+  }
+
+  /** Sets the global transaction id. */
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  /** Renders the event as {@code BaseEvent{globalTxId='...'}}. */
+  @Override
+  public String toString() {
+    return "BaseEvent{globalTxId='" + this.globalTxId + "'}";
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java
new file mode 100644
index 0000000..bf39855
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java
@@ -0,0 +1,14 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+/** Base class of saga-level events; adds nothing but a class-name-aware {@code toString}. */
+public class SagaEvent extends BaseEvent {
+
+  public SagaEvent() {
+  }
+
+  /** Renders the event as {@code <SimpleClassName>{globalTxId='...'}}. */
+  @Override
+  public String toString() {
+    final String typeName = this.getClass().getSimpleName();
+    return typeName + "{globalTxId='" + this.getGlobalTxId() + "'}";
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
new file mode 100644
index 0000000..56a06e0
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java
@@ -0,0 +1,31 @@
+package org.apache.servicecomb.pack.alpha.fsm.event.base;
+
+/**
+ * Base class of transaction-level events; adds the parent and local transaction ids to the
+ * global transaction id inherited from {@link BaseEvent}.
+ */
+public abstract class TxEvent extends BaseEvent {
+  private String parentTxId;
+  private String localTxId;
+
+  /** Returns the parent transaction id, or {@code null} when none has been set. */
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  /** Sets the parent transaction id. */
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  /** Returns the local transaction id, or {@code null} when none has been set. */
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  /** Sets the local transaction id. */
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  /**
+   * Renders the event as
+   * {@code <SimpleClassName>{globalTxId='...', parentTxId='...', localTxId='...'}}.
+   */
+  @Override
+  public String toString() {
+    // Every field is separated by ", " (the separator before parentTxId used to be missing).
+    return this.getClass().getSimpleName() + "{" +
+        "globalTxId='" + this.getGlobalTxId() + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        '}';
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
new file mode 100644
index 0000000..0cc858a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -0,0 +1,131 @@
+package org.apache.servicecomb.pack.alpha.fsm.model;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Mutable, serializable state of one saga: timing information, termination flag, the
+ * running-compensation counter and the sub-transactions keyed by local transaction id.
+ */
+public class SagaData implements Serializable {
+  /** Defaults to the construction time in epoch milliseconds. */
+  private long beginTime = System.currentTimeMillis();
+  private long endTime;
+  private String globalTxId;
+  /** Absolute expiration time in epoch milliseconds; 0 when never set. */
+  private long expirationTime;
+  private boolean terminated;
+  private AtomicLong compensationRunningCounter = new AtomicLong();
+  private Map<String,TxEntity> txEntityMap = new HashMap<>();
+
+  public long getBeginTime() {
+    return this.beginTime;
+  }
+
+  public void setBeginTime(long beginTime) {
+    this.beginTime = beginTime;
+  }
+
+  public long getEndTime() {
+    return this.endTime;
+  }
+
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  public String getGlobalTxId() {
+    return this.globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  public long getExpirationTime() {
+    return this.expirationTime;
+  }
+
+  public void setExpirationTime(long expirationTime) {
+    this.expirationTime = expirationTime;
+  }
+
+  public boolean isTerminated() {
+    return this.terminated;
+  }
+
+  public void setTerminated(boolean terminated) {
+    this.terminated = terminated;
+  }
+
+  /** Returns the live counter object (not a copy). */
+  public AtomicLong getCompensationRunningCounter() {
+    return this.compensationRunningCounter;
+  }
+
+  public void setCompensationRunningCounter(AtomicLong compensationRunningCounter) {
+    this.compensationRunningCounter = compensationRunningCounter;
+  }
+
+  /** Returns the live sub-transaction map (not a copy). */
+  public Map<String, TxEntity> getTxEntityMap() {
+    return this.txEntityMap;
+  }
+
+  public void setTxEntityMap(Map<String, TxEntity> txEntityMap) {
+    this.txEntityMap = txEntityMap;
+  }
+
+  /**
+   * Returns the milliseconds left until {@code expirationTime}, measured against the current
+   * system clock; the result is negative once the expiration time has passed.
+   */
+  public long getTimeout(){
+    final long now = System.currentTimeMillis();
+    return this.expirationTime - now;
+  }
+
+  /** Returns a new builder wrapping a fresh {@code SagaData}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Fluent builder. The data object is created eagerly and mutated in place, so repeated
+   * calls to {@link #build()} on one builder return the same instance.
+   */
+  public static final class Builder {
+
+    private final SagaData target = new SagaData();
+
+    private Builder() {
+    }
+
+    public Builder beginTime(long beginTime) {
+      this.target.setBeginTime(beginTime);
+      return this;
+    }
+
+    public Builder endTime(long endTime) {
+      this.target.setEndTime(endTime);
+      return this;
+    }
+
+    public Builder globalTxId(String globalTxId) {
+      this.target.setGlobalTxId(globalTxId);
+      return this;
+    }
+
+    public Builder expirationTime(long expirationTime) {
+      this.target.setExpirationTime(expirationTime);
+      return this;
+    }
+
+    public Builder terminated(boolean terminated) {
+      this.target.setTerminated(terminated);
+      return this;
+    }
+
+    public Builder compensationRunningCounter(AtomicLong compensationRunningCounter) {
+      this.target.setCompensationRunningCounter(compensationRunningCounter);
+      return this;
+    }
+
+    public Builder txEntityMap(Map<String, TxEntity> txEntityMap) {
+      this.target.setTxEntityMap(txEntityMap);
+      return this;
+    }
+
+    /** Returns the data object held by this builder. */
+    public SagaData build() {
+      return this.target;
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
new file mode 100644
index 0000000..f6a3222
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java
@@ -0,0 +1,95 @@
+package org.apache.servicecomb.pack.alpha.fsm.model;
+
+
+import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
+
+public class TxEntity implements Serializable {
+ private long beginTime = System.currentTimeMillis();
+ private long endTime;
+ private String parentTxId;
+ private String localTxId;
+ private TxState state;
+
+ public long getBeginTime() {
+ return beginTime;
+ }
+
+ public void setBeginTime(long beginTime) {
+ this.beginTime = beginTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getParentTxId() {
+ return parentTxId;
+ }
+
+ public void setParentTxId(String parentTxId) {
+ this.parentTxId = parentTxId;
+ }
+
+ public String getLocalTxId() {
+ return localTxId;
+ }
+
+ public void setLocalTxId(String localTxId) {
+ this.localTxId = localTxId;
+ }
+
+ public TxState getState() {
+ return state;
+ }
+
+ public void setState(TxState state) {
+ this.state = state;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private TxEntity txEntity;
+
+ private Builder() {
+ txEntity = new TxEntity();
+ }
+
+ public Builder beginTime(long beginTime) {
+ txEntity.setBeginTime(beginTime);
+ return this;
+ }
+
+ public Builder endTime(long endTime) {
+ txEntity.setEndTime(endTime);
+ return this;
+ }
+
+ public Builder parentTxId(String parentTxId) {
+ txEntity.setParentTxId(parentTxId);
+ return this;
+ }
+
+ public Builder localTxId(String localTxId) {
+ txEntity.setLocalTxId(localTxId);
+ return this;
+ }
+
+ public Builder state(TxState state) {
+ txEntity.setState(state);
+ return this;
+ }
+
+ public TxEntity build() {
+ return txEntity;
+ }
+ }
+}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
new file mode 100644
index 0000000..156d67e
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -0,0 +1,619 @@
+package org.apache.servicecomb.pack.alpha.fsm;
+
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import akka.persistence.fsm.PersistentFSM;
+import akka.persistence.fsm.PersistentFSM.CurrentState;
+import akka.testkit.javadsl.TestKit;
+import java.time.Duration;
+import java.util.UUID;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SagaActorTest {
+
+ static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create("SagaActorTest");
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ TestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ public String genPersistenceId() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ @Test
+ public void successfulTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxAbortedEvent-11
+ * 4. SagaAbortedEvent-1
+ */
+ @Test
+ public void firstTxAbortedEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 1);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxAbortedEvent-12
+ * 6. TxComponsitedEvent-11
+ * 7. SagaAbortedEvent-1
+ */
+ @Test
+ public void middleTxAbortedEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 2);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxAbortedEvent-13
+ * 8. TxComponsitedEvent-11
+ * 9. TxComponsitedEvent-12
+ * 10. SagaAbortedEvent-1
+ */
+ @Test
+ public void lastTxAbortedEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
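+ /** NOTE(review): the messages sent by this test are identical to lastTxAbortedEventTest (TxAbortedEvent for the third tx), not the sequence listed here; confirm the intended scenario.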
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxAbortedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. TxComponsitedEvent-12
+ * 9. TxComponsitedEvent-13
+ * 10. SagaAbortedEvent-1
+ */
+ @Test
+ public void receivedRemainingEventAfterfirstTxAbortedEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaAbortedEvent-1
+ * 9. TxComponsitedEvent-11
+ * 10. TxComponsitedEvent-12
+ * 11. TxComponsitedEvent-13
+ */
+ @Test
+ public void sagaAbortedEventAfterAllTxEndedTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaTimeoutEvent-1
+ */
+ @Test
+ public void omegaSendSagaTimeoutEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(SagaTimeoutEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent(5s)-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ @Test
+ public void sagaActorTriggerTimeoutTest() throws InterruptedException {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+ final int timeout = 5;
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ Thread.sleep(timeout*2000);
+ saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(Duration.ofSeconds(timeout+2),PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /** Asserts that {@code transition} came from {@code actorRef} and moves {@code from} -> {@code to}; arguments follow JUnit's (expected, actual) order so failure messages read correctly. */
+ private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef, SagaActorState from, SagaActorState to) {
+ assertEquals(actorRef, transition.fsmRef());
+ assertEquals(from, transition.from());
+ assertEquals(to, transition.to());
+ }
+
+}
diff --git a/alpha/alpha-fsm/src/test/resources/application.conf b/alpha/alpha-fsm/src/test/resources/application.conf
new file mode 100644
index 0000000..1265036
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
+akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/test/resources/log4j2.xml b/alpha/alpha-fsm/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-fsm/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>