You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:18 UTC

[incubator-servicecomb-saga] 06/12: SCB-218 polling events into commands in the background

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit fff84355f893699b3e6903ac6ba362cf7960cf41
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 10:28:22 2018 +0800

    SCB-218 polling events into commands in the background
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/CommandRepository.java         |   5 +-
 ...{TxConsistentService.java => EventScanner.java} | 139 +++++++-------
 .../saga/alpha/core/TxConsistentService.java       | 102 +---------
 .../saga/alpha/core/TxEventRepository.java         |   5 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 208 ++-------------------
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  12 +-
 .../saga/alpha/server/SpringCommandRepository.java |  25 +--
 .../saga/alpha/server/SpringTxEventRepository.java |  10 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  31 ++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  12 +-
 10 files changed, 126 insertions(+), 423 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 22f2b41..1da033f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -20,11 +20,8 @@ package org.apache.servicecomb.saga.alpha.core;
 import java.util.List;
 
 public interface CommandRepository {
-  boolean exists(String globalTxId, String localTxId);
 
-  void saveCompensationCommand(String globalTxId, String localTxId);
-
-  void saveCompensationCommands(String globalTxId);
+  Iterable<Command> saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
similarity index 50%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 61beea5..ea4aaf6 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,111 +19,116 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TxConsistentService {
+public class EventScanner implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
+  private final ScheduledExecutorService scheduler;
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
-  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
-    put(TxAbortedEvent.name(), (event) -> compensate(event));
-    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
-  }};
-
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler;
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval) {
+  private final int commandPollingInterval;
+  private final int eventPollingInterval;
 
-    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
-  }
+  private long nextEndedEventId;
+  private long nextCompensatedEventId;
 
