You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/01/14 11:33:53 UTC
[servicecomb-pack] 04/08: SCB-1696 Optimize state machine
compensation retry strategy using reverseRetries and
retryDelayInMilliseconds
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 5baeda1e082c16c19947927c1a26d7f57b40403f
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jan 13 02:12:59 2020 +0800
SCB-1696 Optimize state machine compensation retry strategy using reverseRetries and retryDelayInMilliseconds
---
.../pack/alpha/core/fsm/SuspendedType.java | 2 +-
.../servicecomb/pack/alpha/core/fsm/TxState.java | 4 +-
...kEvent.java => TxCompensateAckFailedEvent.java} | 22 ++---
...Event.java => TxCompensateAckSucceedEvent.java} | 23 +----
.../servicecomb/pack/alpha/fsm/SagaActor.java | 50 ++++++----
.../pack/alpha/fsm/domain/UpdateTxEventDomain.java | 13 ++-
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 101 ++++++++++++++++-----
.../pack/alpha/fsm/SagaEventSender.java | 77 +++++++++++-----
.../pack/alpha/fsm/SagaIntegrationTest.java | 24 ++---
.../alpha/server/fsm/GrpcSagaEventService.java | 11 ++-
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 36 ++++----
.../grpc/saga/GrpcSagaClientMessageSender.java | 2 +
.../pack/omega/transaction/CallbackContext.java | 10 +-
.../transaction/CompensationMessageHandler.java | 6 +-
.../transaction/TxCompensateAckFailedEvent.java | 4 +-
.../transaction/TxCompensateAckSucceedEvent.java | 4 +-
.../transaction/tcc/CoordinateMessageHandler.java | 2 +-
.../CompensationMessageHandlerTest.java | 66 ++++++++++++--
18 files changed, 304 insertions(+), 153 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
index a3189d2..f2ed57a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/SuspendedType.java
@@ -18,5 +18,5 @@
package org.apache.servicecomb.pack.alpha.core.fsm;
public enum SuspendedType {
- NONE, TIMEOUT, UNPREDICTABLE
+ NONE, TIMEOUT, COMPENSATE_FAILED, UNPREDICTABLE
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
index 747cfc1..f7848fb 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/TxState.java
@@ -22,5 +22,7 @@ public enum TxState {
FAILED,
COMMITTED,
COMPENSATION_SENT, // The compensation method has been called to wait for TxCompensatedEvent
- COMPENSATED
+ COMPENSATED, // Just for compatibility with historical data deserialization in ES
+ COMPENSATED_SUCCEED,
+ COMPENSATED_FAILED,
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
similarity index 80%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
index 1fae858..a033d21 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckFailedEvent.java
@@ -20,16 +20,16 @@ package org.apache.servicecomb.pack.alpha.core.fsm.event;
import java.util.Date;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
-public class TxCompensateAckEvent extends TxEvent {
+public class TxCompensateAckFailedEvent extends TxEvent {
- private boolean succeed;
+ private byte[] payloads;
- public boolean isSucceed() {
- return succeed;
+ public byte[] getPayloads() {
+ return payloads;
}
- public void setSucceed(boolean succeed) {
- this.succeed = succeed;
+ public void setPayloads(byte[] payloads) {
+ this.payloads = payloads;
}
public static Builder builder() {
@@ -38,10 +38,10 @@ public class TxCompensateAckEvent extends TxEvent {
public static final class Builder {
- private TxCompensateAckEvent txCompensatedEvent;
+ private TxCompensateAckFailedEvent txCompensatedEvent;
private Builder() {
- txCompensatedEvent = new TxCompensateAckEvent();
+ txCompensatedEvent = new TxCompensateAckFailedEvent();
}
public Builder serviceName(String serviceName) {
@@ -74,12 +74,12 @@ public class TxCompensateAckEvent extends TxEvent {
return this;
}
- public Builder succeed(boolean succeed){
- txCompensatedEvent.setSucceed(succeed);
+ public Builder payloads(byte[] payloads){
+ txCompensatedEvent.setPayloads(payloads);
return this;
}
- public TxCompensateAckEvent build() {
+ public TxCompensateAckFailedEvent build() {
return txCompensatedEvent;
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
similarity index 79%
rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
index 1fae858..d1b7c75 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxCompensateAckSucceedEvent.java
@@ -20,17 +20,7 @@ package org.apache.servicecomb.pack.alpha.core.fsm.event;
import java.util.Date;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.TxEvent;
-public class TxCompensateAckEvent extends TxEvent {
-
- private boolean succeed;
-
- public boolean isSucceed() {
- return succeed;
- }
-
- public void setSucceed(boolean succeed) {
- this.succeed = succeed;
- }
+public class TxCompensateAckSucceedEvent extends TxEvent {
public static Builder builder() {
return new Builder();
@@ -38,10 +28,10 @@ public class TxCompensateAckEvent extends TxEvent {
public static final class Builder {
- private TxCompensateAckEvent txCompensatedEvent;
+ private TxCompensateAckSucceedEvent txCompensatedEvent;
private Builder() {
- txCompensatedEvent = new TxCompensateAckEvent();
+ txCompensatedEvent = new TxCompensateAckSucceedEvent();
}
public Builder serviceName(String serviceName) {
@@ -74,12 +64,7 @@ public class TxCompensateAckEvent extends TxEvent {
return this;
}
- public Builder succeed(boolean succeed){
- txCompensatedEvent.setSucceed(succeed);
- return this;
- }
-
- public TxCompensateAckEvent build() {
+ public TxCompensateAckSucceedEvent build() {
return txCompensatedEvent;
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 7c6eee6..a7c74e2 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent;
@@ -40,7 +42,6 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
@@ -230,7 +231,14 @@ public class SagaActor extends
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
- ).event(TxCompensatedEvent.class, SagaData.class,
+ ).event(TxCompensateAckSucceedEvent.class, SagaData.class,
+ (event, data) -> {
+ UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
+ return stay().applying(domainEvent).andThen(exec(_data -> {
+ self().tell(ComponsitedCheckEvent.builder().build(), self());
+ }));
+ }
+ ).event(TxCompensateAckFailedEvent.class, SagaData.class,
(event, data) -> {
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
@@ -242,10 +250,13 @@ public class SagaActor extends
if (data.getTxEntities().hasCompensationSentTx() || !data.isTerminated()) {
return stay();
} else {
- SagaEndedDomain domainEvent = new SagaEndedDomain(event,
- SagaActorState.COMPENSATED);
- return goTo(SagaActorState.COMPENSATED)
- .applying(domainEvent);
+ if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
+ return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
+ } else {
+ SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
+ return goTo(SagaActorState.COMPENSATED).applying(domainEvent);
+ }
}
}
).event(SagaAbortedEvent.class, SagaData.class,
@@ -450,6 +461,9 @@ public class SagaActor extends
.compensationMethod(domainEvent.getCompensationMethod())
.payloads(domainEvent.getPayloads())
.state(domainEvent.getState())
+ .reverseRetries(domainEvent.getReverseRetries())
+ .reverseTimeout(domainEvent.getReverseTimeout())
+ .retryDelayInMilliseconds(domainEvent.getRetryDelayInMilliseconds())
.beginTime(domainEvent.getEvent().getCreateTime())
.build();
data.getTxEntities().put(txEntity.getLocalTxId(), txEntity);
@@ -471,11 +485,15 @@ public class SagaActor extends
compensation(v, data);
}
});
- } else if (domainEvent.getState() == TxState.COMPENSATED) {
+ } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED || domainEvent.getState() == TxState.COMPENSATED_FAILED) {
// decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(domainEvent.getState());
- LOG.info("compensation is completed {}", txEntity.getLocalTxId());
+ if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
+ LOG.info("compensation is succeed {}", txEntity.getLocalTxId());
+ } else {
+ LOG.info("compensation is failed {}", txEntity.getLocalTxId());
+ }
}
} else if (event instanceof SagaEndedDomain) {
SagaEndedDomain domainEvent = (SagaEndedDomain) event;
@@ -548,22 +566,20 @@ public class SagaActor extends
if (txEntity.getReverseRetries() > 0) {
// which means the retry number
if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
+ LOG.info("Retry compensate {}/{} after {} ms", txEntity.getRetriesCounter().get() + 1, txEntity.getReverseRetries(),
+ txEntity.getRetryDelayInMilliseconds());
try {
Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
compensation(txEntity, data);
+ } else {
+ data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
}
- } else if (txEntity.getReverseRetries() == -1) {
- // which means retry it until succeed
- try {
- Thread.sleep(txEntity.getRetryDelayInMilliseconds());
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- compensation(txEntity, data);
+ } else {
+ data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
}
}
}
-}
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
index f74d3e6..79d5dc0 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java
@@ -19,7 +19,8 @@ package org.apache.servicecomb.pack.alpha.fsm.domain;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
@@ -45,13 +46,19 @@ public class UpdateTxEventDomain implements DomainEvent {
this.state = TxState.FAILED;
}
- public UpdateTxEventDomain(TxCompensatedEvent event) {
+ public UpdateTxEventDomain(TxCompensateAckSucceedEvent event) {
this.event = event;
this.parentTxId = event.getParentTxId();
this.localTxId = event.getLocalTxId();
- this.state = TxState.COMPENSATED;
+ this.state = TxState.COMPENSATED_SUCCEED;
}
+ public UpdateTxEventDomain(TxCompensateAckFailedEvent event) {
+ this.event = event;
+ this.parentTxId = event.getParentTxId();
+ this.localTxId = event.getLocalTxId();
+ this.state = TxState.COMPENSATED_FAILED;
+ }
public String getParentTxId() {
return parentTxId;
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 59a7d42..9dcfbbe 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -328,7 +328,7 @@ public class SagaActorTest {
* 3. TxEndedEvent-11
* 4. TxStartedEvent-12
* 5. TxAbortedEvent-12
- * 6. TxCompensatedEvent-11
+ * 6. TxCompensateAckSucceedEvent-11
* 7. SagaAbortedEvent-1
*/
@Test
@@ -375,7 +375,7 @@ public class SagaActorTest {
SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 2);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
@@ -385,6 +385,59 @@ public class SagaActorTest {
}};
}
+ @Test
+ public void middleTxAbortedAndRetryCompensationEvents() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()));
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ List<BaseEvent> eventList = SagaEventSender.middleTxAbortedAndRetryCompensationEvents(globalTxId, localTxId_1, localTxId_2);
+ eventList.stream().forEach( event -> {
+ saga.tell(event, getRef());
+ });
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDLE, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDLE, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntities().size(), 2);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.FAILED);
+
+ assertThat(eventList, is(sagaData.getEvents()));
+
+ system.stop(saga);
+ }};
+ }
+
/**
* 1. SagaStartedEvent-1
* 2. TxStartedEvent-11
@@ -393,8 +446,8 @@ public class SagaActorTest {
* 5. TxEndedEvent-12
* 6. TxStartedEvent-13
* 7. TxAbortedEvent-13
- * 8. TxCompensatedEvent-11
- * 9. TxCompensatedEvent-12
+ * 8. TxCompensateAckSucceedEvent-11
+ * 9. TxCompensateAckSucceedEvent-12
* 10. SagaAbortedEvent-1
*/
@Test
@@ -448,8 +501,8 @@ public class SagaActorTest {
SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
@@ -466,8 +519,8 @@ public class SagaActorTest {
* 5. TxEndedEvent-12
* 6. TxStartedEvent-13
* 7. TxAbortedEvent-13
- * 8. TxCompensatedEvent-11
- * 9. TxCompensatedEvent-12
+ * 8. TxCompensateAckSucceedEvent-11
+ * 9. TxCompensateAckSucceedEvent-12
* 10. SagaAbortedEvent-1
*/
@Test
@@ -521,8 +574,8 @@ public class SagaActorTest {
SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
@@ -540,8 +593,8 @@ public class SagaActorTest {
* 5. TxEndedEvent-12
* 6. TxStartedEvent-13
* 7. TxEndedEvent-13
- * 8. TxCompensatedEvent-12
- * 9. TxCompensatedEvent-13
+ * 8. TxCompensateAckSucceedEvent-12
+ * 9. TxCompensateAckSucceedEvent-13
* 10. SagaAbortedEvent-1
*/
@Test
@@ -584,8 +637,8 @@ public class SagaActorTest {
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 3);
assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.FAILED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
@@ -603,9 +656,9 @@ public class SagaActorTest {
* 6. TxStartedEvent-13
* 7. TxEndedEvent-13
* 8. SagaAbortedEvent-1
- * 9. TxCompensatedEvent-11
- * 8. TxCompensatedEvent-12
- * 9. TxCompensatedEvent-13
+ * 9. TxCompensateAckSucceedEvent-11
+ * 8. TxCompensateAckSucceedEvent-12
+ * 9. TxCompensateAckSucceedEvent-13
*/
@Test
public void sagaAbortedEventAfterAllTxEndedTest() {
@@ -661,9 +714,9 @@ public class SagaActorTest {
SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
@@ -937,8 +990,8 @@ public class SagaActorTest {
* 3. TxEndedEvent-11
* 5. TxEndedEvent-12
* 7. TxAbortedEvent-13
- * 8. TxCompensatedEvent-11
- * 9. TxCompensatedEvent-12
+ * 8. TxCompensateAckSucceedEvent-11
+ * 9. TxCompensateAckSucceedEvent-12
* 10. SagaAbortedEvent-1
*/
@Test
@@ -982,8 +1035,8 @@ public class SagaActorTest {
SagaData sagaData = SAGA_DATA_EXTENSION_PROVIDER.get(system).getLastSagaData();
assertEquals(sagaData.getGlobalTxId(), globalTxId);
assertEquals(sagaData.getTxEntities().size(), 3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(), TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(), TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(), TxState.FAILED);
assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
assertThat(eventList, is(sagaData.getEvents()));
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
index b110a84..1f0d792 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java
@@ -24,7 +24,9 @@ import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
@@ -79,7 +81,7 @@ public class SagaEventSender {
* 3. TxEndedEvent-11
* 4. TxStartedEvent-12
* 5. TxAbortedEvent-12
- * 6. TxCompensatedEvent-11
+ * 6. TxCompensateAckSucceedEvent-11
* 7. SagaAbortedEvent-1
*/
public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){
@@ -89,7 +91,7 @@ public class SagaEventSender {
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
return sagaEvents;
}
@@ -102,8 +104,8 @@ public class SagaEventSender {
* 5. TxEndedEvent-12
* 6. TxStartedEvent-13
* 7. TxAbortedEvent-13
- * 8. TxCompensatedEvent-11
- * 9. TxCompensatedEvent-12
+ * 8. TxCompensateAckSucceedEvent-11
+ * 9. TxCompensateAckSucceedEvent-12
* 10. SagaAbortedEvent-1
*/
public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -115,8 +117,33 @@ public class SagaEventSender {
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+ return sagaEvents;
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxEndedEvent-11
+ * 4. TxStartedEvent-12
+ * 5. TxAbortedEvent-12
+ * 6. TxCompensateAckFailedEvent-11
+ * 7. TxCompensateAckFailedEvent-11
+ * 8. TxCompensateAckFailedEvent-11
+ * 9. SagaAbortedEvent-1
+ */
+ public static List<BaseEvent> middleTxAbortedAndRetryCompensationEvents(String globalTxId, String localTxId_1, String localTxId_2){
+ List<BaseEvent> sagaEvents = new ArrayList<>();
+ sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+ sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).reverseRetries(3).build());
+ sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckFailedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckFailedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
return sagaEvents;
}
@@ -130,8 +157,8 @@ public class SagaEventSender {
* 6. TxStartedEvent-13
* 7. TxAbortedEvent-13
* 8. SagaAbortedEvent-1
- * 9. TxCompensatedEvent-11
- * 10. TxCompensatedEvent-12
+ * 9. TxCompensateAckSucceedEvent-11
+ * 10. TxCompensateAckSucceedEvent-12
*/
public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -143,8 +170,8 @@ public class SagaEventSender {
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
return sagaEvents;
}
@@ -156,8 +183,8 @@ public class SagaEventSender {
* 5. TxEndedEvent-12
* 6. TxStartedEvent-13
* 7. TxEndedEvent-13
- * 8. TxCompensatedEvent-12
- * 9. TxCompensatedEvent-13
+ * 8. TxCompensateAckSucceedEvent-12
+ * 9. TxCompensateAckSucceedEvent-13
* 10. SagaAbortedEvent-1
*/
public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -169,8 +196,8 @@ public class SagaEventSender {
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
return sagaEvents;
}
@@ -184,9 +211,9 @@ public class SagaEventSender {
* 6. TxStartedEvent-13
* 7. TxEndedEvent-13
* 8. SagaAbortedEvent-1
- * 9. TxCompensatedEvent-11
- * 8. TxCompensatedEvent-12
- * 9. TxCompensatedEvent-13
+ * 9. TxCompensateAckSucceedEvent-11
+ * 8. TxCompensateAckSucceedEvent-12
+ * 9. TxCompensateAckSucceedEvent-13
*/
public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
List<BaseEvent> sagaEvents = new ArrayList<>();
@@ -198,9 +225,9 @@ public class SagaEventSender {
sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
return sagaEvents;
}
@@ -302,8 +329,8 @@ public class SagaEventSender {
* 3. TxEndedEvent-11
* 5. TxEndedEvent-12
* 7. TxAbortedEvent-13
- * 8. TxCompensatedEvent-11
- * 9. TxCompensatedEvent-12
+ * 8. TxCompensateAckSucceedEvent-11
+ * 9. TxCompensateAckSucceedEvent-12
* 10. SagaAbortedEvent-1
*/
public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
@@ -315,8 +342,8 @@ public class SagaEventSender {
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
- sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+ sagaEvents.add(TxCompensateAckSucceedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
return sagaEvents;
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index bfe4037..f7b927e 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -137,7 +137,7 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),2);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.FAILED);
}
@@ -158,8 +158,8 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
}
@@ -180,8 +180,8 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
}
@@ -203,8 +203,8 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),3);
assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
}
@Test
@@ -224,9 +224,9 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
}
@Test
@@ -335,8 +335,8 @@ public class SagaIntegrationTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertEquals(sagaData.getTxEntities().size(),3);
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 49b5e4e..e4d25b9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -26,7 +26,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.fsm.CompensateAckType;
-import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.common.EventType;
@@ -141,6 +142,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
.forwardTimeout(message.getForwardTimeout())
.reverseRetries(message.getReverseRetries())
.reverseTimeout(message.getReverseTimeout())
+ .retryDelayInMilliseconds(message.getRetryDelayInMilliseconds())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.TxEndedEvent.name())) {
@@ -168,8 +170,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
.createTime(new Date())
.localTxId(message.getLocalTxId()).build();
} else if (message.getType().equals(EventType.TxCompensateAckSucceedEvent.name())) {
- event = TxCompensateAckEvent.builder()
- .succeed(true)
+ event = TxCompensateAckSucceedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
@@ -179,8 +180,8 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
.getAck(CompensateAckType.Succeed);
} else if (message.getType().equals(EventType.TxCompensateAckFailedEvent.name())) {
- event = TxCompensateAckEvent.builder()
- .succeed(false)
+ event = TxCompensateAckFailedEvent.builder()
+ .payloads(message.getPayloads().toByteArray())
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 3db7943..7dd2600 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -179,7 +179,7 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.FAILED);
}
@@ -204,8 +204,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
}
@@ -232,8 +232,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
}
@Test
@@ -265,8 +265,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.FAILED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
}
@Test
@@ -290,9 +290,9 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.COMPENSATED_SUCCEED);
}
@Test
@@ -412,8 +412,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
}
@@ -438,8 +438,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
}
@@ -478,8 +478,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
}
@@ -505,8 +505,8 @@ public class AlphaIntegrationFsmTest {
assertNotNull(sagaData.getBeginTime());
assertNotNull(sagaData.getEndTime());
assertTrue(sagaData.getEndTime().getTime() > sagaData.getBeginTime().getTime());
- assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED);
- assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_1).getState(),TxState.COMPENSATED_SUCCEED);
+ assertEquals(sagaData.getTxEntities().get(localTxId_2).getState(),TxState.COMPENSATED_SUCCEED);
assertEquals(sagaData.getTxEntities().get(localTxId_3).getState(),TxState.FAILED);
assertArrayEquals(sagaData.getTxEntities().get(localTxId_3).getThrowablePayLoads(),NullPointerException.class.getName().getBytes());
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index d628832..20d0b49 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -111,10 +111,12 @@ public class GrpcSagaClientMessageSender implements SagaMessageSender {
.setType(event.type().name())
.setTimeout(event.timeout())
.setForwardTimeout(event.forwardTimeout())
+ .setReverseTimeout(event.reverseTimeout())
.setCompensationMethod(event.compensationMethod())
.setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
.setForwardRetries(event.forwardRetries())
.setReverseRetries(event.reverseRetries())
+ .setRetryDelayInMilliseconds(event.retryDelayInMilliseconds())
.setPayloads(payloads);
return builder.build();
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
index 94a7111..6a3bcb1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java
@@ -43,7 +43,7 @@ public class CallbackContext {
contexts.put(key, new CallbackContextInternal(target, compensationMethod));
}
- public void apply(String globalTxId, String localTxId, String callbackMethod, Object... payloads) {
+ public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
CallbackContextInternal contextInternal = contexts.get(callbackMethod);
String oldGlobalTxId = omegaContext.globalTxId();
String oldLocalTxId = omegaContext.localTxId();
@@ -54,14 +54,14 @@ public class CallbackContext {
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
sender.send(
new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
- omegaContext.globalTxId()));
+ parentTxId, callbackMethod));
}
LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
} catch (IllegalAccessException | InvocationTargetException e) {
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
sender.send(
new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
- omegaContext.globalTxId()));
+ parentTxId, callbackMethod, e));
}
LOG.error(
"Pre-checking for callback method " + contextInternal.callbackMethod.toString()
@@ -73,6 +73,10 @@ public class CallbackContext {
}
}
+ public OmegaContext getOmegaContext() {
+ return omegaContext;
+ }
+
private static final class CallbackContextInternal {
private final Object target;
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
index 8d5d0f8..93d2ebd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandler.java
@@ -30,7 +30,9 @@ public class CompensationMessageHandler implements MessageHandler {
@Override
public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
Object... payloads) {
- context.apply(globalTxId, localTxId, compensationMethod, payloads);
- sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+ context.apply(globalTxId, localTxId, parentTxId, compensationMethod, payloads);
+ if (!context.getOmegaContext().getAlphaMetas().isAkkaEnabled()) {
+ sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
+ }
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
index db009c2..0a41369 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
import org.apache.servicecomb.pack.common.EventType;
public class TxCompensateAckFailedEvent extends TxEvent {
- public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0,0 ,0 ,0, 0);
+ public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+ super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,0 ,0 ,0, 0, stackTrace(throwable));
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
index 32870f7..0f9f972 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
import org.apache.servicecomb.pack.common.EventType;
public class TxCompensateAckSucceedEvent extends TxEvent {
- public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0, 0, 0, 0, 0);
+ public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+ super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0, 0);
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
index ab188cd..a24a018 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -45,7 +45,7 @@ public class CoordinateMessageHandler implements TccMessageHandler {
public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) {
// TODO need to catch the exception and send the failed message
// The parameter need to be updated here
- callbackContext.apply(globalTxId, localTxId, methodName, parametersContext.getParameters(localTxId));
+ callbackContext.apply(globalTxId, localTxId, parentTxId, methodName, parametersContext.getParameters(localTxId));
tccMessageSender.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed));
// Need to remove the parameter
parametersContext.removeParameter(localTxId);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
index 8b6353e..d46e1b8 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensationMessageHandlerTest.java
@@ -18,16 +18,23 @@
package org.apache.servicecomb.pack.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
+import org.apache.servicecomb.pack.omega.context.AlphaMetas;
+import org.apache.servicecomb.pack.omega.context.IdGenerator;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.UniqueIdGenerator;
import org.junit.Before;
import org.junit.Test;
@@ -73,10 +80,6 @@ public class CompensationMessageHandlerTest {
private final String compensationMethod = getClass().getCanonicalName();
private final String payload = uniquify("blah");
- private final CallbackContext context = mock(CallbackContext.class);
-
- private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
-
@Before
public void setUp() {
events.clear();
@@ -84,10 +87,13 @@ public class CompensationMessageHandlerTest {
@Test
public void sendsCompensatedEventOnCompensationCompleted() {
+ final CallbackContext context = mock(CallbackContext.class);
+ final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+ IdGenerator<String> idGenerator = new UniqueIdGenerator();
+ OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(false).build());
+ when(context.getOmegaContext()).thenReturn(omegaContext);
handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
-
assertThat(events.size(), is(1));
-
TxEvent event = events.get(0);
assertThat(event.globalTxId(), is(globalTxId));
assertThat(event.localTxId(), is(localTxId));
@@ -95,7 +101,53 @@ public class CompensationMessageHandlerTest {
assertThat(event.type(), is(EventType.TxCompensatedEvent));
assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
assertThat(event.payloads().length, is(0));
+ verify(context).apply(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+ }
+
+ @Test
+ public void sendsCompensateAckSucceedEventOnCompensationCompletedWithFSM() throws NoSuchMethodException {
+ IdGenerator<String> idGenerator = new UniqueIdGenerator();
+ OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(true).build());
+ CallbackContext context = new CallbackContext(omegaContext, sender);
+ Method mockMethod = this.getClass().getMethod("mockCompensationSucceedMethod",String.class);
+ context.addCallbackContext(compensationMethod, mockMethod, this);
+ CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+ handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+ assertThat(events.size(), is(1));
+ TxEvent event = events.get(0);
+ assertThat(event.globalTxId(), is(globalTxId));
+ assertThat(event.localTxId(), is(localTxId));
+ assertThat(event.parentTxId(), is(parentTxId));
+ assertThat(event.type(), is(EventType.TxCompensateAckSucceedEvent));
+ assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+ assertThat(event.payloads().length, is(0));
+ }
+
+ @Test
+ public void sendsCompensateAckFailedEventOnCompensationFailedWithFSM() throws NoSuchMethodException {
+ IdGenerator<String> idGenerator = new UniqueIdGenerator();
+ OmegaContext omegaContext = new OmegaContext(idGenerator,AlphaMetas.builder().akkaEnabled(true).build());
+ CallbackContext context = new CallbackContext(omegaContext, sender);
+ Method mockMethod = this.getClass().getMethod("mockCompensationFailedMethod",String.class);
+ context.addCallbackContext(compensationMethod, mockMethod, this);
+ CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+ handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
+ assertThat(events.size(), is(1));
+ TxEvent event = events.get(0);
+ assertThat(event.globalTxId(), is(globalTxId));
+ assertThat(event.localTxId(), is(localTxId));
+ assertThat(event.parentTxId(), is(parentTxId));
+ assertThat(event.type(), is(EventType.TxCompensateAckFailedEvent));
+ assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
+ assertThat(event.payloads().length, greaterThan(0));
+ }
+
+ public void mockCompensationSucceedMethod(String payloads){
+ // mock compensation method
+ }
- verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
+ public void mockCompensationFailedMethod(String payloads){
+ // mock compensation method
+ throw new RuntimeException("mock compensation failed");
}
}