You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/06/21 10:34:19 UTC
[servicecomb-pack] 03/05: SCB-1321 Sub-transaction support concurrent
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 11936fdc9a71224b9b7d0e5a2d55192cf735bc10
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Fri Jun 21 18:25:18 2019 +0800
SCB-1321 Sub-transaction support concurrent
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 39 +++-
.../servicecomb/pack/alpha/fsm/SagaActorTest.java | 204 ++++++++++++++++++++-
alpha/pom.xml | 3 +-
docs/fsm/assets/saga_state_diagram.png | Bin 229442 -> 237463 bytes
docs/fsm/assets/state_table.png | Bin 286375 -> 311887 bytes
5 files changed, 234 insertions(+), 12 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 492d187..2b48f45 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -95,6 +95,16 @@ public class SagaActor extends
return goTo(SagaActorState.PARTIALLY_COMMITTED);
}
}
+ ).event(TxStartedEvent.class,
+ (event, data) -> {
+ updateTxEntity(event, data);
+ if (data.getExpirationTime() > 0) {
+ return stay()
+ .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+ } else {
+ return stay();
+ }
+ }
).event(SagaTimeoutEvent.class,
(event, data) -> {
return goTo(SagaActorState.SUSPENDED)
@@ -123,6 +133,16 @@ public class SagaActor extends
return goTo(SagaActorState.PARTIALLY_ACTIVE);
}
}
+ ).event(TxEndedEvent.class,
+ (event, data) -> {
+ updateTxEntity(event, data);
+ if (data.getExpirationTime() > 0) {
+ return stay()
+ .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
+ } else {
+ return stay();
+ }
+ }
).event(SagaTimeoutEvent.class,
(event, data) -> {
return goTo(SagaActorState.SUSPENDED)
@@ -196,8 +216,8 @@ public class SagaActor extends
).event(TxEndedEvent.class, SagaData.class,
(event, data) -> {
updateTxEntity(event, data);
- // TODO 调用补偿方法
TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId());
+ // TODO call compensate
compensation(txEntity, data);
return stay();
}
@@ -233,8 +253,8 @@ public class SagaActor extends
whenUnhandled(
matchAnyEvent((event, data) -> {
- LOG.error("unmatch event {}", event);
- return stay();
+ LOG.error("Unhandled event {}", event);
+ return goTo(SagaActorState.SUSPENDED).replying(data);
})
);
@@ -290,26 +310,25 @@ public class SagaActor extends
if (txEntity.getState() == TxState.ACTIVE) {
txEntity.setEndTime(System.currentTimeMillis());
txEntity.setState(TxState.FAILED);
- // TODO 调用补偿方法
data.getTxEntityMap().forEach((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
- // TODO 调用补偿方法
+ // call compensate
compensation(v, data);
}
});
}
} else if (event instanceof TxComponsitedEvent) {
- //补偿中计数器减一
+ // decrement the compensation running counter by one
data.getCompensationRunningCounter().decrementAndGet();
txEntity.setState(TxState.COMPENSATED);
- LOG.info("完成补偿 {}",txEntity.getLocalTxId());
+ LOG.info("compensation is completed {}",txEntity.getLocalTxId());
}
}
} else if (event instanceof SagaEvent) {
if (event instanceof SagaAbortedEvent) {
data.getTxEntityMap().forEach((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
- // TODO 调用补偿方法
+ // call compensate
compensation(v, data);
}
});
@@ -324,8 +343,8 @@ public class SagaActor extends
}
private void compensation(TxEntity txEntity, SagaData data) {
- //补偿中计数器加一
+ // increment the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
- LOG.info("调用补偿方法 {}", txEntity.getLocalTxId());
+ LOG.info("compensate {}", txEntity.getLocalTxId());
}
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 156d67e..847d144 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -8,6 +8,7 @@ import akka.actor.Terminated;
import akka.persistence.fsm.PersistentFSM;
import akka.persistence.fsm.PersistentFSM.CurrentState;
import akka.testkit.javadsl.TestKit;
+import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.UUID;
import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent;
@@ -22,9 +23,13 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SagaActorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
static ActorSystem system;
@BeforeClass
@@ -320,7 +325,7 @@ public class SagaActorTest {
* 10. SagaAbortedEvent-1
*/
@Test
- public void receivedRemainingEventAfterfirstTxAbortedEventTest() {
+ public void receivedRemainingEventAfterFirstTxAbortedEventTest() {
new TestKit(system) {{
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
@@ -609,6 +614,203 @@ public class SagaActorTest {
}};
}
+ // tx concurrent execution
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxStartedEvent-12
+ * 4. TxStartedEvent-13
+ * 5. TxEndedEvent-11
+ * 6. TxEndedEvent-12
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ @Test
+ public void successfulTestWithTxConcurrent() throws InterruptedException {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 3. TxStartedEvent-12
+ * 5. TxEndedEvent-11
+ * 4. TxStartedEvent-13
+ * 6. TxEndedEvent-12
+ * 7. TxEndedEvent-13
+ * 8. SagaEndedEvent-1
+ */
+ @Test
+ public void successfulTestWithTxConcurrentCross() throws InterruptedException {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ sagaData.getTxEntityMap().forEach((k, v) -> {
+ assertEquals(v.getState(), TxState.COMMITTED);
+ });
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
+ /**
+ * 1. SagaStartedEvent-1
+ * 2. TxStartedEvent-11
+ * 4. TxStartedEvent-12
+ * 6. TxStartedEvent-13
+ * 3. TxEndedEvent-11
+ * 5. TxEndedEvent-12
+ * 7. TxAbortedEvent-13
+ * 8. TxComponsitedEvent-11
+ * 9. TxComponsitedEvent-12
+ * 10. SagaAbortedEvent-1
+ */
+ @Test
+ public void lastTxAbortedEventWithTxConcurrentTest() {
+ new TestKit(system) {{
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+
+ ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga");
+ watch(saga);
+ saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef());
+
+ saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef());
+ saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef());
+ saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef());
+
+ //expect
+ CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class);
+ assertEquals(SagaActorState.IDEL, currentState.state());
+
+ PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED);
+
+ SagaData sagaData = expectMsgClass(SagaData.class);
+ assertEquals(sagaData.getGlobalTxId(), globalTxId);
+ assertEquals(sagaData.getTxEntityMap().size(), 3);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED);
+ assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED);
+ assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0);
+
+ transition = expectMsgClass(PersistentFSM.Transition.class);
+ assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED);
+
+ Terminated terminated = expectMsgClass(Terminated.class);
+ assertEquals(terminated.getActor(), saga);
+
+ system.stop(saga);
+ }};
+ }
+
private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef,
SagaActorState from, SagaActorState to) {
assertEquals(transition.fsmRef(), actorRef);
diff --git a/alpha/pom.xml b/alpha/pom.xml
index 6921f5b..3dfaf13 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -32,11 +32,12 @@
<packaging>pom</packaging>
<modules>
<module>alpha-core</module>
+ <module>alpha-fsm</module>
<module>alpha-spring-boot-compatibility</module>
<module>alpha-spring-cloud-starter-eureka</module>
<module>alpha-spring-cloud-starter-consul</module>
<module>alpha-spring-cloud-starter-zookeeper</module>
- <module>alpha-server</module>
+ <module>alpha-server</module>
</modules>
<build>
diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png
index 96cd6c9..349fb72 100644
Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ
diff --git a/docs/fsm/assets/state_table.png b/docs/fsm/assets/state_table.png
index 5005597..569b2ce 100644
Binary files a/docs/fsm/assets/state_table.png and b/docs/fsm/assets/state_table.png differ