-  public TxConsistentService(TxEventRepository eventRepository,
+  public EventScanner(ScheduledExecutorService scheduler,
+      TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       int commandPollingInterval,
-      ScheduledExecutorService scheduler) {
+      int eventPollingInterval) {
+
+    this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
-    this.scheduler = scheduler;
-
-    scheduleCompensationCommandPolling(commandPollingInterval);
+    this.commandPollingInterval = commandPollingInterval;
+    this.eventPollingInterval = eventPollingInterval;
   }
 
-  public boolean handle(TxEvent event) {
-    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
-      return false;
-    }
-
-    eventRepository.save(event);
-
-    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
-    return true;
+  @Override
+  public void run() {
+    pollCompensationCommand(commandPollingInterval);
+    pollEvents();
   }
 
-  private void compensateIfAlreadyAborted(TxEvent event) {
-    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-    }
+  private void pollEvents() {
+    scheduler.scheduleWithFixedDelay(
+        () -> {
+          saveUncompensatedEventsToCommands();
+          updateCompensatedCommands();
+        },
+        0,
+        eventPollingInterval,
+        MILLISECONDS);
   }
 
-  private boolean isCompensationScheduled(TxEvent event) {
-    return commandRepository.exists(event.globalTxId(), event.localTxId());
+  private void saveUncompensatedEventsToCommands() {
+    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+        .forEach(event -> {
+          log.info("Found uncompensated event {}", event);
+          nextEndedEventId = event.id();
+          commandRepository.saveCompensationCommands(event.globalTxId())
+              .forEach(command -> nextEndedEventId = command.id());
+        });
   }
 
-  private void compensate(TxEvent event) {
-    commandRepository.saveCompensationCommands(event.globalTxId());
+  private void updateCompensatedCommands() {
+    eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId, TxCompensatedEvent.name())
+        .ifPresent(event -> {
+          log.info("Found compensated event {}", event);
+          nextCompensatedEventId = event.id();
+          updateCompensationStatus(event);
+        });
   }
 
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
-  private void updateCompensateStatus(TxEvent event) {
+  private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+    log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+        event.globalTxId(),
+        event.localTxId());
 
     if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
         && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+
       markGlobalTxEnd(event);
-      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
     }
   }
 
   private void markGlobalTxEnd(TxEvent event) {
-    eventRepository.save(new TxEvent(
-        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
-        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
+    eventRepository.save(toSagaEndedEvent(event));
+    log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
-  private boolean isGlobalTxAborted(TxEvent event) {
-    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  private TxEvent toSagaEndedEvent(TxEvent event) {
+    return new TxEvent(
+        event.serviceName(),
+        event.instanceId(),
+        new Date(),
+        event.globalTxId(),
+        event.globalTxId(),
+        null,
+        SagaEndedEvent.name(),
+        "",
+        EMPTY_PAYLOAD);
   }
 
-  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+  private void pollCompensationCommand(int commandPollingInterval) {
     scheduler.scheduleWithFixedDelay(
         () -> commandRepository.findFirstCommandToCompensate()
             .forEach(command -> {
@@ -131,19 +136,23 @@ public class TxConsistentService {
                   command.globalTxId(),
                   command.localTxId());
 
-              omegaCallback.compensate(new TxEvent(
-                  command.serviceName(),
-                  command.instanceId(),
-                  command.globalTxId(),
-                  command.localTxId(),
-                  command.parentTxId(),
-                  TxStartedEvent.name(),
-                  command.compensationMethod(),
-                  command.payloads()
-              ));
+              omegaCallback.compensate(txStartedEventOf(command));
             }),
         0,
         commandPollingInterval,
         MILLISECONDS);
   }
+
+  private TxEvent txStartedEventOf(Command command) {
+    return new TxEvent(
+        command.serviceName(),
+        command.instanceId(),
+        command.globalTxId(),
+        command.localTxId(),
+        command.parentTxId(),
+        TxStartedEvent.name(),
+        command.compensationMethod(),
+        command.payloads()
+    );
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 61beea5..c55090a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,133 +17,35 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
-  private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final TxEventRepository eventRepository;
-  private final CommandRepository commandRepository;
-  private final OmegaCallback omegaCallback;
-  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
-    put(TxAbortedEvent.name(), (event) -> compensate(event));
-    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
-  }};
-
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler;
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval) {
 
-    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
-  }
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval,
-      ScheduledExecutorService scheduler) {
+  public TxConsistentService(TxEventRepository eventRepository) {
     this.eventRepository = eventRepository;
-    this.commandRepository = commandRepository;
-    this.omegaCallback = omegaCallback;
-    this.scheduler = scheduler;
-
-    scheduleCompensationCommandPolling(commandPollingInterval);
   }
 
   public boolean handle(TxEvent event) {
     if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+      log.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted", event.globalTxId());
       return false;
     }
 
     eventRepository.save(event);
 
-    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
     return true;
   }
 
-  private void compensateIfAlreadyAborted(TxEvent event) {
-    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-    }
-  }
-
-  private boolean isCompensationScheduled(TxEvent event) {
-    return commandRepository.exists(event.globalTxId(), event.localTxId());
-  }
-
-  private void compensate(TxEvent event) {
-    commandRepository.saveCompensationCommands(event.globalTxId());
-  }
-
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
-  private void updateCompensateStatus(TxEvent event) {
-    commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
-
-    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
-        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
-      markGlobalTxEnd(event);
-      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
-    }
-  }
-
-  private void markGlobalTxEnd(TxEvent event) {
-    eventRepository.save(new TxEvent(
-        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
-        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
-  }
-
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
-
-  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
-    scheduler.scheduleWithFixedDelay(
-        () -> commandRepository.findFirstCommandToCompensate()
-            .forEach(command -> {
-              log.info("Compensating transaction with globalTxId {} and localTxId {}",
-                  command.globalTxId(),
-                  command.localTxId());
-
-              omegaCallback.compensate(new TxEvent(
-                  command.serviceName(),
-                  command.instanceId(),
-                  command.globalTxId(),
-                  command.localTxId(),
-                  command.parentTxId(),
-                  TxStartedEvent.name(),
-                  command.compensationMethod(),
-                  command.payloads()
-              ));
-            }),
-        0,
-        commandPollingInterval,
-        MILLISECONDS);
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index cf5706b..d793de2 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -18,13 +18,14 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface TxEventRepository {
   void save(TxEvent event);
 
   List<TxEvent> findTransactions(String globalTxId, String type);
 
-  TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
-  List<TxEvent> findTransactionsToCompensate(String globalTxId);
+  Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 212c621..473501e 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -18,31 +18,23 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static java.util.Collections.emptyList;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -64,93 +56,13 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
-      return events.stream()
-          .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
-          .findFirst()
-          .get();
-    }
-
-    @Override
-    public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
-      return events.stream()
-          .filter(event -> globalTxId.equals(event.globalTxId())
-              && event.type().equals(TxStartedEvent.name())
-              && isCompleted(globalTxId, event)
-              && !isCompensated(globalTxId, event))
-          .collect(Collectors.toList());
-    }
-
-    private boolean isCompleted(String globalTxId, TxEvent event) {
-      return events.stream()
-          .filter(e -> globalTxId.equals(e.globalTxId())
-              && e.localTxId().equals(event.localTxId())
-              && e.type().equals(TxEndedEvent.name()))
-          .count() > 0;
-    }
-
-    private boolean isCompensated(String globalTxId, TxEvent event) {
-      return events.stream()
-          .filter(e -> globalTxId.equals(e.globalTxId())
-              && e.localTxId().equals(event.localTxId())
-              && e.type().equals(TxCompensatedEvent.name()))
-          .count() > 0;
-    }
-  };
-
-  private final List<Command> commands = new ArrayList<>();
-  private final CommandRepository commandRepository = new CommandRepository() {
-    @Override
-    public boolean exists(String globalTxId, String localTxId) {
-      return commands.stream()
-          .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
-    }
-
-    @Override
-    public void saveCompensationCommand(String globalTxId, String localTxId) {
-      TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
-      commands.add(new Command(event));
+    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+      return emptyList();
     }
 
     @Override
-    public void saveCompensationCommands(String globalTxId) {
-      List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
-
-      Map<String, Command> commandMap = new HashMap<>();
-
-      for (TxEvent event : events) {
-        commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
-      }
-
-      commands.addAll(commandMap.values());
-    }
-
-    @Override
-    public void markCommandAsDone(String globalTxId, String localTxId) {
-      for (int i = 0; i < commands.size(); i++) {
-        Command command = commands.get(i);
-        if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
-          commands.set(i, new Command(command, DONE));
-        }
-      }
-    }
-
-    @Override
-    public List<Command> findUncompletedCommands(String globalTxId) {
-      return commands.stream()
-          .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
-          .collect(Collectors.toList());
-    }
-
-    @Override
-    public List<Command> findFirstCommandToCompensate() {
-      List<Command> results = new ArrayList<>(1);
-      commands.stream()
-          .filter(command -> !DONE.name().equals(command.status()))
-          .findFirst()
-          .ifPresent(results::add);
-
-      return results;
+    public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+      return Optional.empty();
     }
   };
 
