You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/01 14:15:25 UTC
[servicecomb-pack] 02/02: SCB-1321 Support Akka Persistent Redis
Recovery
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 9cee529906beaba4b1c41ab0279093acc8b90b67
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jul 1 22:14:52 2019 +0800
SCB-1321 Support Akka Persistent Redis Recovery
---
alpha/alpha-fsm/README.md | 1 +
alpha/alpha-fsm/pom.xml | 19 +-
.../servicecomb/pack/alpha/fsm/SagaActor.java | 284 ++++++++++++--------
.../apache/servicecomb/pack/alpha/fsm/TxState.java | 3 +-
.../AddTxEventDomain.java} | 40 ++-
.../fsm/{TxState.java => domain/DomainEvent.java} | 11 +-
.../{TxState.java => domain/SagaEndedDomain.java} | 20 +-
.../SagaStartedDomain.java} | 28 +-
.../UpdateTxEventDomain.java} | 41 ++-
...t.java => TxComponsitedCheckInternalEvent.java} | 31 ++-
.../pack/alpha/fsm/event/base/BaseEvent.java | 5 +
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 288 +++++++++++++++++++--
.../pack/alpha/fsm/SagaEventSender.java | 60 ++++-
.../pack/alpha/fsm/SagaIntegrationTest.java | 52 +++-
.../alpha-fsm/src/test/resources/application.yaml | 20 +-
docs/fsm/assets/saga_state_diagram.png | Bin 237463 -> 244645 bytes
docs/fsm/plantuml/saga-state-diagram.puml | 2 +-
17 files changed, 705 insertions(+), 200 deletions(-)
diff --git a/alpha/alpha-fsm/README.md b/alpha/alpha-fsm/README.md
index 3a37fed..e7ea7b9 100644
--- a/alpha/alpha-fsm/README.md
+++ b/alpha/alpha-fsm/README.md
@@ -6,6 +6,7 @@
## Test State Machine
```
+git clone -b SCB-1321 git@github.com:apache/servicecomb-pack.git
cd alpha
mvn clean package -pl alpha-fsm
```
\ No newline at end of file
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index b1897d9..8788430 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -31,6 +31,7 @@
<properties>
<leveldbjni-all.version>1.8</leveldbjni-all.version>
+ <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version>
</properties>
<dependencyManagement>
@@ -72,12 +73,7 @@
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
- </dependency>
-<!-- <dependency>-->
-<!-- <groupId>org.apache.logging.log4j</groupId>-->
-<!-- <artifactId>log4j-slf4j-impl</artifactId>-->
-<!-- <scope>test</scope>-->
-<!-- </dependency>-->
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
@@ -103,6 +99,11 @@
<artifactId>leveldbjni-all</artifactId>
<version>${leveldbjni-all.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.safety-data</groupId>
+ <artifactId>akka-persistence-redis_2.12</artifactId>
+ <version>${akka-persistence-redis.version}</version>
+ </dependency>
<!-- For testing the artifacts scope are test-->
<dependency>
@@ -138,7 +139,11 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.12</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index dca006e..d22cb79 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -22,19 +22,20 @@ import akka.persistence.fsm.AbstractPersistentFSM;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain;
+import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
@@ -62,14 +63,15 @@ public class SagaActor extends
when(SagaActorState.IDEL,
matchEvent(SagaStartedEvent.class,
(event, data) -> {
- data.setGlobalTxId(event.getGlobalTxId());
- data.setBeginTime(System.currentTimeMillis());
+ SagaStartedDomain domainEvent = new SagaStartedDomain(event.getGlobalTxId(),
+ event.getCreateTime(), event.getTimeout());
if (event.getTimeout() > 0) {
- data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000);
return goTo(SagaActorState.READY)
- .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+ .applying(domainEvent)
+ .forMax(Duration.create(event.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return goTo(SagaActorState.READY);
+ return goTo(SagaActorState.READY)
+ .applying(domainEvent);
}
}
@@ -79,25 +81,36 @@ public class SagaActor extends
when(SagaActorState.READY,
matchEvent(TxStartedEvent.class, SagaData.class,
(event, data) -> {
- updateTxEntity(event, data);
+ AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId());
if (data.getExpirationTime() > 0) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
+ .applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return goTo(SagaActorState.PARTIALLY_ACTIVE);
+ return goTo(SagaActorState.PARTIALLY_ACTIVE)
+ .applying(domainEvent);
}
}
).event(SagaEndedEvent.class,
(event, data) -> {
- return goTo(SagaActorState.SUSPENDED).replying(data);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+ return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
+ .replying(data);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
- return goTo(SagaActorState.SUSPENDED).replying(data);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+ return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
+ .replying(data);
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
.replying(data);
})
);
@@ -105,34 +118,43 @@ public class SagaActor extends
when(SagaActorState.PARTIALLY_ACTIVE,
matchEvent(TxEndedEvent.class, SagaData.class,
(event, data) -> {
- updateTxEntity(event, data);
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.COMMITTED);
if (data.getExpirationTime() > 0) {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
+ .applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return goTo(SagaActorState.PARTIALLY_COMMITTED);
+ return goTo(SagaActorState.PARTIALLY_COMMITTED)
+ .applying(domainEvent);
}
}
).event(TxStartedEvent.class,
(event, data) -> {
- updateTxEntity(event, data);
+ AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId());
if (data.getExpirationTime() > 0) {
return stay()
+ .applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return stay();
+ return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(TxAbortedEvent.class,
(event, data) -> {
- updateTxEntity(event, data);
- return goTo(SagaActorState.FAILED);
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.FAILED);
+ return goTo(SagaActorState.FAILED)
+ .applying(domainEvent);
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
@@ -143,47 +165,55 @@ public class SagaActor extends
when(SagaActorState.PARTIALLY_COMMITTED,
matchEvent(TxStartedEvent.class,
(event, data) -> {
- updateTxEntity(event, data);
+ AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId());
if (data.getExpirationTime() > 0) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
+ .applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return goTo(SagaActorState.PARTIALLY_ACTIVE);
+ return goTo(SagaActorState.PARTIALLY_ACTIVE)
+ .applying(domainEvent);
}
}
).event(TxEndedEvent.class,
(event, data) -> {
- updateTxEntity(event, data);
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.COMMITTED);
if (data.getExpirationTime() > 0) {
return stay()
+ .applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
- return stay();
+ return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(SagaEndedEvent.class,
(event, data) -> {
- data.setEndTime(System.currentTimeMillis());
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.COMMITTED);
return goTo(SagaActorState.COMMITTED)
+ .applying(domainEvent)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(SagaAbortedEvent.class,
(event, data) -> {
- data.setEndTime(System.currentTimeMillis());
- updateTxEntity(event, data);
- return goTo(SagaActorState.FAILED);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+ return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
- updateTxEntity(event, data);
- return goTo(SagaActorState.FAILED);
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.FAILED);
+ return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
@@ -194,18 +224,25 @@ public class SagaActor extends
when(SagaActorState.FAILED,
matchEvent(SagaTimeoutEvent.class, SagaData.class,
(event, data) -> {
- data.setEndTime(System.currentTimeMillis());
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(TxComponsitedEvent.class, SagaData.class,
(event, data) -> {
- data.setEndTime(System.currentTimeMillis());
- updateTxEntity(event, data);
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.COMPENSATED);
+ return stay().applying(domainEvent).andThen(exec(_data -> {
+ self().tell(TxComponsitedCheckInternalEvent.builder().build(), self());
+ }));
+ }
+ ).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
+ (event, data) -> {
if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
|| hasCommittedTx(data)) {
- return stay();
+ return stay().replying(data);
} else {
return goTo(SagaActorState.COMPENSATED)
.replying(data)
@@ -214,30 +251,36 @@ public class SagaActor extends
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
- data.setEndTime(System.currentTimeMillis());
- updateTxEntity(event, data);
data.setTerminated(true);
- if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0)
- || hasCommittedTx(data)) {
- return stay();
+ if (hasCommittedTx(data)) {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
+ return stay().replying(data).applying(domainEvent);
+ } else if(hasCompensationSentTx(data)){
+ return stay().replying(data);
} else {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(
+ SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED)
+ .applying(domainEvent)
.replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
}
).event(TxStartedEvent.class, SagaData.class,
(event, data) -> {
- updateTxEntity(event, data);
- return stay();
+ AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId());
+ return stay().applying(domainEvent);
}
).event(TxEndedEvent.class, SagaData.class,
(event, data) -> {
- updateTxEntity(event, data);
- TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
- // TODO call compensate
- compensation(txEntity, data);
- return stay();
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(),
+ event.getLocalTxId(), TxState.COMMITTED);
+ return stay().applying(domainEvent).andThen(exec(_data -> {
+ TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId());
+ // call compensate
+ compensation(txEntity, _data);
+ }));
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
@@ -248,6 +291,17 @@ public class SagaActor extends
when(SagaActorState.COMMITTED,
matchAnyEvent(
(event, data) -> {
+ data.setEndTime(System.currentTimeMillis());
+ /**
+ * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181
+ * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
+ * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的
+ * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理
+ * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
+ * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
+ * */
+ deleteMessages(lastSequenceNr());
+ deleteSnapshot(snapshotSequenceNr());
return stop();
}
)
@@ -256,6 +310,9 @@ public class SagaActor extends
when(SagaActorState.SUSPENDED,
matchAnyEvent(
(event, data) -> {
+ data.setEndTime(System.currentTimeMillis());
+ deleteMessages(lastSequenceNr());
+ deleteSnapshot(snapshotSequenceNr());
return stop();
}
)
@@ -264,6 +321,9 @@ public class SagaActor extends
when(SagaActorState.COMPENSATED,
matchAnyEvent(
(event, data) -> {
+ data.setEndTime(System.currentTimeMillis());
+ deleteMessages(lastSequenceNr());
+ deleteSnapshot(snapshotSequenceNr());
return stop();
}
)
@@ -278,6 +338,11 @@ public class SagaActor extends
onTransition(
matchState(null, null, (from, to) -> {
+ if (stateData().getGlobalTxId() != null) {
+ stateData().setLastState(to);
+ LogExtension.LogExtensionProvider.get(getContext().getSystem())
+ .putSagaData(stateData().getGlobalTxId(), stateData());
+ }
LOG.info("transition {} {} -> {}", getSelf(), from, to);
})
);
@@ -288,7 +353,8 @@ public class SagaActor extends
LOG.info("stop {} {}", data.getGlobalTxId(), state);
data.setTerminated(true);
data.setLastState(state);
- LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data);
+ LogExtension.LogExtensionProvider.get(getContext().getSystem())
+ .putSagaData(data.getGlobalTxId(), data);
}
)
);
@@ -296,73 +362,77 @@ public class SagaActor extends
}
@Override
- public void onRecoveryCompleted() {
- LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData());
- }
-
- @Override
- public Class domainEventClass() {
- return SagaDomainEvent.DomainEvent.class;
- }
-
-
- @Override
- public String persistenceId() {
- return persistenceId;
- }
-
- @Override
- public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) {
- return currentData;
- }
-
- private void updateTxEntity(BaseEvent event, SagaData data) {
- if (event instanceof TxEvent) {
- TxEvent txEvent = (TxEvent) event;
- if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) {
- if (event instanceof TxStartedEvent) {
- TxEntity txEntity = TxEntity.builder()
- .localTxId(txEvent.getLocalTxId())
- .parentTxId(txEvent.getParentTxId())
- .state(TxState.ACTIVE)
- .build();
- data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
- }
+ public SagaData applyEvent(DomainEvent event, SagaData data) {
+ if (event instanceof SagaStartedDomain) {
+ SagaStartedDomain domainEvent = (SagaStartedDomain) event;
+ data.setGlobalTxId(domainEvent.getGlobalTxId());
+ data.setBeginTime(domainEvent.getCreateTime());
+ data.setExpirationTime(domainEvent.getExpirationTime());
+ } else if (event instanceof AddTxEventDomain) {
+ AddTxEventDomain domainEvent = (AddTxEventDomain) event;
+ if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) {
+ TxEntity txEntity = TxEntity.builder()
+ .localTxId(domainEvent.getLocalTxId())
+ .parentTxId(domainEvent.getParentTxId())
+ .state(domainEvent.getState())
+ .build();
+ data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity);
} else {
- TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId());
- if (event instanceof TxEndedEvent) {
- if (txEntity.getState() == TxState.ACTIVE) {
- txEntity.setEndTime(System.currentTimeMillis());
- txEntity.setState(TxState.COMMITTED);
- }
- } else if (event instanceof TxAbortedEvent) {
- if (txEntity.getState() == TxState.ACTIVE) {
- txEntity.setEndTime(System.currentTimeMillis());
- txEntity.setState(TxState.FAILED);
- data.getTxEntityMap().forEach((k, v) -> {
- if (v.getState() == TxState.COMMITTED) {
- // call compensate
- compensation(v, data);
- }
- });
+ LOG.warn("TxEntity {} already exists", domainEvent.getLocalTxId());
+ }
+ } else if (event instanceof UpdateTxEventDomain) {
+ UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event;
+ TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
+ txEntity.setEndTime(System.currentTimeMillis());
+ if (domainEvent.getState() == TxState.COMMITTED) {
+ txEntity.setState(domainEvent.getState());
+ } else if (domainEvent.getState() == TxState.FAILED) {
+ txEntity.setState(domainEvent.getState());
+ data.getTxEntityMap().forEach((k, v) -> {
+ if (v.getState() == TxState.COMMITTED) {
+ // call compensate
+ compensation(v, data);
}
- } else if (event instanceof TxComponsitedEvent) {
- // decrement the compensation running counter by one
- data.getCompensationRunningCounter().decrementAndGet();
- txEntity.setState(TxState.COMPENSATED);
- LOG.info("compensation is completed {}", txEntity.getLocalTxId());
- }
+ });
+ } else if (domainEvent.getState() == TxState.COMPENSATED) {
+ // decrement the compensation running counter by one
+ data.getCompensationRunningCounter().decrementAndGet();
+ txEntity.setState(domainEvent.getState());
+ LOG.info("compensation is completed {}", txEntity.getLocalTxId());
}
- } else if (event instanceof SagaEvent) {
- if (event instanceof SagaAbortedEvent) {
+ } else if (event instanceof SagaEndedDomain) {
+ SagaEndedDomain domainEvent = (SagaEndedDomain) event;
+ if (domainEvent.getState() == SagaActorState.FAILED) {
data.getTxEntityMap().forEach((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
compensation(v, data);
}
});
+ } else if (domainEvent.getState() == SagaActorState.SUSPENDED) {
+
+ } else if (domainEvent.getState() == SagaActorState.COMPENSATED) {
+
}
}
+ LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+ return data;
+ }
+
+ @Override
+ public void onRecoveryCompleted() {
+ LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+ }
+
+ @Override
+ public Class domainEventClass() {
+ return DomainEvent.class;
+ }
+
+
+ @Override
+ public String persistenceId() {
+ return persistenceId;
}
private boolean hasCommittedTx(SagaData data) {
@@ -371,9 +441,17 @@ public class SagaActor extends
.count() > 0;
}
+ private boolean hasCompensationSentTx(SagaData data) {
+ return data.getTxEntityMap().entrySet().stream()
+ .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT)
+ .count() > 0;
+ }
+
private void compensation(TxEntity txEntity, SagaData data) {
// increments the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
+ //TODO call omega compensate method
LOG.info("compensate {}", txEntity.getLocalTxId());
+ txEntity.setState(TxState.COMPENSATION_SENT);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
index 0494895..af02ce8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
@@ -21,5 +21,6 @@ public enum TxState {
ACTIVE,
FAILED,
COMMITTED,
- COMPENSATED;
+ COMPENSATION_SENT, // The compensation method has been called to wait for TxComponsitedEvent
+ COMPENSATED
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
similarity index 51%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
index 5794566..c7c65a3 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java
@@ -15,29 +15,41 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
-public abstract class BaseEvent implements Serializable {
- private String globalTxId;
+public class AddTxEventDomain implements DomainEvent {
+ private String parentTxId;
+ private String localTxId;
+ private TxState state = TxState.ACTIVE;
- public BaseEvent() {
+ public AddTxEventDomain(String parentTxId, String localTxId) {
+ this.parentTxId = parentTxId;
+ this.localTxId = localTxId;
+ }
+
+ public String getParentTxId() {
+ return parentTxId;
+ }
+
+ public void setParentTxId(String parentTxId) {
+ this.parentTxId = parentTxId;
+ }
+ public String getLocalTxId() {
+ return localTxId;
}
- public String getGlobalTxId() {
- return globalTxId;
+ public void setLocalTxId(String localTxId) {
+ this.localTxId = localTxId;
}
- public void setGlobalTxId(String globalTxId) {
- this.globalTxId = globalTxId;
+ public TxState getState() {
+ return state;
}
- @Override
- public String toString() {
- return "BaseEvent{" +
- "globalTxId='" + globalTxId + '\'' +
- '}';
+ public void setState(TxState state) {
+ this.state = state;
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
similarity index 85%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
index 0494895..10c293c 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
+
+import java.io.Serializable;
+
+public interface DomainEvent extends Serializable {
-public enum TxState {
- ACTIVE,
- FAILED,
- COMMITTED,
- COMPENSATED;
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
similarity index 70%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 0494895..d3e40b1 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -15,11 +15,19 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
-public enum TxState {
- ACTIVE,
- FAILED,
- COMMITTED,
- COMPENSATED;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
+
+public class SagaEndedDomain implements DomainEvent {
+
+ private SagaActorState state;
+
+ public SagaEndedDomain(SagaActorState state) {
+ this.state = state;
+ }
+
+ public SagaActorState getState() {
+ return state;
+ }
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
similarity index 65%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
index 5794566..fe75f04 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java
@@ -15,29 +15,31 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
-import java.io.Serializable;
+public class SagaStartedDomain implements DomainEvent {
-public abstract class BaseEvent implements Serializable {
+ private long createTime;
private String globalTxId;
+ private long expirationTime;
- public BaseEvent() {
+ public SagaStartedDomain(String globalTxId, long createTime, int timeout) {
+ this.createTime = createTime;
+ this.globalTxId = globalTxId;
+ if (timeout > 0) {
+ this.expirationTime = System.currentTimeMillis() + timeout * 1000;
+ }
+ }
+ public long getCreateTime() {
+ return createTime;
}
public String getGlobalTxId() {
return globalTxId;
}
- public void setGlobalTxId(String globalTxId) {
- this.globalTxId = globalTxId;
- }
-
- @Override
- public String toString() {
- return "BaseEvent{" +
- "globalTxId='" + globalTxId + '\'' +
- '}';
+ public long getExpirationTime() {
+ return expirationTime;
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
similarity index 50%
copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index 5794566..839cfe0 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -15,29 +15,42 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.alpha.fsm.event.base;
+package org.apache.servicecomb.pack.alpha.fsm.domain;
-import java.io.Serializable;
+import org.apache.servicecomb.pack.alpha.fsm.TxState;
-public abstract class BaseEvent implements Serializable {
- private String globalTxId;
+public class UpdateTxEventDomain implements DomainEvent {
+ private String parentTxId;
+ private String localTxId;
+ private TxState state;
- public BaseEvent() {
+ public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state) {
+ this.parentTxId = parentTxId;
+ this.localTxId = localTxId;
+ this.state = state;
+ }
+
+ public String getParentTxId() {
+ return parentTxId;
+ }
+
+ public void setParentTxId(String parentTxId) {
+ this.parentTxId = parentTxId;
+ }
+ public String getLocalTxId() {
+ return localTxId;
}
- public String getGlobalTxId() {
- return globalTxId;
+ public void setLocalTxId(String localTxId) {
+ this.localTxId = localTxId;
}
- public void setGlobalTxId(String globalTxId) {
- this.globalTxId = globalTxId;
+ public TxState getState() {
+ return state;
}
- @Override
- public String toString() {
- return "BaseEvent{" +
- "globalTxId='" + globalTxId + '\'' +
- '}';
+ public void setState(TxState state) {
+ this.state = state;
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
similarity index 60%
rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
index b16e44a..225ef26 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java
@@ -17,15 +17,24 @@
package org.apache.servicecomb.pack.alpha.fsm.event;
-public class SagaDomainEvent {
- public interface DomainEvent {}
-
- public enum SagaStartedEvent implements DomainEvent {INSTANCE}
- public enum SagaEndedEvent implements DomainEvent {INSTANCE}
- public enum SagaAbortedEvent implements DomainEvent {INSTANCE}
- public enum SagaTimeoutEvent implements DomainEvent {INSTANCE}
- public enum TxStartedEvent implements DomainEvent {INSTANCE}
- public enum TxEndedEvent implements DomainEvent {INSTANCE}
- public enum TxAbortedEvent implements DomainEvent {INSTANCE}
- public enum TxComponsitedEvent implements DomainEvent {INSTANCE}
+import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent;
+
+public class TxComponsitedCheckInternalEvent extends TxEvent {
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private TxComponsitedCheckInternalEvent txComponsitedEvent;
+
+ private Builder() {
+ txComponsitedEvent = new TxComponsitedCheckInternalEvent();
+ }
+
+ public TxComponsitedCheckInternalEvent build() {
+ return txComponsitedEvent;
+ }
+ }
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
index 5794566..4ba8372 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java
@@ -21,11 +21,16 @@ import java.io.Serializable;
public abstract class BaseEvent implements Serializable {
private String globalTxId;
+ private long createTime = System.currentTimeMillis();
public BaseEvent() {
}
+ public long getCreateTime() {
+ return createTime;
+ }
+
public String getGlobalTxId() {
return globalTxId;
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 5644c1b..313c526 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -25,9 +25,13 @@ import akka.actor.Terminated;
import akka.persistence.fsm.PersistentFSM;
import akka.persistence.fsm.PersistentFSM.CurrentState;
import akka.testkit.javadsl.TestKit;
+import com.typesafe.config.ConfigFactory;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
+import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,9 +40,30 @@ public class SagaActorTest {
static ActorSystem system;
+ private static Map<String,Object> getPersistenceMemConfig(){
+ Map<String, Object> map = new HashMap<>();
+ map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem");
+ map.put("akka.persistence.journal.leveldb.dir", "target/example/journal");
+ map.put("akka.persistence.snapshot-store.plugin", "akka.persistence.snapshot-store.local");
+ map.put("akka.persistence.snapshot-store.local.dir", "target/example/snapshots");
+ return map;
+ }
+
+ private static Map<String,Object> getPersistenceRedisConfig(){
+ Map<String, Object> map = new HashMap<>();
+ map.put("akka.actor.warn-about-java-serializer-usage",false);
+ map.put("akka.persistence.journal.plugin", "akka-persistence-redis.journal");
+ map.put("akka.persistence.snapshot-store.plugin", "akka-persistence-redis.snapshot");
+ map.put("akka-persistence-redis.redis.mode", "simple");
+ map.put("akka-persistence-redis.redis.host", "localhost");
+ map.put("akka-persistence-redis.redis.port", "6379");
+ map.put("akka-persistence-redis.redis.database", "0");
+ return map;
+ }
+
@BeforeClass
public static void setup() {
- system = ActorSystem.create("SagaActorTest");
+ system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
}
@AfterClass
@@ -69,7 +94,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -122,6 +147,158 @@ public class SagaActorTest {
/**
* 1. SagaStartedEvent-1
* 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 4. TxStartedEvent-13
+ * 5. TxEndedEvent-13
+ * 6. SagaEndedEvent-1
+ */
+ @Test
+ public void successfulRecoveryWithCorrectStateDataTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ String persistenceId = genPersistenceId();
+ ActorRef saga = system.actorOf(SagaActor.props(persistenceId));
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ //expectTerminated(saga);
+
+ ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+ watch(recoveredSaga);
+ recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ recoveredSaga.tell(event, getRef());
+ });
+
+ currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), recoveredSaga);
+ system.stop(saga);
+ }};
+ }
+
+ @Test
+ public void successfulRecoveryWithCorrectStateDataTestAndDiffentSystem() {
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+ String persistenceId = genPersistenceId();
+ ActorSystem system1 = ActorSystem.create("SagaActorTest1", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+ new TestKit(system1) {{
+ ActorRef saga = system1.actorOf(SagaActor.props(persistenceId));
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+ system1.stop(saga);
+ }};
+ TestKit.shutdownActorSystem(system1);
+
+ ActorSystem system2 = ActorSystem.create("SagaActorTest2", ConfigFactory.parseMap(getPersistenceRedisConfig()));
+ new TestKit(system2) {{
+ ActorRef recoveredSaga = system2.actorOf(SagaActor.props(persistenceId), "recoveredSaga");
+ watch(recoveredSaga);
+ recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ recoveredSaga.tell(event, getRef());
+ });
+
+ CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), recoveredSaga);
+ system2.stop(recoveredSaga);
+ }};
+ TestKit.shutdownActorSystem(system2);
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
* 3. TxAbortedEvent-11
* 7. SagaAbortedEvent-1
*/
@@ -131,7 +308,7 @@ public class SagaActorTest {
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -185,7 +362,7 @@ public class SagaActorTest {
final String localTxId_1 = UUID.randomUUID().toString();
final String localTxId_2 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -249,7 +426,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -303,6 +480,79 @@ public class SagaActorTest {
/**
* 1. SagaStartedEvent-1
* 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxAbortedEvent-13
+ * 8. TxComponsitedEvent-11
+ * 9. TxComponsitedEvent-12
+ * 10. SagaAbortedEvent-1
+ */
+ @Test
+ public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
* 3. TxAbortedEvent-11
* 4. TxStartedEvent-12
* 5. TxEndedEvent-12
@@ -320,7 +570,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -381,7 +631,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -417,7 +667,13 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -425,12 +681,6 @@ public class SagaActorTest {
assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
- transition = expectMsgClass(PersistentFSM.Transition.class);
- assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
-
- Terminated terminated = expectMsgClass(Terminated.class);
- assertEquals(terminated.getActor(), saga);
-
system.stop(saga);
}};
}
@@ -454,7 +704,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -522,7 +772,7 @@ public class SagaActorTest {
final String localTxId_3 = UUID.randomUUID().toString();
final int timeout = 5;
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
@@ -584,7 +834,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -639,7 +889,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
@@ -702,7 +952,7 @@ public class SagaActorTest {
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
- ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
watch(saga);
saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index a3ddd0d..d303f86 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -124,6 +124,33 @@ public class SagaEventSender {
/**
* 1. SagaStartedEvent-1
* 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxEndedEvent-12
+ * 6. TxStartedEvent-13
+ * 7. TxAbortedEvent-13
+ * 8. SagaAbortedEvent-1
+ * 9. TxComponsitedEvent-11
+ * 10. TxComponsitedEvent-12
+ */
+ public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
* 3. TxAbortedEvent-11
* 4. TxStartedEvent-12
* 5. TxEndedEvent-12
@@ -292,6 +319,37 @@ public class SagaEventSender {
sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build());
return sagaEvents;
- }
+ }
+
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ */
+ public static List<BaseEvent> successfulFirstHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. TxEndedEvent-12
+ * 2. TxStartedEvent-13
+ * 3. TxEndedEvent-13
+ * 4. SagaEndedEvent-1
+ */
+ public static List<BaseEvent> successfulSecondHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 5a213f1..85c8b18 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -36,9 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = {SagaApplication.class},
properties = {
"alpha.model.actor.enabled=true",
- "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
- "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
- "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots"
+ "spring.profiles.active=akka-persistence-redis"
})
public class SagaIntegrationTest {
@@ -63,6 +61,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMMITTED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -85,6 +85,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 1
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED;
}else{
@@ -106,6 +108,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 2
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED;
@@ -129,6 +133,34 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
+ && sagaData.getTxEntityMap().size() == 3
+ && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
+ && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
+ && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED;
+ }else{
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void sagaAbortedEventBeforeTxComponsitedEventTest() {
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+ SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ sagaEventBus.post(event);
+ });
+
+ await().atMost(1, SECONDS).until(() -> {
+ SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
+ if(sagaData != null){
+ return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -153,6 +185,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -177,6 +211,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
@@ -201,6 +237,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.SUSPENDED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -226,6 +264,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.SUSPENDED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -250,6 +290,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMMITTED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -274,6 +316,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMMITTED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED
@@ -298,6 +342,8 @@ public class SagaIntegrationTest {
SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);
if(sagaData != null){
return sagaData.getLastState() == SagaActorState.COMPENSATED
+ && sagaData.getBeginTime() > 0
+ && sagaData.getEndTime() >0
&& sagaData.getTxEntityMap().size() == 3
&& sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED
&& sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED
diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml
index b3577a6..9ca5b2e 100644
--- a/alpha/alpha-fsm/src/test/resources/application.yaml
+++ b/alpha/alpha-fsm/src/test/resources/application.yaml
@@ -15,7 +15,25 @@
## limitations under the License.
## ---------------------------------------------------------------------------
+---
+spring:
+ profiles: akka-persistence-mem
akkaConfig:
akka.persistence.journal.plugin: akka.persistence.journal.inmem
+ akka.persistence.journal.leveldb.dir: target/example/journal
akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local
- akka.persistence.snapshot-store.local.dir: target/example/snapshots
\ No newline at end of file
+ akka.persistence.snapshot-store.local.dir: target/example/snapshots
+
+---
+spring:
+ profiles: akka-persistence-redis
+akkaConfig:
+ akka.persistence.journal.plugin: akka-persistence-redis.journal
+ akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot
+ akka-persistence-redis:
+ redis:
+ mode: simple
+ host: localhost
+ port: 6379
+ database: 0
+ #password:
\ No newline at end of file
diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png
index 349fb72..4bc1ba4 100644
Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ
diff --git a/docs/fsm/plantuml/saga-state-diagram.puml b/docs/fsm/plantuml/saga-state-diagram.puml
index 429687d..b9233c5 100644
--- a/docs/fsm/plantuml/saga-state-diagram.puml
+++ b/docs/fsm/plantuml/saga-state-diagram.puml
@@ -33,7 +33,7 @@ FAILED --> COMPENSATED : SagaAbortedEvent<font color=red>:doCompensation</font>
FAILED --> SUSPENDED : SagaTimeoutEvent
-FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>
+FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>\nTxComponsitedCheckInternalEvent
COMPENSATED --> [*]