You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/11 07:09:37 UTC
[servicecomb-pack] 04/12: SCB-1321 Remove Actor replying methods
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit a93055e6bc413c4cb66d9ff30be0d5e71634b759
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 10 18:47:27 2019 +0800
SCB-1321 Remove Actor replying methods
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 118 +++++++++--------
.../pack/alpha/fsm/domain/SagaEndedDomain.java | 5 +
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 139 ++++++++++-----------
.../pack/alpha/fsm/SagaIntegrationTest.java | 24 ++--
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 30 ++---
5 files changed, 163 insertions(+), 153 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 6881148..a6e3d05 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -98,21 +98,21 @@ public class SagaActor extends
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data);
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(SagaAbortedEvent.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data);
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data);
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
})
);
@@ -120,7 +120,7 @@ public class SagaActor extends
matchEvent(TxEndedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
- if (data.getExpirationTime() != null) {
+ if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -132,7 +132,7 @@ public class SagaActor extends
).event(TxStartedEvent.class,
(event, data) -> {
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
- if (data.getExpirationTime() != null) {
+ if (data.getExpirationTime() != null) {
return stay()
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
@@ -145,7 +145,6 @@ public class SagaActor extends
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(TxAbortedEvent.class,
@@ -156,7 +155,8 @@ public class SagaActor extends
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
- return goTo(SagaActorState.SUSPENDED).replying(data);
+ return goTo(SagaActorState.SUSPENDED)
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
})
);
@@ -189,7 +189,6 @@ public class SagaActor extends
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(SagaEndedEvent.class,
@@ -197,7 +196,6 @@ public class SagaActor extends
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
return goTo(SagaActorState.COMMITTED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(SagaAbortedEvent.class,
@@ -212,7 +210,8 @@ public class SagaActor extends
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
- return goTo(SagaActorState.SUSPENDED).replying(data);
+ return goTo(SagaActorState.SUSPENDED)
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));//.replying(data);
})
);
@@ -222,7 +221,6 @@ public class SagaActor extends
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
).event(TxCompensatedEvent.class, SagaData.class,
@@ -235,12 +233,12 @@ public class SagaActor extends
).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
(event, data) -> {
if (hasCompensationSentTx(data) || !data.isTerminated()) {
- return stay().replying(data);
+ return stay();
} else {
- SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+ SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
}
@@ -249,15 +247,17 @@ public class SagaActor extends
data.setTerminated(true);
if (hasCommittedTx(data)) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
- return stay().replying(data).applying(domainEvent);
+ return stay()
+ .applying(domainEvent);
} else if (hasCompensationSentTx(data)) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
- return stay().replying(data).applying(domainEvent);
+ return stay()
+ .applying(domainEvent);
} else {
- SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(event,
+ SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED)
.applying(domainEvent)
- .replying(data)
.forMax(Duration.create(1, TimeUnit.MILLISECONDS));
}
}
@@ -277,21 +277,35 @@ public class SagaActor extends
}
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
- return goTo(SagaActorState.SUSPENDED).replying(data);
+ SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED);
+ return goTo(SagaActorState.SUSPENDED)
+ .applying(domainEvent)
+ .forMax(Duration.create(1, TimeUnit.MILLISECONDS));
})
);
when(SagaActorState.COMMITTED,
matchAnyEvent(
(event, data) -> {
- /**
- * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181
- * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
- * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的
- * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理
- * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr
- * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1
- * */
+ // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
+ // 以下基于 journal-redis 说明:
+ // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
+ // journal:persistenceIds
+ // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
+ // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
+ //
+ // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
+ // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
+ // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
+ // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
+ //
+ // 何如清理:
+ // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
+ // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
+ // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
+ // 并删除 journal:persisted:item:highestSequenceNr
+ //
+ // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
deleteMessages(lastSequenceNr());
deleteSnapshot(snapshotSequenceNr());
return stop();
@@ -333,7 +347,7 @@ public class SagaActor extends
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.putSagaData(stateData().getGlobalTxId(), stateData());
}
- if(LOG.isDebugEnabled()){
+ if (LOG.isDebugEnabled()) {
LOG.debug("transition {} {} -> {}", getSelf(), from, to);
}
})
@@ -342,14 +356,14 @@ public class SagaActor extends
onTermination(
matchStop(
Normal(), (state, data) -> {
- if(LOG.isDebugEnabled()){
- LOG.info("stop {} {}", data.getGlobalTxId(), state);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("stop {} {}", data.getGlobalTxId(), state);
}
data.setTerminated(true);
data.setLastState(state);
data.setEndTime(new Date());
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
- .putSagaData(data.getGlobalTxId(), data);
+ .stopSagaData(data.getGlobalTxId(), data);
}
)
);
@@ -359,7 +373,8 @@ public class SagaActor extends
@Override
public SagaData applyEvent(DomainEvent event, SagaData data) {
// log event to SagaData
- if(event.getEvent() != null && !(event.getEvent() instanceof TxComponsitedCheckInternalEvent)){
+ if (event.getEvent() != null && !(event
+ .getEvent() instanceof TxComponsitedCheckInternalEvent)) {
data.logEvent(event.getEvent());
}
if (event instanceof SagaStartedDomain) {
@@ -391,9 +406,6 @@ public class SagaActor extends
TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId());
txEntity.setEndTime(System.currentTimeMillis());
if (domainEvent.getState() == TxState.COMMITTED) {
- // stop
- //data.setEndTime(System.currentTimeMillis());
- //data.setTerminated(true);
txEntity.setState(domainEvent.getState());
} else if (domainEvent.getState() == TxState.FAILED) {
txEntity.setState(domainEvent.getState());
@@ -431,13 +443,17 @@ public class SagaActor extends
data.setTerminated(true);
}
}
- LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId());
+ }
return data;
}
@Override
public void onRecoveryCompleted() {
- LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId());
+ }
}
@Override
@@ -468,37 +484,37 @@ public class SagaActor extends
// increments the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
txEntity.setState(TxState.COMPENSATION_SENT);
- try{
+ try {
SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
LOG.info("compensate {}", txEntity.getLocalTxId());
- }catch (AlphaException ex){
- LOG.error(ex.getMessage(),ex);
+ } catch (AlphaException ex) {
+ LOG.error(ex.getMessage(), ex);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
}
- compensation(txEntity,data);
- }catch (Exception ex){
- LOG.error("compensation failed "+txEntity.getLocalTxId(), ex);
- if(txEntity.getRetries() > 0){
+ compensation(txEntity, data);
+ } catch (Exception ex) {
+ LOG.error("compensation failed " + txEntity.getLocalTxId(), ex);
+ if (txEntity.getRetries() > 0) {
// which means the retry number
- if(txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()){
+ if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
}
- compensation(txEntity,data);
+ compensation(txEntity, data);
}
- } else if(txEntity.getRetries() == -1){
+ } else if (txEntity.getRetries() == -1) {
// which means retry it until succeed
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
}
- compensation(txEntity,data);
+ compensation(txEntity, data);
}
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
index 1f4b216..f5bc708 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java
@@ -32,6 +32,11 @@ public class SagaEndedDomain implements DomainEvent {
this.state = state;
}
+
+ public SagaEndedDomain(SagaActorState state) {
+ this.state = state;
+ }
+
public SagaActorState getState() {
return state;
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 926ae84..1b4d84b 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -132,19 +132,18 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- sagaData.getTxEntityMap().forEach((k, v) -> {
- assertEquals(v.getState(), TxState.COMMITTED);
- });
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
}};
@@ -220,13 +219,6 @@ public class SagaActorTest {
assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE,
SagaActorState.PARTIALLY_COMMITTED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- sagaData.getTxEntityMap().forEach((k, v) -> {
- assertEquals(v.getState(), TxState.COMMITTED);
- });
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED,
SagaActorState.COMMITTED);
@@ -234,6 +226,12 @@ public class SagaActorTest {
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), recoveredSaga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
eventListFirst.addAll(eventListSecond);
assertThat(eventListFirst, is(sagaData.getEvents()));
@@ -274,18 +272,17 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 1);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
- assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 1);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -337,19 +334,19 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 2);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
- assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 2);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -410,20 +407,19 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
- assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -484,15 +480,13 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
- sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntityMap().size(), 3);
assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
@@ -548,20 +542,19 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -700,19 +693,18 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -824,19 +816,18 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- sagaData.getTxEntityMap().forEach((k, v) -> {
- assertEquals(v.getState(), TxState.COMMITTED);
- });
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -888,19 +879,18 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- sagaData.getTxEntityMap().forEach((k, v) -> {
- assertEquals(v.getState(), TxState.COMMITTED);
- });
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
@@ -951,20 +941,19 @@ public class SagaActorTest {
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
- SagaData sagaData = expectMsgClass(SagaData.class);
- assertEquals(sagaData.getGlobalTxId(), globalTxId);
- assertEquals(sagaData.getTxEntityMap().size(), 3);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
- assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
-
transition = expectMsgClass(PersistentFSM.Transition.class);
assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
Terminated terminated = expectMsgClass(Terminated.class);
assertEquals(terminated.getActor(), saga);
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
system.stop(saga);
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index f408c09..c1a61a6 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -63,7 +63,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -84,7 +84,7 @@ public class SagaIntegrationTest {
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -103,7 +103,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -124,7 +124,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -146,7 +146,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -168,7 +168,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -190,7 +190,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -212,7 +212,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -235,7 +235,7 @@ public class SagaIntegrationTest {
});
await().atMost(timeout + 2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -257,7 +257,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -279,7 +279,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
@@ -301,7 +301,7 @@ public class SagaIntegrationTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertNotNull(sagaData.getBeginTime());
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index ecac3b8..d44d728 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -99,7 +99,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -121,7 +121,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -143,7 +143,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -167,7 +167,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -193,7 +193,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(5, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -225,7 +225,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(5, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -250,7 +250,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(20, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -275,7 +275,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -300,7 +300,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(timeout + 1, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -324,7 +324,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -348,7 +348,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -372,7 +372,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getTxEntityMap().size(),3);
@@ -396,7 +396,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -436,7 +436,7 @@ public class AlphaIntegrationFsmTest {
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
@@ -462,7 +462,7 @@ public class AlphaIntegrationFsmTest {
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
- return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);