@@ -161,20 +73,9 @@ public class TxConsistentServiceTest {
   private final String instanceId = uniquify("instanceId");
 
   private final String compensationMethod = getClass().getCanonicalName();
-  private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
-  private Consumer<TxEvent> eventConsumer = event -> {};
-  private final OmegaCallback omegaCallback = event -> {
-    eventConsumer.accept(event);
-    compensationContexts.add(
-        new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
-  };
-
-  private final TxConsistentService consistentService = new TxConsistentService(
-      eventRepository,
-      commandRepository,
-      omegaCallback,
-      300);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
+  private final byte[] payloads = "yeah".getBytes();
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -190,49 +91,6 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
-    assertThat(compensationContexts.isEmpty(), is(true));
-  }
-
-  @Test
-  public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
-    eventConsumer = event -> consistentService
-        .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
-
-    String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
-
-    String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
-
-    TxEvent abortEvent = newEvent(TxAbortedEvent);
-
-    consistentService.handle(abortEvent);
-
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
-    ));
-
-    await().atMost(1, SECONDS).until(() -> events.size() == 8);
-    assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
-  }
-
-  @Test
-  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
-    events.add(newEvent(TxStartedEvent));
-    events.add(newEvent(TxAbortedEvent));
-
-    TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
-
-    consistentService.handle(event);
-
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
-    ));
   }
 
   @Test
