You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:28 UTC

[incubator-servicecomb-saga] 02/09: SCB-218 polling in background for events to compensate

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 994e1c3bac9c432433a80dfd2f9cc77bf528b742
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 18:43:31 2018 +0800

    SCB-218 polling in background for events to compensate
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-core/pom.xml                           | 15 +++++++
 .../servicecomb/saga/alpha/core/Command.java       | 20 +++++++++
 .../saga/alpha/core/CommandRepository.java         |  2 +
 .../saga/alpha/core/TxConsistentService.java       | 51 +++++++++++++++++-----
 .../saga/alpha/core/TxConsistentServiceTest.java   | 35 +++++++++++----
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  7 ++-
 .../saga/alpha/server/CommandEntity.java           |  6 ++-
 .../saga/alpha/server/CommandEntityRepository.java | 14 +++++-
 .../saga/alpha/server/SpringCommandRepository.java | 20 ++++++++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  3 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 39 +++++++++++++----
 11 files changed, 174 insertions(+), 38 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index aa74718..107fff4 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -44,6 +44,21 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 08f8527..c852c16 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -83,6 +83,14 @@ public class Command {
         event.payloads());
   }
 
+  String serviceName() {
+    return serviceName;
+  }
+
+  String instanceId() {
+    return instanceId;
+  }
+
   String globalTxId() {
     return globalTxId;
   }
@@ -91,6 +99,18 @@ public class Command {
     return localTxId;
   }
 
