You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:21 UTC
[incubator-servicecomb-saga] 09/12: SCB-218 made sure saga ended event is always the last event
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit ffaa350c987ec77c43209cf990d6f9e2bcab2560
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 17:01:45 2018 +0800
SCB-218 made sure saga ended event is always the last event
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/EventScanner.java | 3 +--
.../servicecomb/saga/alpha/server/AlphaConfig.java | 2 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 29 +++++++++++++++++++++-
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index f9fa3be..80deeb3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -100,8 +100,6 @@ public class EventScanner implements Runnable {
}
}
- // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
- // unless we ask user to specify a name for each participant in the global TX in @Compensable
private void updateCompensationStatus(TxEvent event) {
commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
log.info("Transaction with globalTxId {} and localTxId {} was compensated",
@@ -135,6 +133,7 @@ public class EventScanner implements Runnable {
EMPTY_PAYLOAD);
}
+ // TODO: 2018/1/19 potentially compensation may be out of order if we don't wait till received compensated event for the previous one, since compensation is async
private void compensate() {
commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 769ee5a..35352f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -69,7 +69,7 @@ class AlphaConfig {
@Bean
ScheduledExecutorService compensationScheduler() {
- return Executors.newScheduledThreadPool(2);
+ return Executors.newScheduledThreadPool(1);
}
@Bean
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a928443..5dddc1d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -30,15 +31,18 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -345,6 +349,23 @@ public class AlphaIntegrationTest {
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
}
+ @Test
+ public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+ String anotherLocalTxId = UUID.randomUUID().toString();
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId));
+ blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId));
+
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
+
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 8);
+ List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+ assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+ }
+
private GrpcAck onCompensation(GrpcCompensateCommand command) {
return blockingStub.onTxEvent(
eventOf(TxCompensatedEvent,
@@ -445,6 +466,12 @@ public class AlphaIntegrationTest {
@PostConstruct
void init() {
-// new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+ // simulates concurrent db connections
+ new EventScanner(
+ Executors.newSingleThreadScheduledExecutor(),
+ eventRepository,
+ commandRepository,
+ omegaCallback,
+ 1).run();
}
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.