@@ -241,7 +99,7 @@ public class TxConsistentServiceTest {
     events.add(newEvent(TxStartedEvent));
     events.add(newEvent(TxAbortedEvent));
 
-    TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+    TxEvent event = eventOf(TxStartedEvent, localTxId1);
 
     consistentService.handle(event);
 
@@ -249,10 +107,10 @@ public class TxConsistentServiceTest {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
   }
 
-  private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId, String compensationMethod) {
+  private TxEvent eventOf(EventType eventType, String localTxId) {
     return new TxEvent(serviceName, instanceId, new Date(),
         globalTxId,
         localTxId,
@@ -261,48 +119,4 @@ public class TxConsistentServiceTest {
         compensationMethod,
         payloads);
   }
-
-  private static class CompensationContext {
-    private final String globalTxId;
-    private final String localTxId;
-    private final String compensationMethod;
-    private final byte[] message;
-
-    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
-      this.globalTxId = globalTxId;
-      this.localTxId = localTxId;
-      this.compensationMethod = compensationMethod;
-      this.message = message;
-    }
-
-    @Override
-    public String toString() {
-      return "CompensationContext{" +
-          "globalTxId='" + globalTxId + '\'' +
-          ", localTxId='" + localTxId + '\'' +
-          ", compensationMethod='" + compensationMethod + '\'' +
-          ", message=" + Arrays.toString(message) +
-          '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      CompensationContext that = (CompensationContext) o;
-      return Objects.equals(globalTxId, that.globalTxId) &&
-          Objects.equals(localTxId, that.localTxId) &&
-          Objects.equals(compensationMethod, that.compensationMethod) &&
-          Arrays.equals(message, that.message);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
-    }
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 3740581..73298be 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -28,6 +28,7 @@ import javax.annotation.PostConstruct;
 
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
@@ -68,24 +69,27 @@ class AlphaConfig {
 
   @Bean
   ScheduledExecutorService compensationScheduler() {
-    return Executors.newSingleThreadScheduledExecutor();
+    return Executors.newScheduledThreadPool(2);
   }
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
-      @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+      @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
+      @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(
+    new EventScanner(scheduler,
         eventRepository,
         commandRepository,
         omegaCallback,
         commandPollingInterval,
-        scheduler);
+        eventPollingInterval).run();
+
+    TxConsistentService consistentService = new TxConsistentService(eventRepository);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 34b43a4..6076e54 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
+import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +29,12 @@ import java.util.Map;
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
 
   private final TxEventEnvelopeRepository eventRepository;
@@ -43,22 +46,7 @@ public class SpringCommandRepository implements CommandRepository {
   }
 
   @Override
-  public boolean exists(String globalTxId, String localTxId) {
-    return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent();
-  }
-
-  @Override
-  public void saveCompensationCommand(String globalTxId, String localTxId) {
-    TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(
-        globalTxId,
-        localTxId,
-        TxStartedEvent.name());
-
-    commandRepository.save(new Command(startedEvent));
-  }
-
-  @Override
-  public void saveCompensationCommands(String globalTxId) {
+  public Iterable<Command> saveCompensationCommands(String globalTxId) {
     List<TxEvent> events = eventRepository
         .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
@@ -68,7 +56,8 @@ public class SpringCommandRepository implements CommandRepository {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
     }
 
-    commandRepository.save(commands.values());
+    log.info("Saving compensation commands {}", commands.values());
+    return commandRepository.save(commands.values());
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7c44639..4108aa5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,9 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.springframework.data.domain.PageRequest;
 
 class SpringTxEventRepository implements TxEventRepository {
   private final TxEventEnvelopeRepository eventRepo;
@@ -40,12 +42,12 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
-    return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type);
+  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1));
   }
 
   @Override
-  public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
-    return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
+  public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 09aec6f..78c2a1d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -18,8 +18,10 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -32,13 +34,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
-  TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type);
-
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
-      + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent'"
-      + "AND EXISTS ("
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = ?1 "
@@ -50,19 +47,15 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
-      + "ORDER BY t.surrogateId ASC"
- )
-  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+      + "ORDER BY t.surrogateId ASC")
+  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
-  @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
-      + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
-      + "  WHERE t1.globalTxId = ?1 "
-      + "  AND t1.localTxId = t.localTxId "
-      + "  AND t1.type = 'TxEndedEvent'"
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "  AND t1.type = 'TxAbortedEvent'"
       + ") AND NOT EXISTS ( "
       + "  SELECT t2.globalTxId"
       + "  FROM TxEvent t2 "
@@ -70,5 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
+
+  Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1bfbe3c..5cff57c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,8 +34,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -57,8 +55,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -69,7 +65,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -417,10 +413,4 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
-
-  @Primary
-  @Bean
-  ScheduledExecutorService scheduler() {
-    return Executors.newScheduledThreadPool(2);
-  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.