You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/03 13:40:04 UTC
[servicecomb-pack] 02/02: SCB-1321 Add test case
receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit df20c2187c2e1665b92ec82e76bb60062c7a442c
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Wed Jul 3 21:39:43 2019 +0800
SCB-1321 Add test case receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest
---
.../servicecomb/pack/alpha/fsm/SagaActor.java | 10 +++---
.../fsm/event/consumer/SagaEventConsumer.java | 6 ++--
.../alpha/server/fsm/GrpcSagaEventService.java | 4 ++-
.../alpha/server/fsm/AlphaIntegrationFsmTest.java | 37 ++++++++++++++++++++--
.../alpha/server/fsm/OmegaEventSagaSimulator.java | 1 -
5 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
index 51d6c52..5104aca 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java
@@ -256,7 +256,8 @@ public class SagaActor extends
}
).event(TxComponsitedCheckInternalEvent.class, SagaData.class,
(event, data) -> {
- if (hasCompensationSentTx(data)) {
+ if (hasCompensationSentTx(data) || !data.isTerminated()) {
+ //if (hasCompensationSentTx(data)) {
return stay().replying(data);
} else {
return goTo(SagaActorState.COMPENSATED)
@@ -266,7 +267,7 @@ public class SagaActor extends
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
- //data.setTerminated(true);
+ data.setTerminated(true);
if (hasCommittedTx(data)) {
SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED);
return stay().replying(data).applying(domainEvent);
@@ -409,8 +410,8 @@ public class SagaActor extends
txEntity.setEndTime(System.currentTimeMillis());
if (domainEvent.getState() == TxState.COMMITTED) {
// stop
- data.setEndTime(System.currentTimeMillis());
- data.setTerminated(true);
+ //data.setEndTime(System.currentTimeMillis());
+ //data.setTerminated(true);
txEntity.setState(domainEvent.getState());
} else if (domainEvent.getState() == TxState.FAILED) {
txEntity.setState(domainEvent.getState());
@@ -430,6 +431,7 @@ public class SagaActor extends
} else if (event instanceof SagaEndedDomain) {
SagaEndedDomain domainEvent = (SagaEndedDomain) event;
if (domainEvent.getState() == SagaActorState.FAILED) {
+ data.setTerminated(true);
data.getTxEntityMap().forEach((k, v) -> {
if (v.getState() == TxState.COMMITTED) {
// call compensate
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
index 7ddbc1a..fa481f8 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java
@@ -58,10 +58,10 @@ public class SagaEventConsumer {
saga = optional.get();
}
saga.tell(event, ActorRef.noSender());
- LOG.info("tell {} to {}", event.toString(),saga);
- //TODO WAL commit
+ if(LOG.isDebugEnabled()){
+ LOG.debug("tell {} to {}", event.toString(),saga);
+ }
}catch (Exception ex){
- //TODO
throw ex;
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 2394ab2..72a1dfa 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -78,7 +78,9 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
@Override
@Trace("onTransactionEvent")
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
- LOG.info("onText {}",message);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("onText {}",message);
+ }
boolean ok = true;
BaseEvent event = null;
if (message.getType().equals(EventType.SagaStartedEvent.name())) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index 27f5fd3..3bbaf6d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -31,6 +31,7 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
import org.apache.servicecomb.pack.alpha.server.AlphaApplication;
import org.apache.servicecomb.pack.alpha.server.AlphaConfig;
+import org.apache.servicecomb.pack.common.EventType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -50,7 +51,7 @@ import org.springframework.test.context.junit4.SpringRunner;
"alpha.event.pollingInterval=1",
"spring.main.allow-bean-definition-overriding=true",
"alpha.model.actor.enabled=true",
- "spring.profiles.active=akka-persistence-redis"
+ "spring.profiles.active=akka-persistence-mem"
})
public class AlphaIntegrationFsmTest {
private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build();
@@ -187,7 +188,39 @@ public class AlphaIntegrationFsmTest {
omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
omegaEventSender.getBlockingStub().onTxEvent(event);
});
- await().atMost(2, SECONDS).until(() -> {
+ await().atMost(5, SECONDS).until(() -> {
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
+ });
+ SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
+ Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().size(),3);
+ Assert.assertTrue(sagaData.getBeginTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > 0);
+ Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime());
+ Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED);
+ Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED);
+ Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED);
+ }
+
+ @Test
+ public void receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest() {
+ omegaEventSender.onConnected();
+ final String globalTxId = UUID.randomUUID().toString();
+ final String localTxId_1 = UUID.randomUUID().toString();
+ final String localTxId_2 = UUID.randomUUID().toString();
+ final String localTxId_3 = UUID.randomUUID().toString();
+ omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
+ if(event.getType().equals(EventType.TxStartedEvent.name()) && event.getLocalTxId().equals(localTxId_3)){
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ omegaEventSender.getBlockingStub().onTxEvent(event);
+ });
+ await().atMost(5, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED;
});
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 52bcba5..433f54a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -262,7 +262,6 @@ public class OmegaEventSagaSimulator {
.build();
}
-
public static Builder builder() {
return new Builder();
}