You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/14 08:56:50 UTC
[incubator-servicecomb-saga] 01/02: SCB-220 compensated only ended
events
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 967b0a5b215a0c9147db109f420a2605595b637c
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 13 09:47:39 2018 +0800
SCB-220 compensated only ended events
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 46 +++++++++-------
.../servicecomb/saga/alpha/core/TxEvent.java | 12 +++++
.../saga/alpha/core/TxEventRepository.java | 4 ++
.../saga/alpha/core/TxConsistentServiceTest.java | 39 ++++++++++++--
.../saga/alpha/server/SpringTxEventRepository.java | 16 ++++--
.../alpha/server/TxEventEnvelopeRepository.java | 24 +++++++--
.../saga/alpha/server/AlphaIntegrationTest.java | 62 ++++++++++++++++++----
7 files changed, 164 insertions(+), 39 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 55de6b7..9a3a314 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.saga.alpha.core;
+import static java.util.Collections.emptySet;
import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
@@ -25,11 +26,10 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@@ -42,11 +42,12 @@ public class TxConsistentService {
private final TxEventRepository eventRepository;
private final OmegaCallback omegaCallback;
private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
+ put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
put(TxAbortedEvent.name(), (event) -> compensate(event));
put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
}};
- private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+ private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -57,25 +58,36 @@ public class TxConsistentService {
public void handle(TxEvent event) {
eventRepository.save(event);
- executor.execute(() -> {
- if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
- omegaCallback.compensate(event);
- }
+ executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+ }
+
+ private void compensateIfAlreadyAborted(TxEvent event) {
+ if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
+ eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+ TxEvent correspondingStartedEvent = eventRepository
+ .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
+
+ omegaCallback.compensate(correspondingStartedEvent);
+ }
+ }
- eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
- });
+ private boolean isCompensationScheduled(TxEvent event) {
+ return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
}
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
- eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
- Set<String> eventSet = new ConcurrentSkipListSet<>();
- events.forEach(e -> eventSet.add(e.localTxId()));
- return eventSet;
- });
+ List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
+
+ events.removeIf(this::isCompensationScheduled);
+
+ Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
+ events.forEach(e -> localTxIds.add(e.localTxId()));
+
events.forEach(omegaCallback::compensate);
}
+ // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
+ // unless we ask user to specify a name for each participant in the global TX in @Compensable
private void updateCompensateStatus(TxEvent event) {
Set<String> events = eventsToCompensate.get(event.globalTxId());
if (events != null) {
@@ -96,8 +108,4 @@ public class TxConsistentService {
private boolean isGlobalTxAborted(TxEvent event) {
return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
}
-
- private boolean isTxEndedEvent(TxEvent event) {
- return TxEndedEvent.name().equals(event.type());
- }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 9a2cea4..ebea44c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -36,6 +36,18 @@ public class TxEvent {
public TxEvent(
String serviceName,
String instanceId,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ byte[] payloads) {
+ this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ }
+
+ public TxEvent(
+ String serviceName,
+ String instanceId,
Date creationTime,
String globalTxId,
String localTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a8387b..cf5706b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,4 +23,8 @@ public interface TxEventRepository {
void save(TxEvent event);
List<TxEvent> findTransactions(String globalTxId, String type);
+
+ TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+
+ List<TxEvent> findTransactionsToCompensate(String globalTxId);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index febbfaf..c7e5391 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -55,6 +55,40 @@ public class TxConsistentServiceTest {
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
.collect(Collectors.toList());
}
+
+ @Override
+ public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+ return events.stream()
+ .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
+ .findFirst()
+ .get();
+ }
+
+ @Override
+ public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+ return events.stream()
+ .filter(event -> globalTxId.equals(event.globalTxId())
+ && event.type().equals(TxStartedEvent.name())
+ && isCompleted(globalTxId, event)
+ && !isCompensated(globalTxId, event))
+ .collect(Collectors.toList());
+ }
+
+ private boolean isCompleted(String globalTxId, TxEvent event) {
+ return events.stream()
+ .filter(e -> globalTxId.equals(e.globalTxId())
+ && e.localTxId().equals(event.localTxId())
+ && e.type().equals(TxEndedEvent.name()))
+ .count() > 0;
+ }
+
+ private boolean isCompensated(String globalTxId, TxEvent event) {
+ return events.stream()
+ .filter(e -> globalTxId.equals(e.globalTxId())
+ && e.localTxId().equals(event.localTxId())
+ && e.type().equals(TxCompensatedEvent.name()))
+ .count() > 0;
+ }
};
private final String globalTxId = UUID.randomUUID().toString();
@@ -120,17 +154,16 @@ public class TxConsistentServiceTest {
@Test
public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
- String localTxId1 = UUID.randomUUID().toString();
events.add(newEvent(TxStartedEvent));
events.add(newEvent(TxAbortedEvent));
- TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+ TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
consistentService.handle(event);
await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
assertThat(compensationContexts, containsInAnyOrder(
- new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+ new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
));
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 3bf6e03..00d10d2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
@@ -37,9 +36,16 @@ class SpringTxEventRepository implements TxEventRepository {
@Override
public List<TxEvent> findTransactions(String globalTxId, String type) {
- return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
- .stream()
- .map(TxEventEnvelope::event)
- .collect(Collectors.toList());
+ return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
+ }
+
+ @Override
+ public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+ return eventRepo.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(globalTxId, localTxId, type).event();
+ }
+
+ @Override
+ public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+ return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fe05c1e..1e1859c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,15 +19,33 @@ package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
- TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+ List<TxEventEnvelope> findByEventGlobalTxId(String globalTxId);
- @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.server.TxEventEnvelope("
+ @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+ "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+ ") FROM TxEventEnvelope t "
+ "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
- List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+ List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+
+ TxEventEnvelope findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(String globalTxId, String localTxId, String type);
+
+ @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+ + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+ + ") FROM TxEventEnvelope t "
+ + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+ + " FROM TxEventEnvelope t1 "
+ + " WHERE t1.event.globalTxId = ?1 "
+ + " AND t1.event.localTxId = t.event.localTxId "
+ + " AND t1.event.type = 'TxEndedEvent'"
+ + ") AND NOT EXISTS ( "
+ + " FROM TxEventEnvelope t2 "
+ + " WHERE t2.event.globalTxId = ?1 "
+ + " AND t2.event.localTxId = t.event.localTxId "
+ + " AND t2.event.type = 'TxCompensatedEvent')")
+ List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 9f13a8f..cbbe8ee 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
@@ -118,11 +119,11 @@ public class AlphaIntegrationTest {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
- await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+ await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
assertThat(receivedCommands.isEmpty(), is(true));
- TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
+ TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId).get(0);
assertThat(envelope.serviceName(), is(serviceName));
assertThat(envelope.instanceId(), is(instanceId));
@@ -178,6 +179,7 @@ public class AlphaIntegrationTest {
public void removeCallbackOnClientDown() throws Exception {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
omegaCallbacks.get(serviceName).get(instanceId).disconnect();
@@ -187,17 +189,34 @@ public class AlphaIntegrationTest {
}
@Test
+ public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+ blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+ blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
+ await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+ GrpcCompensateCommand command = receivedCommands.poll();
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getLocalTxId(), is(localTxId));
+ assertThat(command.getParentTxId(), is(parentTxId));
+ assertThat(command.getCompensateMethod(), is(compensationMethod));
+ assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+ }
+
+ @Test
public void doNotCompensateDuplicateTxOnFailure() {
// duplicate events with same content but different timestamp
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
- blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method a"));
+ blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
String localTxId1 = UUID.randomUUID().toString();
String parentTxId1 = UUID.randomUUID().toString();
blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
- blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method b"));
+ blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
@@ -214,7 +233,8 @@ public class AlphaIntegrationTest {
public void getCompensateCommandOnFailure() {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
- await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+ await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -231,6 +251,7 @@ public class AlphaIntegrationTest {
public void compensateOnlyFailedGlobalTransaction() {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
// simulates connection from another service with different globalTxId
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
@@ -239,7 +260,7 @@ public class AlphaIntegrationTest {
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -250,6 +271,29 @@ public class AlphaIntegrationTest {
anotherBlockingStub.onDisconnected(anotherServiceConfig);
}
+ @Test
+ public void compensateOnlyCompletedTransactions() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+ String anotherLocalTxId1 = UUID.randomUUID().toString();
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId1));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId1));
+ blockingStub.onTxEvent(someGrpcEvent(TxCompensatedEvent, globalTxId, anotherLocalTxId1));
+
+ String anotherLocalTxId2 = UUID.randomUUID().toString();
+ blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
+
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+
+ blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
+ await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+ assertThat(receivedCommands.size(), is(1));
+ assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
+ }
+
private GrpcServiceConfig someServiceConfig() {
return GrpcServiceConfig.newBuilder()
.setServiceName(uniquify("serviceName"))
@@ -275,11 +319,11 @@ public class AlphaIntegrationTest {
}
private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
- return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+ return someGrpcEvent(type, globalTxId, localTxId);
}
- private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
- return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+ private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
}
private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.