You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/14 08:56:50 UTC

[incubator-servicecomb-saga] 01/02: SCB-220 compensated only ended events

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 967b0a5b215a0c9147db109f420a2605595b637c
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 13 09:47:39 2018 +0800

    SCB-220 compensated only ended events
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 46 +++++++++-------
 .../servicecomb/saga/alpha/core/TxEvent.java       | 12 +++++
 .../saga/alpha/core/TxEventRepository.java         |  4 ++
 .../saga/alpha/core/TxConsistentServiceTest.java   | 39 ++++++++++++--
 .../saga/alpha/server/SpringTxEventRepository.java | 16 ++++--
 .../alpha/server/TxEventEnvelopeRepository.java    | 24 +++++++--
 .../saga/alpha/server/AlphaIntegrationTest.java    | 62 ++++++++++++++++++----
 7 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 55de6b7..9a3a314 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.Collections.emptySet;
 import static org.apache.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
@@ -25,11 +26,10 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -42,11 +42,12 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
+    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
     put(TxAbortedEvent.name(), (event) -> compensate(event));
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
-  private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+  private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -57,25 +58,36 @@ public class TxConsistentService {
   public void handle(TxEvent event) {
     eventRepository.save(event);
 
-    executor.execute(() -> {
-      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
-        omegaCallback.compensate(event);
-      }
+    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+  }
+
+  private void compensateIfAlreadyAborted(TxEvent event) {
+    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
+      eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+      TxEvent correspondingStartedEvent = eventRepository
+          .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
+
+      omegaCallback.compensate(correspondingStartedEvent);
+    }
+  }
 
-      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
-    });
+  private boolean isCompensationScheduled(TxEvent event) {
+    return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
-    eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
-      Set<String> eventSet = new ConcurrentSkipListSet<>();
-      events.forEach(e -> eventSet.add(e.localTxId()));
-      return eventSet;
-    });
+    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
+
+    events.removeIf(this::isCompensationScheduled);
+
+    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
+    events.forEach(e -> localTxIds.add(e.localTxId()));
+
     events.forEach(omegaCallback::compensate);
   }
 
+  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
+  // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
     Set<String> events = eventsToCompensate.get(event.globalTxId());
     if (events != null) {
@@ -96,8 +108,4 @@ public class TxConsistentService {
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
-
-  private boolean isTxEndedEvent(TxEvent event) {
-    return TxEndedEvent.name().equals(event.type());
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 9a2cea4..ebea44c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -36,6 +36,18 @@ public class TxEvent {
   public TxEvent(
       String serviceName,
       String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  public TxEvent(
+      String serviceName,
+      String instanceId,
       Date creationTime,
       String globalTxId,
       String localTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a8387b..cf5706b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,4 +23,8 @@ public interface TxEventRepository {
   void save(TxEvent event);
 
   List<TxEvent> findTransactions(String globalTxId, String type);
+
+  TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+
+  List<TxEvent> findTransactionsToCompensate(String globalTxId);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index febbfaf..c7e5391 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -55,6 +55,40 @@ public class TxConsistentServiceTest {
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
     }
+
+    @Override
+    public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
+          .findFirst()
+          .get();
+    }
+
+    @Override
+    public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId())
+              && event.type().equals(TxStartedEvent.name())
+              && isCompleted(globalTxId, event)
+              && !isCompensated(globalTxId, event))
+          .collect(Collectors.toList());
+    }
+
+    private boolean isCompleted(String globalTxId, TxEvent event) {
+      return events.stream()
+          .filter(e -> globalTxId.equals(e.globalTxId())
+              && e.localTxId().equals(event.localTxId())
+              && e.type().equals(TxEndedEvent.name()))
+          .count() > 0;
+    }
+
+    private boolean isCompensated(String globalTxId, TxEvent event) {
+      return events.stream()
+          .filter(e -> globalTxId.equals(e.globalTxId())
+              && e.localTxId().equals(event.localTxId())
+              && e.type().equals(TxCompensatedEvent.name()))
+          .count() > 0;
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
@@ -120,17 +154,16 @@ public class TxConsistentServiceTest {
 
   @Test
   public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
-    String localTxId1 = UUID.randomUUID().toString();
     events.add(newEvent(TxStartedEvent));
     events.add(newEvent(TxAbortedEvent));
 
-    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+    TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
 
     consistentService.handle(event);
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
     assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+        new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
     ));
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 3bf6e03..00d10d2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
@@ -37,9 +36,16 @@ class SpringTxEventRepository implements TxEventRepository {
 
   @Override
   public List<TxEvent> findTransactions(String globalTxId, String type) {
-    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
-        .stream()
-        .map(TxEventEnvelope::event)
-        .collect(Collectors.toList());
+    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
+  }
+
+  @Override
+  public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+    return eventRepo.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(globalTxId, localTxId, type).event();
+  }
+
+  @Override
+  public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+    return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fe05c1e..1e1859c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,15 +19,33 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
 
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
-  TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+  List<TxEventEnvelope> findByEventGlobalTxId(String globalTxId);
 
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.server.TxEventEnvelope("
+  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
-  List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+  List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+
+  TxEventEnvelope findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(String globalTxId, String localTxId, String type);
+
+  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+      + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+      + ") FROM TxEventEnvelope t "
+      + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+      + "  FROM TxEventEnvelope t1 "
+      + "  WHERE t1.event.globalTxId = ?1 "
+      + "  AND t1.event.localTxId = t.event.localTxId "
+      + "  AND t1.event.type = 'TxEndedEvent'"
+      + ") AND NOT EXISTS ( "
+      + "  FROM TxEventEnvelope t2 "
+      + "  WHERE t2.event.globalTxId = ?1 "
+      + "  AND t2.event.localTxId = t.event.localTxId "
+      + "  AND t2.event.type = 'TxCompensatedEvent')")
+  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 9f13a8f..cbbe8ee 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
@@ -118,11 +119,11 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
-    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
 
     assertThat(receivedCommands.isEmpty(), is(true));
 
-    TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
+    TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId).get(0);
 
     assertThat(envelope.serviceName(), is(serviceName));
     assertThat(envelope.instanceId(), is(instanceId));
@@ -178,6 +179,7 @@ public class AlphaIntegrationTest {
   public void removeCallbackOnClientDown() throws Exception {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     omegaCallbacks.get(serviceName).get(instanceId).disconnect();
 
@@ -187,17 +189,34 @@ public class AlphaIntegrationTest {
   }
 
   @Test
+  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+  }
+
+  @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method a"));
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
-    blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method b"));
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
@@ -214,7 +233,8 @@ public class AlphaIntegrationTest {
   public void getCompensateCommandOnFailure() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
-    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+    await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -231,6 +251,7 @@ public class AlphaIntegrationTest {
   public void compensateOnlyFailedGlobalTransaction() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
@@ -239,7 +260,7 @@ public class AlphaIntegrationTest {
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -250,6 +271,29 @@ public class AlphaIntegrationTest {
     anotherBlockingStub.onDisconnected(anotherServiceConfig);
   }
 
+  @Test
+  public void compensateOnlyCompletedTransactions() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    String anotherLocalTxId1 = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId1));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId1));
+    blockingStub.onTxEvent(someGrpcEvent(TxCompensatedEvent, globalTxId, anotherLocalTxId1));
+
+    String anotherLocalTxId2 = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    assertThat(receivedCommands.size(), is(1));
+    assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
@@ -275,11 +319,11 @@ public class AlphaIntegrationTest {
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+    return someGrpcEvent(type, globalTxId, localTxId);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
   private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.