+  String parentTxId() {
+    return parentTxId;
+  }
+
+  String compensationMethod() {
+    return compensationMethod;
+  }
+
+  byte[] payloads() {
+    return payloads;
+  }
+
   String status() {
     return status;
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 915d476..22f2b41 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -29,4 +29,6 @@ public interface CommandRepository {
   void markCommandAsDone(String globalTxId, String localTxId);
 
   List<Command> findUncompletedCommands(String globalTxId);
+
+  List<Command> findFirstCommandToCompensate();
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 560096f..8f88735 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,21 +17,27 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class TxConsistentService {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
 
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -46,13 +52,17 @@ public class TxConsistentService {
   }};
 
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public TxConsistentService(TxEventRepository eventRepository,
       CommandRepository commandRepository,
-      OmegaCallback omegaCallback) {
+      OmegaCallback omegaCallback,
+      int commandPollingInterval) {
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
+
+    scheduleCompensationCommandPolling(commandPollingInterval);
   }
 
   public boolean handle(TxEvent event) {
@@ -69,10 +79,6 @@ public class TxConsistentService {
   private void compensateIfAlreadyAborted(TxEvent event) {
     if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
       commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-      TxEvent correspondingStartedEvent = eventRepository
-          .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
-
-      omegaCallback.compensate(correspondingStartedEvent);
     }
   }
 
@@ -81,22 +87,19 @@ public class TxConsistentService {
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-
-    events.removeIf(this::isCompensationScheduled);
-
     commandRepository.saveCompensationCommands(event.globalTxId());
-
-    events.forEach(omegaCallback::compensate);
   }
 
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+
     if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
         && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
+      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
     }
   }
 
@@ -109,4 +112,28 @@ public class TxConsistentService {
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
+
+  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+    scheduler.scheduleWithFixedDelay(
+        () -> commandRepository.findFirstCommandToCompensate()
+            .forEach(command -> {
+              log.info("Compensating transaction with globalTxId {} and localTxId {}",
+                  command.globalTxId(),
+                  command.localTxId());
+
+              omegaCallback.compensate(new TxEvent(
+                  command.serviceName(),
+                  command.instanceId(),
+                  command.globalTxId(),
+                  command.localTxId(),
+                  command.parentTxId(),
+                  TxStartedEvent.name(),
+                  command.compensationMethod(),
+                  command.payloads()
+              ));
+            }),
+        0,
+        commandPollingInterval,
+        MILLISECONDS);
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 8ae60a3..212c621 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -140,6 +141,17 @@ public class TxConsistentServiceTest {
           .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
           .collect(Collectors.toList());
     }
+
+    @Override
+    public List<Command> findFirstCommandToCompensate() {
+      List<Command> results = new ArrayList<>(1);
+      commands.stream()
+          .filter(command -> !DONE.name().equals(command.status()))
+          .findFirst()
+          .ifPresent(results::add);
+
+      return results;
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
@@ -151,10 +163,18 @@ public class TxConsistentServiceTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
-  private final OmegaCallback omegaCallback = event ->
-      compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+  private Consumer<TxEvent> eventConsumer = event -> {};
+  private final OmegaCallback omegaCallback = event -> {
+    eventConsumer.accept(event);
+    compensationContexts.add(
+        new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+  };
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+  private final TxConsistentService consistentService = new TxConsistentService(
+      eventRepository,
+      commandRepository,
+      omegaCallback,
+      300);
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -175,6 +195,9 @@ public class TxConsistentServiceTest {
 
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
+    eventConsumer = event -> consistentService
+        .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
+
     String localTxId1 = UUID.randomUUID().toString();
     events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
     events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
@@ -193,12 +216,6 @@ public class TxConsistentServiceTest {
         new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
     ));
 
-    TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
-    consistentService.handle(compensateEvent2);
-
-    TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
-    consistentService.handle(compensateEvent1);
-
     await().atMost(1, SECONDS).until(() -> events.size() == 8);
     assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 00dfe27..9c34738 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -66,12 +66,17 @@ class AlphaConfig {
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
+      @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+    TxConsistentService consistentService = new TxConsistentService(
+        eventRepository,
+        commandRepository,
+        omegaCallback,
+        commandPollingInterval);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
index 3eac681..6207002 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -21,7 +21,6 @@ import java.util.Date;
 
 import javax.persistence.Embedded;
 import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
 import javax.persistence.Id;
 import javax.persistence.Version;
 
@@ -31,7 +30,6 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent;
 @Entity
 class CommandEntity {
   @Id
-  @GeneratedValue
   private long surrogateId;
 
   @Embedded
@@ -50,4 +48,8 @@ class CommandEntity {
     lastModified = new Date();
     command = new Command(event);
   }
+
+  Command command() {
+    return command;
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b7309e..9402486 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.List;
 import java.util.Optional;
 
-import org.apache.servicecomb.saga.alpha.core.Command;
+import javax.transaction.Transactional;
+
+import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
@@ -29,6 +31,7 @@ import org.springframework.data.repository.query.Param;
 public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
   Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
 
+  @Transactional
   @Modifying
   @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
       + "SET c.command.status = :status "
@@ -39,5 +42,12 @@ public interface CommandEntityRepository extends CrudRepository<CommandEntity, L
       @Param("globalTxId") String globalTxId,
       @Param("localTxId") String localTxId);
 
-  List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+  List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+
+  @Query("FROM CommandEntity c "
+      + "WHERE id IN ("
+      + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId"
+      + ") "
+      + "ORDER BY c.id ASC")
+  List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 9281b7e..aac3c22 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -24,11 +24,15 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
+  private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
+
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
 
@@ -44,7 +48,7 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public void saveCompensationCommand(String globalTxId, String localTxId) {
-    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc(
         globalTxId,
         localTxId,
         TxStartedEvent.name());
@@ -73,6 +77,18 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public List<Command> findUncompletedCommands(String globalTxId) {
-    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name())
+        .stream()
+        .map(CommandEntity::command)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<Command> findFirstCommandToCompensate() {
+    return commandRepository
+        .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST)
+        .stream()
+        .map(CommandEntity::command)
+        .collect(Collectors.toList());
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fcb7c00..85ed954 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -63,6 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEventEnvelope t2 "
       + "  WHERE t2.event.globalTxId = ?1 "
       + "  AND t2.event.localTxId = t.event.localTxId "
-      + "  AND t2.event.type = 'TxCompensatedEvent')")
+      + "  AND t2.event.type = 'TxCompensatedEvent')"
+      + "ORDER BY t.id ASC ")
   List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 6d4f91f..1514869 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -24,7 +24,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -34,11 +34,12 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
 
-import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -63,7 +64,8 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -97,7 +99,7 @@ public class AlphaIntegrationTest {
   private TxConsistentService consistentService;
 
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
-  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
+  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -222,11 +224,11 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
 
-    assertThat(receivedCommands, containsInAnyOrder(
-        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+    assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
+            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
     ));
   }
 
@@ -305,7 +307,7 @@ public class AlphaIntegrationTest {
     String anotherLocalTxId2 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -314,6 +316,15 @@ public class AlphaIntegrationTest {
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
   }
 
+  private GrpcAck onCompensation(GrpcCompensateCommand command) {
+    return blockingStub.onTxEvent(
+        eventOf(TxCompensatedEvent,
+            command.getLocalTxId(),
+            command.getParentTxId(),
+            new byte[0],
+            command.getCompensateMethod()));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
@@ -371,11 +382,21 @@ public class AlphaIntegrationTest {
   }
 
   private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+    private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
+    private CompensateStreamObserver() {
+      this(command -> {});
+    }
+
+    private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+      this.consumer = consumer;
+    }
+
     @Override
     public void onNext(GrpcCompensateCommand command) {
       // intercept received command
+      consumer.accept(command);
       receivedCommands.add(command);
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.