You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 06:56:05 UTC
[incubator-servicecomb-saga] 01/04: SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 1f5a128cccc13d92208be6868a379fa5b3eb3ce2
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 10 16:00:45 2018 +0800
SCB-213 proper timing to handle SagaEndedEvent in asynchronous situation
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 45 ++++++++++++++++++++--
.../saga/alpha/core/TxConsistentServiceTest.java | 35 ++++++++++++-----
.../saga/omega/transaction/TransactionAspect.java | 12 +++---
3 files changed, 74 insertions(+), 18 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 7e7839f..1805335 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,22 +17,36 @@
package org.apache.servicecomb.saga.alpha.core;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
public class TxConsistentService {
private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
+ private static final byte[] EMPTY_PAYLOAD = new byte[0];
+
private final TxEventRepository eventRepository;
private final OmegaCallback omegaCallback;
private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
- put(EventType.TxStartedEvent.name(), DO_NOTHING_CONSUMER);
- put(EventType.TxAbortedEvent.name(), (event) -> compensate(event));
+ put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
+ put(TxAbortedEvent.name(), (event) -> compensate(event));
+ put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
}};
+ private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+
public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
this.eventRepository = eventRepository;
this.omegaCallback = omegaCallback;
@@ -44,7 +58,32 @@ public class TxConsistentService {
}
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), EventType.TxStartedEvent.name());
+ List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
events.forEach(omegaCallback::compensate);
+ eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
+ CopyOnWriteArraySet<String> eventSet = new CopyOnWriteArraySet<>();
+ events.forEach(e -> eventSet.add(getUniqueEventId(e)));
+ return eventSet;
+ });
+ }
+
+ private void updateCompensateStatus(TxEvent event) {
+ Set<String> events = eventsToCompensate.get(event.globalTxId());
+ if (events != null) {
+ events.remove(getUniqueEventId(event));
+ if (events.isEmpty()) {
+ markGlobalTxEnd(event);
+ }
+ }
+ }
+
+ private String getUniqueEventId(TxEvent event) {
+ return event.globalTxId() + "_" + event.localTxId();
+ }
+
+ private void markGlobalTxEnd(TxEvent event) {
+ eventRepository.save(new TxEvent(
+ event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
+ null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
}
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 6443997..ad448da 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -68,11 +74,11 @@ public class TxConsistentServiceTest {
@Test
public void persistEventOnArrival() throws Exception {
TxEvent[] events = {
- newEvent(EventType.SagaStartedEvent),
- newEvent(EventType.TxStartedEvent),
- newEvent(EventType.TxEndedEvent),
- newEvent(EventType.TxCompensatedEvent),
- newEvent(EventType.SagaEndedEvent)};
+ newEvent(SagaStartedEvent),
+ newEvent(TxStartedEvent),
+ newEvent(TxEndedEvent),
+ newEvent(TxCompensatedEvent),
+ newEvent(SagaEndedEvent)};
for (TxEvent event : events) {
consistentService.handle(event);
@@ -85,14 +91,14 @@ public class TxConsistentServiceTest {
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
String localTxId1 = UUID.randomUUID().toString();
- events.add(eventOf(EventType.TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
- events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId1, "method a"));
+ events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
String localTxId2 = UUID.randomUUID().toString();
- events.add(eventOf(EventType.TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
- events.add(eventOf(EventType.TxEndedEvent, new byte[0], localTxId2, "method b"));
+ events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
- TxEvent abortEvent = newEvent(EventType.TxAbortedEvent);
+ TxEvent abortEvent = newEvent(TxAbortedEvent);
consistentService.handle(abortEvent);
@@ -101,6 +107,15 @@ public class TxConsistentServiceTest {
new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
));
+
+ TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
+ consistentService.handle(compensateEvent2);
+
+ TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
+ consistentService.handle(compensateEvent1);
+
+ assertThat(events.size(), is(8));
+ assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
}
private TxEvent newEvent(EventType eventType) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index bf13e3a..abafc2f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -75,13 +75,15 @@ public class TransactionAspect {
sagaStartAnnotationProcessor.preIntercept();
try {
- return joinPoint.proceed();
+ Object result = joinPoint.proceed();
+
+ LOG.info("Transaction {} succeeded.", context.globalTxId());
+ sagaStartAnnotationProcessor.postIntercept();
+
+ return result;
} catch (Throwable throwable) {
- LOG.error("Failed to process SagaStart method: {}", method.toString());
+ LOG.error("Transaction {} failed.", context.globalTxId());
throw throwable;
- } finally {
- LOG.debug("Transaction {} has finished.", context.globalTxId());
- sagaStartAnnotationProcessor.postIntercept();
}
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.