You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:12 UTC

[incubator-servicecomb-saga] branch master updated (f877fc7 -> bd1277b)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from f877fc7  SCB-249 avoided hibernate and replaced JSON with TEXT due to limited postgres & jpa support
     new 4b89639  SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless
     new 6f27eac  SCB-218 polling in background for events to compensate
     new 9a40171  SCB-218 updated schemas accordingly due to change of ORM tech
     new ebf8851  SCB-218 excluded duplicate events during command persistence
     new 7aafab8  SCB-218 updated command status to pending when compensating
     new fff8435  SCB-218 polling events into commands in the background
     new da2ef46  SCB-218 added event id to check for command uniqueness
     new f075bd5  SCB-218 locked commands to avoid duplicate compensation callback
     new ffaa350  SCB-218 made sure saga ended event is always the last event
     new 28d8afa  SCB-218 removed missing column
     new 311efc7  SCB-218 address out of order compensation later
     new bd1277b  SCB-218 included alpha core for coverage report

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-core/pom.xml                           |  15 ++
 .../saga/alpha/core/{TxEvent.java => Command.java} | 100 ++++++++-----
 ...EventRepository.java => CommandRepository.java} |  11 +-
 .../{OmegaCallback.java => CommandStatus.java}     |   9 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 160 +++++++++++++++++++++
 .../saga/alpha/core/TxConsistentService.java       |  79 +---------
 .../servicecomb/saga/alpha/core/TxEvent.java       |  47 ++++++
 .../saga/alpha/core/TxEventRepository.java         |   7 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 147 +++----------------
 alpha/alpha-server/pom.xml                         |   4 +-
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  25 +++-
 .../saga/alpha/server/CommandEntityRepository.java |  56 ++++++++
 .../saga/alpha/server/SpringCommandRepository.java |  96 +++++++++++++
 .../saga/alpha/server/SpringTxEventRepository.java |  15 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  45 ++++--
 .../src/main/resources/schema-postgresql.sql       |  20 ++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  96 ++++++++++++-
 alpha/alpha-server/src/test/resources/schema.sql   |  41 ++++--
 integration-tests/coverage-aggregate/pom.xml       |   4 +
 .../saga/omega/transaction/TransactionAspect.java  |   1 +
 20 files changed, 693 insertions(+), 285 deletions(-)
 copy alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/{TxEvent.java => Command.java} (60%)
 copy alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/{TxEventRepository.java => CommandRepository.java} (76%)
 copy alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/{OmegaCallback.java => CommandStatus.java} (89%)
 create mode 100644 alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 04/12: SCB-218 excluded duplicate events during command persistence

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ebf8851ea7f430f80c60839ff277b4764abb223a
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 16:59:51 2018 +0800

    SCB-218 excluded duplicate events during command persistence
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       | 32 ++++++++++++++++++----
 .../saga/alpha/core/TxConsistentService.java       | 12 +++++++-
 .../servicecomb/saga/alpha/core/TxEvent.java       | 32 ++++++++++++++++++++--
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 11 +++++++-
 .../saga/alpha/server/SpringCommandRepository.java |  4 +--
 .../alpha/server/TxEventEnvelopeRepository.java    |  4 +--
 alpha/alpha-server/src/test/resources/schema.sql   |  2 +-
 7 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b29988b..b5902e1 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -48,7 +48,8 @@ public class Command {
   Command() {
   }
 
-  Command(String serviceName,
+  Command(long id,
+      String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
@@ -57,6 +58,7 @@ public class Command {
       byte[] payloads,
       String status) {
 
+    this.surrogateId = id;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.globalTxId = globalTxId;
@@ -68,7 +70,8 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  Command(String serviceName,
+  Command(long id,
+      String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
@@ -76,11 +79,12 @@ public class Command {
       String compensationMethod,
       byte[] payloads) {
 
-    this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
+    this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
   Command(Command command, CommandStatus status) {
-    this(command.serviceName,
+    this(command.surrogateId,
+        command.serviceName,
         command.instanceId,
         command.globalTxId,
         command.localTxId,
@@ -91,7 +95,8 @@ public class Command {
   }
 
   public Command(TxEvent event) {
-    this(event.serviceName(),
+    this(event.id(),
+        event.serviceName(),
         event.instanceId(),
         event.globalTxId(),
         event.localTxId(),
@@ -131,4 +136,21 @@ public class Command {
   String status() {
     return status;
   }
+
+  long id() {
+    return surrogateId;
+  }
+
+  @Override
+  public String toString() {
+    return "Command{" +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        '}';
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 8f88735..61beea5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -52,15 +52,25 @@ public class TxConsistentService {
   }};
 
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+  private final ScheduledExecutorService scheduler;
 
   public TxConsistentService(TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       int commandPollingInterval) {
+
+    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
+  }
+
+  public TxConsistentService(TxEventRepository eventRepository,
+      CommandRepository commandRepository,
+      OmegaCallback omegaCallback,
+      int commandPollingInterval,
+      ScheduledExecutorService scheduler) {
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
+    this.scheduler = scheduler;
 
     scheduleCompensationCommandPolling(commandPollingInterval);
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 0ff2299..b654689 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -65,7 +65,34 @@ public class TxEvent {
       String type,
       String compensationMethod,
       byte[] payloads) {
-    this.surrogateId = -1L;
+    this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  public TxEvent(
+      long id,
+      String serviceName,
+      String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  TxEvent(Long surrogateId,
+      String serviceName,
+      String instanceId,
+      Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+
+    this.surrogateId = surrogateId;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.creationTime = creationTime;
@@ -120,7 +147,8 @@ public class TxEvent {
   @Override
   public String toString() {
     return "TxEvent{" +
-        "serviceName='" + serviceName + '\'' +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
         ", instanceId='" + instanceId + '\'' +
         ", creationTime=" + creationTime +
         ", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 9c34738..3740581 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.PostConstruct;
 
@@ -65,8 +67,14 @@ class AlphaConfig {
   }
 
   @Bean
+  ScheduledExecutorService compensationScheduler() {
+    return Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+      ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
@@ -76,7 +84,8 @@ class AlphaConfig {
         eventRepository,
         commandRepository,
         omegaCallback,
-        commandPollingInterval);
+        commandPollingInterval,
+        scheduler);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index a335849..18ee9ad 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -21,7 +21,7 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +61,7 @@ public class SpringCommandRepository implements CommandRepository {
     List<TxEvent> events = eventRepository
         .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
-    Map<String, Command> commands = new HashMap<>();
+    Map<String, Command> commands = new LinkedHashMap<>();
 
     for (TxEvent event : events) {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 71d5f1b..09aec6f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -54,8 +54,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
  )
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+  @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+      + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
       + ") FROM TxEvent t "
       + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
       + "  SELECT t1.globalTxId"
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index c6d66de..fb396b3 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
 ) DEFAULT CHARSET=utf8;
 
 CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+  `surrogateId` bigint NOT NULL,
   `serviceName` varchar(36) NOT NULL,
   `instanceId` varchar(36) NOT NULL,
   `globalTxId` varchar(36) NOT NULL,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 08/12: SCB-218 locked commands to avoid duplicate compensation callback

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit f075bd5f27c76a073e253095a99a3b0b73153cfc
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 16:35:03 2018 +0800

    SCB-218 locked commands to avoid duplicate compensation callback
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       |  2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 28 +++++------
 .../saga/alpha/core/TxEventRepository.java         |  2 +
 .../saga/alpha/core/TxConsistentServiceTest.java   |  4 ++
 alpha/alpha-server/pom.xml                         |  4 +-
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  2 -
 .../saga/alpha/server/CommandEntityRepository.java |  3 ++
 .../saga/alpha/server/SpringCommandRepository.java | 17 ++++---
 .../saga/alpha/server/SpringTxEventRepository.java |  5 ++
 .../alpha/server/TxEventEnvelopeRepository.java    | 17 ++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 36 +++++++++++++-
 alpha/alpha-server/src/test/resources/schema.sql   | 55 +++++++++++-----------
 12 files changed, 119 insertions(+), 56 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 2716abf..49c1756 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -47,7 +47,7 @@ public class Command {
   private Date lastModified;
 
   @Version
-  private int version;
+  private long version;
 
   Command() {
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 5a4589d..f9fa3be 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -38,7 +38,6 @@ public class EventScanner implements Runnable {
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
-  private final int commandPollingInterval;
   private final int eventPollingInterval;
 
   private long nextEndedEventId;
@@ -48,20 +47,17 @@ public class EventScanner implements Runnable {
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
-      int commandPollingInterval,
       int eventPollingInterval) {
 
     this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
-    this.commandPollingInterval = commandPollingInterval;
     this.eventPollingInterval = eventPollingInterval;
   }
 
   @Override
   public void run() {
-    pollCompensationCommand(commandPollingInterval);
     pollEvents();
   }
 
@@ -69,7 +65,9 @@ public class EventScanner implements Runnable {
     scheduler.scheduleWithFixedDelay(
         () -> {
           saveUncompensatedEventsToCommands();
+          compensate();
           updateCompensatedCommands();
+          deleteDuplicateSagaEndedEvents();
         },
         0,
         eventPollingInterval,
@@ -94,6 +92,14 @@ public class EventScanner implements Runnable {
         });
   }
 
+  private void deleteDuplicateSagaEndedEvents() {
+    try {
+      eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+    } catch (Exception e) {
+      log.warn("Failed to delete duplicate SagaEndedEvent", e);
+    }
+  }
+
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensationStatus(TxEvent event) {
@@ -102,9 +108,11 @@ public class EventScanner implements Runnable {
         event.globalTxId(),
         event.localTxId());
 
-    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
-        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+    markSagaEnded(event);
+  }
 
+  private void markSagaEnded(TxEvent event) {
+    if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
     }
   }
@@ -127,14 +135,6 @@ public class EventScanner implements Runnable {
         EMPTY_PAYLOAD);
   }
 
-  private void pollCompensationCommand(int commandPollingInterval) {
-    scheduler.scheduleWithFixedDelay(
-        this::compensate,
-        0,
-        commandPollingInterval,
-        MILLISECONDS);
-  }
-
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index d793de2..b61aa06 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -28,4 +28,6 @@ public interface TxEventRepository {
   List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
   Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
+
+  void deleteDuplicateEvents(String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 473501e..231d5bf 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -64,6 +64,10 @@ public class TxConsistentServiceTest {
     public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
       return Optional.empty();
     }
+
+    @Override
+    public void deleteDuplicateEvents(String type) {
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index ae894b8..a05177a 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -82,8 +82,8 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.h2database</groupId>
-      <artifactId>h2</artifactId>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 73298be..769ee5a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -74,7 +74,6 @@ class AlphaConfig {
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
-      @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
       @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
@@ -86,7 +85,6 @@ class AlphaConfig {
         eventRepository,
         commandRepository,
         omegaCallback,
-        commandPollingInterval,
         eventPollingInterval).run();
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b8c3ee..17df477 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -19,10 +19,12 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
 
+import javax.persistence.LockModeType;
 import javax.transaction.Transactional;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Lock;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
@@ -44,6 +46,7 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
   List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
 
   // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
+  @Lock(LockModeType.OPTIMISTIC)
   @Query("SELECT c FROM Command c "
       + "WHERE c.eventId IN ("
       + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 1dabeda..8241d81 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.Transactional;
+
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -56,13 +58,15 @@ public class SpringCommandRepository implements CommandRepository {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
     }
 
-    log.info("Saving compensation commands {}", commands.values());
-    try {
-      commandRepository.save(commands.values());
-    } catch (Exception e) {
-      log.warn("Failed to save some commands", e);
+    for (Command command : commands.values()) {
+      log.info("Saving compensation command {}", command);
+      try {
+        commandRepository.save(command);
+      } catch (Exception e) {
+        log.warn("Failed to save some command {}", command);
+      }
+      log.info("Saved compensation command {}", command);
     }
-    log.info("Saved compensation commands {}", commands.values());
   }
 
   @Override
@@ -75,6 +79,7 @@ public class SpringCommandRepository implements CommandRepository {
     return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name());
   }
 
+  @Transactional
   @Override
   public List<Command> findFirstCommandToCompensate() {
     List<Command> commands = commandRepository
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 4108aa5..ad32148 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -50,4 +50,9 @@ class SpringTxEventRepository implements TxEventRepository {
   public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
     return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
   }
+
+  @Override
+  public void deleteDuplicateEvents(String type) {
+    eventRepo.deleteByType(type);
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index e974527..2e52fef 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -20,8 +20,11 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.List;
 import java.util.Optional;
 
+import javax.transaction.Transactional;
+
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -51,7 +54,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
-      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
+      + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = t.globalTxId "
@@ -59,11 +62,21 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + ") AND NOT EXISTS ( "
       + "  SELECT t2.globalTxId"
       + "  FROM TxEvent t2 "
-      + "  WHERE t2.globalTxId = ?1 "
+      + "  WHERE t2.globalTxId = t.globalTxId "
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
   List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
 
   Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
+
+  @Transactional
+  @Modifying(clearAutomatically = true)
+  @Query("DELETE FROM TxEvent t "
+      + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
+      + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
+      + " WHERE t1.type = ?1"
+      + " GROUP BY t1.globalTxId"
+      + ")")
+  void deleteByType(String type);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a5356bb..a928443 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -36,9 +36,13 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Consumer;
 
+import javax.annotation.PostConstruct;
+
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
+    properties = {"alpha.server.port=8090", "alpha.event.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -93,6 +97,18 @@ public class AlphaIntegrationTest {
   private TxEventEnvelopeRepository eventRepo;
 
   @Autowired
+  private TxEventRepository eventRepository;
+
+  @Autowired
+  private CommandRepository commandRepository;
+
+  @Autowired
+  private CommandEntityRepository commandEntityRepository;
+
+  @Autowired
+  private OmegaCallback omegaCallback;
+
+  @Autowired
   private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
   @Autowired
@@ -115,6 +131,19 @@ public class AlphaIntegrationTest {
   @After
   public void after() throws Exception {
     blockingStub.onDisconnected(serviceConfig);
+    deleteAllTillSuccessful();
+  }
+
+  public void deleteAllTillSuccessful() {
+    boolean deleted = false;
+    do {
+      try {
+        eventRepo.deleteAll();
+        commandEntityRepository.deleteAll();
+        deleted = true;
+      } catch (Exception ignored) {
+      }
+    } while (!deleted);
   }
 
   @Test
@@ -413,4 +442,9 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
+
+  @PostConstruct
+  void init() {
+//    new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+  }
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 71444ec..344fdda 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -1,29 +1,28 @@
-CREATE TABLE IF NOT EXISTS `TxEvent` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
-  `serviceName` varchar(36) NOT NULL,
-  `instanceId` varchar(36) NOT NULL,
-  `creationTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
-  `globalTxId` varchar(36) NOT NULL,
-  `localTxId` varchar(36) NOT NULL,
-  `parentTxId` varchar(36) DEFAULT NULL,
-  `type` varchar(50) NOT NULL,
-  `compensationMethod` varchar(256) NOT NULL,
-  `payloads` varbinary(10240),
-  PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS TxEvent (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads varbinary(10240),
+--  version bigint NOT NULL
+);
 
-CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
-  `eventId` bigint NOT NULL UNIQUE,
-  `serviceName` varchar(36) NOT NULL,
-  `instanceId` varchar(36) NOT NULL,
-  `globalTxId` varchar(36) NOT NULL,
-  `localTxId` varchar(36) NOT NULL,
-  `parentTxId` varchar(36) DEFAULT NULL,
-  `compensationMethod` varchar(256) NOT NULL,
-  `payloads` varbinary(10240),
-  `status` varchar(12),
-  `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
-  `version` bigint NOT NULL,
-  PRIMARY KEY (`surrogateId`)
-) DEFAULT CHARSET=utf8;
+CREATE TABLE IF NOT EXISTS Command (
+  surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads varbinary(10240),
+  status varchar(12),
+  lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  version bigint NOT NULL
+);

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 09/12: SCB-218 made sure saga ended event is always the last event

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit ffaa350c987ec77c43209cf990d6f9e2bcab2560
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 17:01:45 2018 +0800

    SCB-218 made sure saga ended event is always the last event
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/EventScanner.java  |  3 +--
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 29 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index f9fa3be..80deeb3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -100,8 +100,6 @@ public class EventScanner implements Runnable {
     }
   }
 
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
     log.info("Transaction with globalTxId {} and localTxId {} was compensated",
@@ -135,6 +133,7 @@ public class EventScanner implements Runnable {
         EMPTY_PAYLOAD);
   }
 
+  // TODO: 2018/1/19 potentially compensation may be out of order if we don't wait till received compensated event for the previous one, since compensation is async
   private void compensate() {
     commandRepository.findFirstCommandToCompensate()
         .forEach(command -> {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 769ee5a..35352f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -69,7 +69,7 @@ class AlphaConfig {
 
   @Bean
   ScheduledExecutorService compensationScheduler() {
-    return Executors.newScheduledThreadPool(2);
+    return Executors.newScheduledThreadPool(1);
   }
 
   @Bean
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a928443..5dddc1d 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -30,15 +31,18 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 import javax.annotation.PostConstruct;
 
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -345,6 +349,23 @@ public class AlphaIntegrationTest {
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
   }
 
+  @Test
+  public void sagaEndedEventIsAlwaysInTheEnd() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    String anotherLocalTxId = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId));
+
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 8);
+    List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+  }
+
   private GrpcAck onCompensation(GrpcCompensateCommand command) {
     return blockingStub.onTxEvent(
         eventOf(TxCompensatedEvent,
@@ -445,6 +466,12 @@ public class AlphaIntegrationTest {
 
   @PostConstruct
   void init() {
-//    new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run();
+    // simulates concurrent db connections
+    new EventScanner(
+        Executors.newSingleThreadScheduledExecutor(),
+        eventRepository,
+        commandRepository,
+        omegaCallback,
+        1).run();
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/12: SCB-218 polling in background for events to compensate

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6f27eacff767501c0ef0c9215c4eb7d71191a249
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 18:43:31 2018 +0800

    SCB-218 polling in background for events to compensate
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-core/pom.xml                           | 15 +++++++
 .../servicecomb/saga/alpha/core/Command.java       | 20 +++++++++
 .../saga/alpha/core/CommandRepository.java         |  2 +
 .../saga/alpha/core/TxConsistentService.java       | 51 +++++++++++++++++-----
 .../saga/alpha/core/TxConsistentServiceTest.java   | 35 +++++++++++----
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  7 ++-
 .../saga/alpha/server/CommandEntity.java           |  6 ++-
 .../saga/alpha/server/CommandEntityRepository.java | 14 +++++-
 .../saga/alpha/server/SpringCommandRepository.java | 20 ++++++++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  3 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 39 +++++++++++++----
 11 files changed, 174 insertions(+), 38 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index aa74718..107fff4 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -44,6 +44,21 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 08f8527..c852c16 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -83,6 +83,14 @@ public class Command {
         event.payloads());
   }
 
+  String serviceName() {
+    return serviceName;
+  }
+
+  String instanceId() {
+    return instanceId;
+  }
+
   String globalTxId() {
     return globalTxId;
   }
@@ -91,6 +99,18 @@ public class Command {
     return localTxId;
   }
 
+  String parentTxId() {
+    return parentTxId;
+  }
+
+  String compensationMethod() {
+    return compensationMethod;
+  }
+
+  byte[] payloads() {
+    return payloads;
+  }
+
   String status() {
     return status;
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 915d476..22f2b41 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -29,4 +29,6 @@ public interface CommandRepository {
   void markCommandAsDone(String globalTxId, String localTxId);
 
   List<Command> findUncompletedCommands(String globalTxId);
+
+  List<Command> findFirstCommandToCompensate();
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 560096f..8f88735 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,21 +17,27 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class TxConsistentService {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
 
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -46,13 +52,17 @@ public class TxConsistentService {
   }};
 
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public TxConsistentService(TxEventRepository eventRepository,
       CommandRepository commandRepository,
-      OmegaCallback omegaCallback) {
+      OmegaCallback omegaCallback,
+      int commandPollingInterval) {
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
+
+    scheduleCompensationCommandPolling(commandPollingInterval);
   }
 
   public boolean handle(TxEvent event) {
@@ -69,10 +79,6 @@ public class TxConsistentService {
   private void compensateIfAlreadyAborted(TxEvent event) {
     if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
       commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-      TxEvent correspondingStartedEvent = eventRepository
-          .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
-
-      omegaCallback.compensate(correspondingStartedEvent);
     }
   }
 
@@ -81,22 +87,19 @@ public class TxConsistentService {
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-
-    events.removeIf(this::isCompensationScheduled);
-
     commandRepository.saveCompensationCommands(event.globalTxId());
-
-    events.forEach(omegaCallback::compensate);
   }
 
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+
     if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
         && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
       markGlobalTxEnd(event);
+      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
     }
   }
 
@@ -109,4 +112,28 @@ public class TxConsistentService {
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
+
+  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+    scheduler.scheduleWithFixedDelay(
+        () -> commandRepository.findFirstCommandToCompensate()
+            .forEach(command -> {
+              log.info("Compensating transaction with globalTxId {} and localTxId {}",
+                  command.globalTxId(),
+                  command.localTxId());
+
+              omegaCallback.compensate(new TxEvent(
+                  command.serviceName(),
+                  command.instanceId(),
+                  command.globalTxId(),
+                  command.localTxId(),
+                  command.parentTxId(),
+                  TxStartedEvent.name(),
+                  command.compensationMethod(),
+                  command.payloads()
+              ));
+            }),
+        0,
+        commandPollingInterval,
+        MILLISECONDS);
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 8ae60a3..212c621 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -140,6 +141,17 @@ public class TxConsistentServiceTest {
           .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
           .collect(Collectors.toList());
     }
+
+    @Override
+    public List<Command> findFirstCommandToCompensate() {
+      List<Command> results = new ArrayList<>(1);
+      commands.stream()
+          .filter(command -> !DONE.name().equals(command.status()))
+          .findFirst()
+          .ifPresent(results::add);
+
+      return results;
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
@@ -151,10 +163,18 @@ public class TxConsistentServiceTest {
   private final String compensationMethod = getClass().getCanonicalName();
   private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
-  private final OmegaCallback omegaCallback = event ->
-      compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+  private Consumer<TxEvent> eventConsumer = event -> {};
+  private final OmegaCallback omegaCallback = event -> {
+    eventConsumer.accept(event);
+    compensationContexts.add(
+        new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+  };
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+  private final TxConsistentService consistentService = new TxConsistentService(
+      eventRepository,
+      commandRepository,
+      omegaCallback,
+      300);
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -175,6 +195,9 @@ public class TxConsistentServiceTest {
 
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
+    eventConsumer = event -> consistentService
+        .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
+
     String localTxId1 = UUID.randomUUID().toString();
     events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
     events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
@@ -193,12 +216,6 @@ public class TxConsistentServiceTest {
         new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
     ));
 
-    TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
-    consistentService.handle(compensateEvent2);
-
-    TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
-    consistentService.handle(compensateEvent1);
-
     await().atMost(1, SECONDS).until(() -> events.size() == 8);
     assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 00dfe27..9c34738 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -66,12 +66,17 @@ class AlphaConfig {
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
+      @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+    TxConsistentService consistentService = new TxConsistentService(
+        eventRepository,
+        commandRepository,
+        omegaCallback,
+        commandPollingInterval);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
index 3eac681..6207002 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -21,7 +21,6 @@ import java.util.Date;
 
 import javax.persistence.Embedded;
 import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
 import javax.persistence.Id;
 import javax.persistence.Version;
 
@@ -31,7 +30,6 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent;
 @Entity
 class CommandEntity {
   @Id
-  @GeneratedValue
   private long surrogateId;
 
   @Embedded
@@ -50,4 +48,8 @@ class CommandEntity {
     lastModified = new Date();
     command = new Command(event);
   }
+
+  Command command() {
+    return command;
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b7309e..9402486 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
 import java.util.List;
 import java.util.Optional;
 
-import org.apache.servicecomb.saga.alpha.core.Command;
+import javax.transaction.Transactional;
+
+import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
@@ -29,6 +31,7 @@ import org.springframework.data.repository.query.Param;
 public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
   Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
 
+  @Transactional
   @Modifying
   @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
       + "SET c.command.status = :status "
@@ -39,5 +42,12 @@ public interface CommandEntityRepository extends CrudRepository<CommandEntity, L
       @Param("globalTxId") String globalTxId,
       @Param("localTxId") String localTxId);
 
-  List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+  List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+
+  @Query("FROM CommandEntity c "
+      + "WHERE id IN ("
+      + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId"
+      + ") "
+      + "ORDER BY c.id ASC")
+  List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 9281b7e..aac3c22 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -24,11 +24,15 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
+  private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
+
   private final TxEventEnvelopeRepository eventRepository;
   private final CommandEntityRepository commandRepository;
 
@@ -44,7 +48,7 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public void saveCompensationCommand(String globalTxId, String localTxId) {
-    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc(
         globalTxId,
         localTxId,
         TxStartedEvent.name());
@@ -73,6 +77,18 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public List<Command> findUncompletedCommands(String globalTxId) {
-    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name())
+        .stream()
+        .map(CommandEntity::command)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<Command> findFirstCommandToCompensate() {
+    return commandRepository
+        .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST)
+        .stream()
+        .map(CommandEntity::command)
+        .collect(Collectors.toList());
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fcb7c00..85ed954 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -63,6 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEventEnvelope t2 "
       + "  WHERE t2.event.globalTxId = ?1 "
       + "  AND t2.event.localTxId = t.event.localTxId "
-      + "  AND t2.event.type = 'TxCompensatedEvent')")
+      + "  AND t2.event.type = 'TxCompensatedEvent')"
+      + "ORDER BY t.id ASC ")
   List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 6d4f91f..1514869 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -24,7 +24,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -34,11 +34,12 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
 
-import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -63,7 +64,8 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -97,7 +99,7 @@ public class AlphaIntegrationTest {
   private TxConsistentService consistentService;
 
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
-  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
+  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -222,11 +224,11 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
 
-    assertThat(receivedCommands, containsInAnyOrder(
-        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
-            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+    assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
-            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
+            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
     ));
   }
 
@@ -305,7 +307,7 @@ public class AlphaIntegrationTest {
     String anotherLocalTxId2 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -314,6 +316,15 @@ public class AlphaIntegrationTest {
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
   }
 
+  private GrpcAck onCompensation(GrpcCompensateCommand command) {
+    return blockingStub.onTxEvent(
+        eventOf(TxCompensatedEvent,
+            command.getLocalTxId(),
+            command.getParentTxId(),
+            new byte[0],
+            command.getCompensateMethod()));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
@@ -371,11 +382,21 @@ public class AlphaIntegrationTest {
   }
 
   private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+    private final Consumer<GrpcCompensateCommand> consumer;
     private boolean completed = false;
 
+    private CompensateStreamObserver() {
+      this(command -> {});
+    }
+
+    private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+      this.consumer = consumer;
+    }
+
     @Override
     public void onNext(GrpcCompensateCommand command) {
       // intercept received command
+      consumer.accept(command);
       receivedCommands.add(command);
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/12: SCB-218 polling events into commands in the background

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit fff84355f893699b3e6903ac6ba362cf7960cf41
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 10:28:22 2018 +0800

    SCB-218 polling events into commands in the background
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/CommandRepository.java         |   5 +-
 ...{TxConsistentService.java => EventScanner.java} | 139 +++++++-------
 .../saga/alpha/core/TxConsistentService.java       | 102 +---------
 .../saga/alpha/core/TxEventRepository.java         |   5 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 208 ++-------------------
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  12 +-
 .../saga/alpha/server/SpringCommandRepository.java |  25 +--
 .../saga/alpha/server/SpringTxEventRepository.java |  10 +-
 .../alpha/server/TxEventEnvelopeRepository.java    |  31 ++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  12 +-
 10 files changed, 126 insertions(+), 423 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 22f2b41..1da033f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -20,11 +20,8 @@ package org.apache.servicecomb.saga.alpha.core;
 import java.util.List;
 
 public interface CommandRepository {
-  boolean exists(String globalTxId, String localTxId);
 
-  void saveCompensationCommand(String globalTxId, String localTxId);
-
-  void saveCompensationCommands(String globalTxId);
+  Iterable<Command> saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
similarity index 50%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 61beea5..ea4aaf6 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,111 +19,116 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TxConsistentService {
+public class EventScanner implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
+  private final ScheduledExecutorService scheduler;
   private final TxEventRepository eventRepository;
   private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
-  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
-    put(TxAbortedEvent.name(), (event) -> compensate(event));
-    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
-  }};
-
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler;
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval) {
+  private final int commandPollingInterval;
+  private final int eventPollingInterval;
 
-    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
-  }
+  private long nextEndedEventId;
+  private long nextCompensatedEventId;
 
-  public TxConsistentService(TxEventRepository eventRepository,
+  public EventScanner(ScheduledExecutorService scheduler,
+      TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       int commandPollingInterval,
-      ScheduledExecutorService scheduler) {
+      int eventPollingInterval) {
+
+    this.scheduler = scheduler;
     this.eventRepository = eventRepository;
     this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
-    this.scheduler = scheduler;
-
-    scheduleCompensationCommandPolling(commandPollingInterval);
+    this.commandPollingInterval = commandPollingInterval;
+    this.eventPollingInterval = eventPollingInterval;
   }
 
-  public boolean handle(TxEvent event) {
-    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
-      return false;
-    }
-
-    eventRepository.save(event);
-
-    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
-    return true;
+  @Override
+  public void run() {
+    pollCompensationCommand(commandPollingInterval);
+    pollEvents();
   }
 
-  private void compensateIfAlreadyAborted(TxEvent event) {
-    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-    }
+  private void pollEvents() {
+    scheduler.scheduleWithFixedDelay(
+        () -> {
+          saveUncompensatedEventsToCommands();
+          updateCompensatedCommands();
+        },
+        0,
+        eventPollingInterval,
+        MILLISECONDS);
   }
 
-  private boolean isCompensationScheduled(TxEvent event) {
-    return commandRepository.exists(event.globalTxId(), event.localTxId());
+  private void saveUncompensatedEventsToCommands() {
+    eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+        .forEach(event -> {
+          log.info("Found uncompensated event {}", event);
+          nextEndedEventId = event.id();
+          commandRepository.saveCompensationCommands(event.globalTxId())
+              .forEach(command -> nextEndedEventId = command.id());
+        });
   }
 
-  private void compensate(TxEvent event) {
-    commandRepository.saveCompensationCommands(event.globalTxId());
+  private void updateCompensatedCommands() {
+    eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId, TxCompensatedEvent.name())
+        .ifPresent(event -> {
+          log.info("Found compensated event {}", event);
+          nextCompensatedEventId = event.id();
+          updateCompensationStatus(event);
+        });
   }
 
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
-  private void updateCompensateStatus(TxEvent event) {
+  private void updateCompensationStatus(TxEvent event) {
     commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+    log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+        event.globalTxId(),
+        event.localTxId());
 
     if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
         && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+
       markGlobalTxEnd(event);
-      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
     }
   }
 
   private void markGlobalTxEnd(TxEvent event) {
-    eventRepository.save(new TxEvent(
-        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
-        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
+    eventRepository.save(toSagaEndedEvent(event));
+    log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
-  private boolean isGlobalTxAborted(TxEvent event) {
-    return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+  private TxEvent toSagaEndedEvent(TxEvent event) {
+    return new TxEvent(
+        event.serviceName(),
+        event.instanceId(),
+        new Date(),
+        event.globalTxId(),
+        event.globalTxId(),
+        null,
+        SagaEndedEvent.name(),
+        "",
+        EMPTY_PAYLOAD);
   }
 
-  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+  private void pollCompensationCommand(int commandPollingInterval) {
     scheduler.scheduleWithFixedDelay(
         () -> commandRepository.findFirstCommandToCompensate()
             .forEach(command -> {
@@ -131,19 +136,23 @@ public class TxConsistentService {
                   command.globalTxId(),
                   command.localTxId());
 
-              omegaCallback.compensate(new TxEvent(
-                  command.serviceName(),
-                  command.instanceId(),
-                  command.globalTxId(),
-                  command.localTxId(),
-                  command.parentTxId(),
-                  TxStartedEvent.name(),
-                  command.compensationMethod(),
-                  command.payloads()
-              ));
+              omegaCallback.compensate(txStartedEventOf(command));
             }),
         0,
         commandPollingInterval,
         MILLISECONDS);
   }
+
+  private TxEvent txStartedEventOf(Command command) {
+    return new TxEvent(
+        command.serviceName(),
+        command.instanceId(),
+        command.globalTxId(),
+        command.localTxId(),
+        command.parentTxId(),
+        TxStartedEvent.name(),
+        command.compensationMethod(),
+        command.payloads()
+    );
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 61beea5..c55090a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,133 +17,35 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
-  private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final TxEventRepository eventRepository;
-  private final CommandRepository commandRepository;
-  private final OmegaCallback omegaCallback;
-  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
-    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
-    put(TxAbortedEvent.name(), (event) -> compensate(event));
-    put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
-  }};
-
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  private final ScheduledExecutorService scheduler;
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval) {
 
-    this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
-  }
-
-  public TxConsistentService(TxEventRepository eventRepository,
-      CommandRepository commandRepository,
-      OmegaCallback omegaCallback,
-      int commandPollingInterval,
-      ScheduledExecutorService scheduler) {
+  public TxConsistentService(TxEventRepository eventRepository) {
     this.eventRepository = eventRepository;
-    this.commandRepository = commandRepository;
-    this.omegaCallback = omegaCallback;
-    this.scheduler = scheduler;
-
-    scheduleCompensationCommandPolling(commandPollingInterval);
   }
 
   public boolean handle(TxEvent event) {
     if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+      log.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted", event.globalTxId());
       return false;
     }
 
     eventRepository.save(event);
 
-    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
     return true;
   }
 
-  private void compensateIfAlreadyAborted(TxEvent event) {
-    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
-    }
-  }
-
-  private boolean isCompensationScheduled(TxEvent event) {
-    return commandRepository.exists(event.globalTxId(), event.localTxId());
-  }
-
-  private void compensate(TxEvent event) {
-    commandRepository.saveCompensationCommands(event.globalTxId());
-  }
-
-  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
-  // unless we ask user to specify a name for each participant in the global TX in @Compensable
-  private void updateCompensateStatus(TxEvent event) {
-    commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
-    log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
-
-    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
-        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
-      markGlobalTxEnd(event);
-      log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
-    }
-  }
-
-  private void markGlobalTxEnd(TxEvent event) {
-    eventRepository.save(new TxEvent(
-        event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
-        null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
-  }
-
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
-
-  private void scheduleCompensationCommandPolling(int commandPollingInterval) {
-    scheduler.scheduleWithFixedDelay(
-        () -> commandRepository.findFirstCommandToCompensate()
-            .forEach(command -> {
-              log.info("Compensating transaction with globalTxId {} and localTxId {}",
-                  command.globalTxId(),
-                  command.localTxId());
-
-              omegaCallback.compensate(new TxEvent(
-                  command.serviceName(),
-                  command.instanceId(),
-                  command.globalTxId(),
-                  command.localTxId(),
-                  command.parentTxId(),
-                  TxStartedEvent.name(),
-                  command.compensationMethod(),
-                  command.payloads()
-              ));
-            }),
-        0,
-        commandPollingInterval,
-        MILLISECONDS);
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index cf5706b..d793de2 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -18,13 +18,14 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface TxEventRepository {
   void save(TxEvent event);
 
   List<TxEvent> findTransactions(String globalTxId, String type);
 
-  TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+  List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
 
-  List<TxEvent> findTransactionsToCompensate(String globalTxId);
+  Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 212c621..473501e 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -18,31 +18,23 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static java.util.Collections.emptyList;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
@@ -64,93 +56,13 @@ public class TxConsistentServiceTest {
     }
 
     @Override
-    public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
-      return events.stream()
-          .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
-          .findFirst()
-          .get();
-    }
-
-    @Override
-    public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
-      return events.stream()
-          .filter(event -> globalTxId.equals(event.globalTxId())
-              && event.type().equals(TxStartedEvent.name())
-              && isCompleted(globalTxId, event)
-              && !isCompensated(globalTxId, event))
-          .collect(Collectors.toList());
-    }
-
-    private boolean isCompleted(String globalTxId, TxEvent event) {
-      return events.stream()
-          .filter(e -> globalTxId.equals(e.globalTxId())
-              && e.localTxId().equals(event.localTxId())
-              && e.type().equals(TxEndedEvent.name()))
-          .count() > 0;
-    }
-
-    private boolean isCompensated(String globalTxId, TxEvent event) {
-      return events.stream()
-          .filter(e -> globalTxId.equals(e.globalTxId())
-              && e.localTxId().equals(event.localTxId())
-              && e.type().equals(TxCompensatedEvent.name()))
-          .count() > 0;
-    }
-  };
-
-  private final List<Command> commands = new ArrayList<>();
-  private final CommandRepository commandRepository = new CommandRepository() {
-    @Override
-    public boolean exists(String globalTxId, String localTxId) {
-      return commands.stream()
-          .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
-    }
-
-    @Override
-    public void saveCompensationCommand(String globalTxId, String localTxId) {
-      TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
-      commands.add(new Command(event));
+    public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+      return emptyList();
     }
 
     @Override
-    public void saveCompensationCommands(String globalTxId) {
-      List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
-
-      Map<String, Command> commandMap = new HashMap<>();
-
-      for (TxEvent event : events) {
-        commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
-      }
-
-      commands.addAll(commandMap.values());
-    }
-
-    @Override
-    public void markCommandAsDone(String globalTxId, String localTxId) {
-      for (int i = 0; i < commands.size(); i++) {
-        Command command = commands.get(i);
-        if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
-          commands.set(i, new Command(command, DONE));
-        }
-      }
-    }
-
-    @Override
-    public List<Command> findUncompletedCommands(String globalTxId) {
-      return commands.stream()
-          .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
-          .collect(Collectors.toList());
-    }
-
-    @Override
-    public List<Command> findFirstCommandToCompensate() {
-      List<Command> results = new ArrayList<>(1);
-      commands.stream()
-          .filter(command -> !DONE.name().equals(command.status()))
-          .findFirst()
-          .ifPresent(results::add);
-
-      return results;
+    public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+      return Optional.empty();
     }
   };
 
@@ -161,20 +73,9 @@ public class TxConsistentServiceTest {
   private final String instanceId = uniquify("instanceId");
 
   private final String compensationMethod = getClass().getCanonicalName();
-  private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
-  private Consumer<TxEvent> eventConsumer = event -> {};
-  private final OmegaCallback omegaCallback = event -> {
-    eventConsumer.accept(event);
-    compensationContexts.add(
-        new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
-  };
-
-  private final TxConsistentService consistentService = new TxConsistentService(
-      eventRepository,
-      commandRepository,
-      omegaCallback,
-      300);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
+  private final byte[] payloads = "yeah".getBytes();
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -190,49 +91,6 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
-    assertThat(compensationContexts.isEmpty(), is(true));
-  }
-
-  @Test
-  public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
-    eventConsumer = event -> consistentService
-        .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
-
-    String localTxId1 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
-
-    String localTxId2 = UUID.randomUUID().toString();
-    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
-    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
-
-    TxEvent abortEvent = newEvent(TxAbortedEvent);
-
-    consistentService.handle(abortEvent);
-
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
-    ));
-
-    await().atMost(1, SECONDS).until(() -> events.size() == 8);
-    assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
-  }
-
-  @Test
-  public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
-    events.add(newEvent(TxStartedEvent));
-    events.add(newEvent(TxAbortedEvent));
-
-    TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
-
-    consistentService.handle(event);
-
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
-    ));
   }
 
   @Test
@@ -241,7 +99,7 @@ public class TxConsistentServiceTest {
     events.add(newEvent(TxStartedEvent));
     events.add(newEvent(TxAbortedEvent));
 
-    TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+    TxEvent event = eventOf(TxStartedEvent, localTxId1);
 
     consistentService.handle(event);
 
@@ -249,10 +107,10 @@ public class TxConsistentServiceTest {
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+    return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
   }
 
-  private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId, String compensationMethod) {
+  private TxEvent eventOf(EventType eventType, String localTxId) {
     return new TxEvent(serviceName, instanceId, new Date(),
         globalTxId,
         localTxId,
@@ -261,48 +119,4 @@ public class TxConsistentServiceTest {
         compensationMethod,
         payloads);
   }
-
-  private static class CompensationContext {
-    private final String globalTxId;
-    private final String localTxId;
-    private final String compensationMethod;
-    private final byte[] message;
-
-    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
-      this.globalTxId = globalTxId;
-      this.localTxId = localTxId;
-      this.compensationMethod = compensationMethod;
-      this.message = message;
-    }
-
-    @Override
-    public String toString() {
-      return "CompensationContext{" +
-          "globalTxId='" + globalTxId + '\'' +
-          ", localTxId='" + localTxId + '\'' +
-          ", compensationMethod='" + compensationMethod + '\'' +
-          ", message=" + Arrays.toString(message) +
-          '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      CompensationContext that = (CompensationContext) o;
-      return Objects.equals(globalTxId, that.globalTxId) &&
-          Objects.equals(localTxId, that.localTxId) &&
-          Objects.equals(compensationMethod, that.compensationMethod) &&
-          Arrays.equals(message, that.message);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
-    }
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 3740581..73298be 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -28,6 +28,7 @@ import javax.annotation.PostConstruct;
 
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
@@ -68,24 +69,27 @@ class AlphaConfig {
 
   @Bean
   ScheduledExecutorService compensationScheduler() {
-    return Executors.newSingleThreadScheduledExecutor();
+    return Executors.newScheduledThreadPool(2);
   }
 
   @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
-      @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+      @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
+      @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(
+    new EventScanner(scheduler,
         eventRepository,
         commandRepository,
         omegaCallback,
         commandPollingInterval,
-        scheduler);
+        eventPollingInterval).run();
+
+    TxConsistentService consistentService = new TxConsistentService(eventRepository);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 34b43a4..6076e54 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
+import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +29,12 @@ import java.util.Map;
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
 
   private final TxEventEnvelopeRepository eventRepository;
@@ -43,22 +46,7 @@ public class SpringCommandRepository implements CommandRepository {
   }
 
   @Override
-  public boolean exists(String globalTxId, String localTxId) {
-    return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent();
-  }
-
-  @Override
-  public void saveCompensationCommand(String globalTxId, String localTxId) {
-    TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(
-        globalTxId,
-        localTxId,
-        TxStartedEvent.name());
-
-    commandRepository.save(new Command(startedEvent));
-  }
-
-  @Override
-  public void saveCompensationCommands(String globalTxId) {
+  public Iterable<Command> saveCompensationCommands(String globalTxId) {
     List<TxEvent> events = eventRepository
         .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
@@ -68,7 +56,8 @@ public class SpringCommandRepository implements CommandRepository {
       commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
     }
 
-    commandRepository.save(commands.values());
+    log.info("Saving compensation commands {}", commands.values());
+    return commandRepository.save(commands.values());
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7c44639..4108aa5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,9 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.springframework.data.domain.PageRequest;
 
 class SpringTxEventRepository implements TxEventRepository {
   private final TxEventEnvelopeRepository eventRepo;
@@ -40,12 +42,12 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
-  public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
-    return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type);
+  public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1));
   }
 
   @Override
-  public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
-    return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
+  public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+    return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 09aec6f..78c2a1d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -18,8 +18,10 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
@@ -32,13 +34,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
-  TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type);
-
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
-      + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent'"
-      + "AND EXISTS ("
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
       + "  WHERE t1.globalTxId = ?1 "
@@ -50,19 +47,15 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
-      + "ORDER BY t.surrogateId ASC"
- )
-  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+      + "ORDER BY t.surrogateId ASC")
+  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
-  @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
-      + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
-      + ") FROM TxEvent t "
-      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
       + "  SELECT t1.globalTxId"
       + "  FROM TxEvent t1 "
-      + "  WHERE t1.globalTxId = ?1 "
-      + "  AND t1.localTxId = t.localTxId "
-      + "  AND t1.type = 'TxEndedEvent'"
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "  AND t1.type = 'TxAbortedEvent'"
       + ") AND NOT EXISTS ( "
       + "  SELECT t2.globalTxId"
       + "  FROM TxEvent t2 "
@@ -70,5 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+  List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
+
+  Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1bfbe3c..5cff57c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,8 +34,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -57,8 +55,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -69,7 +65,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -417,10 +413,4 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
-
-  @Primary
-  @Bean
-  ScheduledExecutorService scheduler() {
-    return Executors.newScheduledThreadPool(2);
-  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 07/12: SCB-218 added event id to check for command uniqueness

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit da2ef4619612ad08fea78db8da58eedb11f7b0aa
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 11:06:16 2018 +0800

    SCB-218 added event id to check for command uniqueness
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       | 24 ++++++++--------------
 .../saga/alpha/core/CommandRepository.java         |  2 +-
 .../servicecomb/saga/alpha/core/EventScanner.java  | 23 ++++++++++++---------
 .../saga/alpha/server/CommandEntityRepository.java | 10 ++++-----
 .../saga/alpha/server/SpringCommandRepository.java | 11 +++++++---
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../src/main/resources/schema-postgresql.sql       | 20 +++++++++++++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  2 +-
 alpha/alpha-server/src/test/resources/schema.sql   |  3 ++-
 9 files changed, 57 insertions(+), 40 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 904cc54..2716abf 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -22,6 +22,8 @@ import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 import java.util.Date;
 
 import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Version;
 
@@ -29,8 +31,10 @@ import javax.persistence.Version;
 public class Command {
 
   @Id
+  @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long surrogateId;
 
+  private long eventId;
   private String serviceName;
   private String instanceId;
   private String globalTxId;
@@ -48,7 +52,7 @@ public class Command {
   Command() {
   }
 
-  Command(long id,
+  private Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
@@ -58,7 +62,7 @@ public class Command {
       byte[] payloads,
       String status) {
 
-    this.surrogateId = id;
+    this.eventId = id;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.globalTxId = globalTxId;
@@ -70,7 +74,7 @@ public class Command {
     this.lastModified = new Date();
   }
 
-  Command(long id,
+  private Command(long id,
       String serviceName,
       String instanceId,
       String globalTxId,
@@ -82,18 +86,6 @@ public class Command {
     this(id, serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
-  Command(Command command, CommandStatus status) {
-    this(command.surrogateId,
-        command.serviceName,
-        command.instanceId,
-        command.globalTxId,
-        command.localTxId,
-        command.parentTxId,
-        command.compensationMethod,
-        command.payloads,
-        status.name());
-  }
-
   public Command(TxEvent event) {
     this(event.id(),
         event.serviceName(),
@@ -144,7 +136,7 @@ public class Command {
   @Override
   public String toString() {
     return "Command{" +
-        "surrogateId=" + surrogateId +
+        "eventId=" + eventId +
         ", serviceName='" + serviceName + '\'' +
         ", instanceId='" + instanceId + '\'' +
         ", globalTxId='" + globalTxId + '\'' +
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 1da033f..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 public interface CommandRepository {
 
-  Iterable<Command> saveCompensationCommands(String globalTxId);
+  void saveCompensationCommands(String globalTxId);
 
   void markCommandAsDone(String globalTxId, String localTxId);
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index ea4aaf6..5a4589d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -81,8 +81,7 @@ public class EventScanner implements Runnable {
         .forEach(event -> {
           log.info("Found uncompensated event {}", event);
           nextEndedEventId = event.id();
-          commandRepository.saveCompensationCommands(event.globalTxId())
-              .forEach(command -> nextEndedEventId = command.id());
+          commandRepository.saveCompensationCommands(event.globalTxId());
         });
   }
 
@@ -130,19 +129,23 @@ public class EventScanner implements Runnable {
 
   private void pollCompensationCommand(int commandPollingInterval) {
     scheduler.scheduleWithFixedDelay(
-        () -> commandRepository.findFirstCommandToCompensate()
-            .forEach(command -> {
-              log.info("Compensating transaction with globalTxId {} and localTxId {}",
-                  command.globalTxId(),
-                  command.localTxId());
-
-              omegaCallback.compensate(txStartedEventOf(command));
-            }),
+        this::compensate,
         0,
         commandPollingInterval,
         MILLISECONDS);
   }
 
+  private void compensate() {
+    commandRepository.findFirstCommandToCompensate()
+        .forEach(command -> {
+          log.info("Compensating transaction with globalTxId {} and localTxId {}",
+              command.globalTxId(),
+              command.localTxId());
+
+          omegaCallback.compensate(txStartedEventOf(command));
+        });
+  }
+
   private TxEvent txStartedEventOf(Command command) {
     return new TxEvent(
         command.serviceName(),
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index d7c583e..4b8c3ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
-import java.util.Optional;
 
 import javax.transaction.Transactional;
 
@@ -30,10 +29,9 @@ import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
 
 public interface CommandEntityRepository extends CrudRepository<Command, Long> {
-  Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
 
   @Transactional
-  @Modifying
+  @Modifying(clearAutomatically = true)
   @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
       + "SET c.status = :status "
       + "WHERE c.globalTxId = :globalTxId "
@@ -47,9 +45,9 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
 
   // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
   @Query("SELECT c FROM Command c "
-      + "WHERE c.surrogateId IN ("
-      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
+      + "WHERE c.eventId IN ("
+      + " SELECT MAX(c1.eventId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
       + ") "
-      + "ORDER BY c.surrogateId ASC")
+      + "ORDER BY c.eventId ASC")
   List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 6076e54..1dabeda 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -46,9 +46,9 @@ public class SpringCommandRepository implements CommandRepository {
   }
 
   @Override
-  public Iterable<Command> saveCompensationCommands(String globalTxId) {
+  public void saveCompensationCommands(String globalTxId) {
     List<TxEvent> events = eventRepository
-        .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+        .findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
     Map<String, Command> commands = new LinkedHashMap<>();
 
@@ -57,7 +57,12 @@ public class SpringCommandRepository implements CommandRepository {
     }
 
     log.info("Saving compensation commands {}", commands.values());
-    return commandRepository.save(commands.values());
+    try {
+      commandRepository.save(commands.values());
+    } catch (Exception e) {
+      log.warn("Failed to save some commands", e);
+    }
+    log.info("Saved compensation commands {}", commands.values());
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 78c2a1d..e974527 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -48,7 +48,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.localTxId = t.localTxId "
       + "  AND t2.type = 'TxCompensatedEvent') "
       + "ORDER BY t.surrogateId ASC")
-  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
   @Query("SELECT t FROM TxEvent t "
       + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index e84a9c3..d1c36c2 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -11,4 +11,22 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   payloads bytea
 );
 
-CREATE INDEX IF NOT EXISTS running_sagas_index ON TxEvent (globalTxId, localTxId, type);
+CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type);
+
+
+CREATE TABLE IF NOT EXISTS Command (
+  surrogateId BIGSERIAL PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(16) NOT NULL,
+  instanceId varchar(36) NOT NULL,
+  globalTxId varchar(36) NOT NULL,
+  localTxId varchar(36) NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  compensationMethod varchar(256) NOT NULL,
+  payloads bytea,
+  status varchar(12),
+  lastModified timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
+  version bigint NOT NULL,
+);
+
+CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, type, status);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5cff57c..a5356bb 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -65,7 +65,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index fb396b3..71444ec 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -13,7 +13,8 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
 ) DEFAULT CHARSET=utf8;
 
 CREATE TABLE IF NOT EXISTS `Command` (
-  `surrogateId` bigint NOT NULL,
+  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+  `eventId` bigint NOT NULL UNIQUE,
   `serviceName` varchar(36) NOT NULL,
   `instanceId` varchar(36) NOT NULL,
   `globalTxId` varchar(36) NOT NULL,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 12/12: SCB-218 included alpha core for coverage report

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit bd1277ba12611c7f436a6a44638e3ae7123ca345
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 20 08:44:10 2018 +0800

    SCB-218 included alpha core for coverage report
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 integration-tests/coverage-aggregate/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/integration-tests/coverage-aggregate/pom.xml b/integration-tests/coverage-aggregate/pom.xml
index e599f09..a156600 100644
--- a/integration-tests/coverage-aggregate/pom.xml
+++ b/integration-tests/coverage-aggregate/pom.xml
@@ -75,6 +75,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb.saga</groupId>
+      <artifactId>alpha-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb.saga</groupId>
       <artifactId>alpha-server</artifactId>
     </dependency>
     <dependency>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/12: SCB-218 updated command status to pending when compensating

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 7aafab876c21da6f50ea10aad5095d3b18bcdf66
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 17:55:43 2018 +0800

    SCB-218 updated command status to pending when compensating
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../java/org/apache/servicecomb/saga/alpha/core/Command.java |  4 ++--
 .../apache/servicecomb/saga/alpha/core/CommandStatus.java    |  1 +
 .../saga/alpha/server/CommandEntityRepository.java           |  3 ++-
 .../saga/alpha/server/SpringCommandRepository.java           | 11 ++++++++++-
 .../servicecomb/saga/alpha/server/AlphaIntegrationTest.java  | 12 +++++++++++-
 5 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index b5902e1..904cc54 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -113,11 +113,11 @@ public class Command {
     return instanceId;
   }
 
-  String globalTxId() {
+  public String globalTxId() {
     return globalTxId;
   }
 
-  String localTxId() {
+  public String localTxId() {
     return localTxId;
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
index cdf1f6c..0c9b78b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -19,5 +19,6 @@ package org.apache.servicecomb.saga.alpha.core;
 
 public enum CommandStatus {
   NEW,
+  PENDING,
   DONE
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index fffea56..d7c583e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -45,9 +45,10 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
 
   List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
 
+  // TODO: 2018/1/18 we assumed compensation will never fail. if all service instances are not reachable, we have to set up retry mechanism for pending commands
   @Query("SELECT c FROM Command c "
       + "WHERE c.surrogateId IN ("
-      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId"
+      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status = 'NEW' GROUP BY c1.globalTxId"
       + ") "
       + "ORDER BY c.surrogateId ASC")
   List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 18ee9ad..34b43a4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.LinkedHashMap;
@@ -82,7 +83,15 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public List<Command> findFirstCommandToCompensate() {
-    return commandRepository
+    List<Command> commands = commandRepository
         .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
+
+    commands.forEach(command ->
+        commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+            PENDING.name(),
+            command.globalTxId(),
+            command.localTxId()));
+
+    return commands;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1514869..1bfbe3c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -55,6 +57,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -65,7 +69,7 @@ import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
-    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
+    properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -413,4 +417,10 @@ public class AlphaIntegrationTest {
       return completed;
     }
   }
+
+  @Primary
+  @Bean
+  ScheduledExecutorService scheduler() {
+    return Executors.newScheduledThreadPool(2);
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/12: SCB-218 updated schemas accordingly due to change of ORM tech

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9a401719de2db3e81e6fa04b2d36ffa6305c0460
Author: seanyinx <se...@huawei.com>
AuthorDate: Thu Jan 18 15:07:29 2018 +0800

    SCB-218 updated schemas accordingly due to change of ORM tech
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/Command.java       | 17 +++++++
 .../servicecomb/saga/alpha/core/TxEvent.java       |  5 ++
 .../saga/alpha/server/CommandEntity.java           | 55 ----------------------
 .../saga/alpha/server/CommandEntityRepository.java | 25 +++++-----
 .../saga/alpha/server/SpringCommandRepository.java | 26 ++++------
 .../saga/alpha/server/SpringTxEventRepository.java |  2 +-
 .../alpha/server/TxEventEnvelopeRepository.java    | 33 +++++++------
 alpha/alpha-server/src/test/resources/schema.sql   | 15 ++++++
 8 files changed, 80 insertions(+), 98 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index c852c16..b29988b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -19,7 +19,18 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+@Entity
 public class Command {
+
+  @Id
+  private Long surrogateId;
+
   private String serviceName;
   private String instanceId;
   private String globalTxId;
@@ -29,6 +40,11 @@ public class Command {
   private byte[] payloads;
   private String status;
 
+  private Date lastModified;
+
+  @Version
+  private int version;
+
   Command() {
   }
 
@@ -49,6 +65,7 @@ public class Command {
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
     this.status = status;
+    this.lastModified = new Date();
   }
 
   Command(String serviceName,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 760dd70..0ff2299 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -65,6 +65,7 @@ public class TxEvent {
       String type,
       String compensationMethod,
       byte[] payloads) {
+    this.surrogateId = -1L;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
     this.creationTime = creationTime;
@@ -112,6 +113,10 @@ public class TxEvent {
     return payloads;
   }
 
+  public long id() {
+    return surrogateId;
+  }
+
   @Override
   public String toString() {
     return "TxEvent{" +
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
deleted file mode 100644
index 6207002..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server;
-
-import java.util.Date;
-
-import javax.persistence.Embedded;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.Version;
-
-import org.apache.servicecomb.saga.alpha.core.Command;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-
-@Entity
-class CommandEntity {
-  @Id
-  private long surrogateId;
-
-  @Embedded
-  private Command command;
-
-  private Date lastModified;
-
-  @Version
-  private int version;
-
-  CommandEntity() {
-  }
-
-  CommandEntity(long id, TxEvent event) {
-    surrogateId = id;
-    lastModified = new Date();
-    command = new Command(event);
-  }
-
-  Command command() {
-    return command;
-  }
-}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 9402486..fffea56 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -22,32 +22,33 @@ import java.util.Optional;
 
 import javax.transaction.Transactional;
 
+import org.apache.servicecomb.saga.alpha.core.Command;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
 
-public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
-  Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
+public interface CommandEntityRepository extends CrudRepository<Command, Long> {
+  Optional<Command> findByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
 
   @Transactional
   @Modifying
-  @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
-      + "SET c.command.status = :status "
-      + "WHERE c.command.globalTxId = :globalTxId "
-      + "AND c.command.localTxId = :localTxId")
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+      + "SET c.status = :status "
+      + "WHERE c.globalTxId = :globalTxId "
+      + "AND c.localTxId = :localTxId")
   void updateStatusByGlobalTxIdAndLocalTxId(
       @Param("status") String status,
       @Param("globalTxId") String globalTxId,
       @Param("localTxId") String localTxId);
 
-  List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+  List<Command> findByGlobalTxIdAndStatus(String globalTxId, String status);
 
-  @Query("FROM CommandEntity c "
-      + "WHERE id IN ("
-      + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId"
+  @Query("SELECT c FROM Command c "
+      + "WHERE c.surrogateId IN ("
+      + " SELECT MAX(c1.surrogateId) FROM Command c1 WHERE c1.status <> 'DONE' GROUP BY c1.globalTxId"
       + ") "
-      + "ORDER BY c.id ASC")
-  List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable);
+      + "ORDER BY c.surrogateId ASC")
+  List<Command> findFirstGroupByGlobalTxIdOrderByIdDesc(Pageable pageable);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index aac3c22..a335849 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -24,10 +24,10 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.alpha.core.Command;
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.springframework.data.domain.PageRequest;
 
 public class SpringCommandRepository implements CommandRepository {
@@ -43,28 +43,28 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public boolean exists(String globalTxId, String localTxId) {
-    return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent();
+    return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent();
   }
 
   @Override
   public void saveCompensationCommand(String globalTxId, String localTxId) {
-    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc(
+    TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(
         globalTxId,
         localTxId,
         TxStartedEvent.name());
 
-    commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event()));
+    commandRepository.save(new Command(startedEvent));
   }
 
   @Override
   public void saveCompensationCommands(String globalTxId) {
-    List<TxEventEnvelope> events = eventRepository
+    List<TxEvent> events = eventRepository
         .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
 
-    Map<String, CommandEntity> commands = new HashMap<>();
+    Map<String, Command> commands = new HashMap<>();
 
-    for (TxEventEnvelope event : events) {
-      commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event()));
+    for (TxEvent event : events) {
+      commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
     }
 
     commandRepository.save(commands.values());
@@ -77,18 +77,12 @@ public class SpringCommandRepository implements CommandRepository {
 
   @Override
   public List<Command> findUncompletedCommands(String globalTxId) {
-    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name())
-        .stream()
-        .map(CommandEntity::command)
-        .collect(Collectors.toList());
+    return commandRepository.findByGlobalTxIdAndStatus(globalTxId, NEW.name());
   }
 
   @Override
   public List<Command> findFirstCommandToCompensate() {
     return commandRepository
-        .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST)
-        .stream()
-        .map(CommandEntity::command)
-        .collect(Collectors.toList());
+        .findFirstGroupByGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index f0743e9..7c44639 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -41,7 +41,7 @@ class SpringTxEventRepository implements TxEventRepository {
 
   @Override
   public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
-    return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndType(globalTxId, localTxId, type);
+    return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type);
   }
 
   @Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 85ed954..71d5f1b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -32,7 +32,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "WHERE t.globalTxId = ?1 AND t.type = ?2")
   List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 
-  TxEvent findFirstByGlobalTxIdAndLocalTxIdAndType(String globalTxId, String localTxId, String type);
+  TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type);
 
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
@@ -49,21 +49,26 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  FROM TxEvent t2 "
       + "  WHERE t2.globalTxId = ?1 "
       + "  AND t2.localTxId = t.localTxId "
-      + "  AND t2.type = 'TxCompensatedEvent')"
+      + "  AND t2.type = 'TxCompensatedEvent') "
+      + "ORDER BY t.surrogateId ASC"
  )
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 
-  @Query("FROM TxEventEnvelope t "
-      + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
-      + "  FROM TxEventEnvelope t1 "
-      + "  WHERE t1.event.globalTxId = ?1 "
-      + "  AND t1.event.localTxId = t.event.localTxId "
-      + "  AND t1.event.type = 'TxEndedEvent'"
+  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+      + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+      + ") FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+      + "  SELECT t1.globalTxId"
+      + "  FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = ?1 "
+      + "  AND t1.localTxId = t.localTxId "
+      + "  AND t1.type = 'TxEndedEvent'"
       + ") AND NOT EXISTS ( "
-      + "  FROM TxEventEnvelope t2 "
-      + "  WHERE t2.event.globalTxId = ?1 "
-      + "  AND t2.event.localTxId = t.event.localTxId "
-      + "  AND t2.event.type = 'TxCompensatedEvent')"
-      + "ORDER BY t.id ASC ")
-  List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+      + "  SELECT t2.globalTxId"
+      + "  FROM TxEvent t2 "
+      + "  WHERE t2.globalTxId = ?1 "
+      + "  AND t2.localTxId = t.localTxId "
+      + "  AND t2.type = 'TxCompensatedEvent') "
+      + "ORDER BY t.surrogateId ASC")
+  List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 08945a7..c6d66de 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -11,3 +11,18 @@ CREATE TABLE IF NOT EXISTS `TxEvent` (
   `payloads` varbinary(10240),
   PRIMARY KEY (`surrogateId`)
 ) DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `Command` (
+  `surrogateId` bigint NOT NULL AUTO_INCREMENT,
+  `serviceName` varchar(36) NOT NULL,
+  `instanceId` varchar(36) NOT NULL,
+  `globalTxId` varchar(36) NOT NULL,
+  `localTxId` varchar(36) NOT NULL,
+  `parentTxId` varchar(36) DEFAULT NULL,
+  `compensationMethod` varchar(256) NOT NULL,
+  `payloads` varbinary(10240),
+  `status` varchar(12),
+  `lastModified` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+  `version` bigint NOT NULL,
+  PRIMARY KEY (`surrogateId`)
+) DEFAULT CHARSET=utf8;

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 11/12: SCB-218 address out of order compensation later

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 311efc7c72c3305082cb0ab31cf1fd8d68f112de
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 20 08:41:46 2018 +0800

    SCB-218 address out of order compensation later
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5dddc1d..4635425 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -25,7 +25,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -257,7 +257,7 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
 
-    assertThat(receivedCommands, contains(
+    assertThat(receivedCommands, containsInAnyOrder(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
             .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 10/12: SCB-218 removed missing column

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 28d8afad42131e07c1db1997ceae0e425c453206
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 18:43:03 2018 +0800

    SCB-218 removed missing column
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-server/src/main/resources/schema-postgresql.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index d1c36c2..d6b5172 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS Command (
   payloads bytea,
   status varchar(12),
   lastModified timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
-  version bigint NOT NULL,
+  version bigint NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, type, status);
+CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, status);

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/12: SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 4b896392556f6b3b9dd7f21bf08193fc82eb5640
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 11:54:52 2018 +0800

    SCB-218 replaced in memory compensation store with persistent repo to make alpha stateless
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/{TxEvent.java => Command.java} | 93 +++++++++-------------
 .../saga/alpha/core/CommandRepository.java         | 32 ++++++++
 .../servicecomb/saga/alpha/core/CommandStatus.java | 23 ++++++
 .../saga/alpha/core/TxConsistentService.java       | 28 +++----
 .../servicecomb/saga/alpha/core/TxEvent.java       | 14 ++++
 .../saga/alpha/core/TxConsistentServiceTest.java   | 56 ++++++++++++-
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  9 ++-
 .../saga/alpha/server/CommandEntity.java           | 53 ++++++++++++
 .../saga/alpha/server/CommandEntityRepository.java | 43 ++++++++++
 .../saga/alpha/server/SpringCommandRepository.java | 78 ++++++++++++++++++
 .../alpha/server/TxEventEnvelopeRepository.java    | 13 +++
 .../saga/omega/transaction/TransactionAspect.java  |  1 +
 12 files changed, 368 insertions(+), 75 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
similarity index 58%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 37a29f1..08f8527 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -17,98 +17,81 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-
-@Entity
-public class TxEvent {
-  @Id
-  @GeneratedValue(strategy = GenerationType.IDENTITY)
-  private Long surrogateId;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
 
+public class Command {
   private String serviceName;
   private String instanceId;
-  private Date creationTime;
   private String globalTxId;
   private String localTxId;
   private String parentTxId;
-  private String type;
   private String compensationMethod;
   private byte[] payloads;
+  private String status;
 
-  private TxEvent() {
+  Command() {
   }
 
-  public TxEvent(
-      String serviceName,
+  Command(String serviceName,
       String instanceId,
       String globalTxId,
       String localTxId,
       String parentTxId,
-      String type,
       String compensationMethod,
-      byte[] payloads) {
-    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
-  }
+      byte[] payloads,
+      String status) {
 
-  public TxEvent(
-      String serviceName,
-      String instanceId,
-      Date creationTime,
-      String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String type,
-      String compensationMethod,
-      byte[] payloads) {
     this.serviceName = serviceName;
     this.instanceId = instanceId;
-    this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
-    this.type = type;
     this.compensationMethod = compensationMethod;
     this.payloads = payloads;
+    this.status = status;
   }
 
-  public String serviceName() {
-    return serviceName;
+  Command(String serviceName,
+      String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String compensationMethod,
+      byte[] payloads) {
+
+    this(serviceName, instanceId, globalTxId, localTxId, parentTxId, compensationMethod, payloads, NEW.name());
   }
 
-  public String instanceId() {
-    return instanceId;
+  Command(Command command, CommandStatus status) {
+    this(command.serviceName,
+        command.instanceId,
+        command.globalTxId,
+        command.localTxId,
+        command.parentTxId,
+        command.compensationMethod,
+        command.payloads,
+        status.name());
   }
 
-  public Date creationTime() {
-    return creationTime;
+  public Command(TxEvent event) {
+    this(event.serviceName(),
+        event.instanceId(),
+        event.globalTxId(),
+        event.localTxId(),
+        event.parentTxId(),
+        event.compensationMethod(),
+        event.payloads());
   }
 
-  public String globalTxId() {
+  String globalTxId() {
     return globalTxId;
   }
 
-  public String localTxId() {
+  String localTxId() {
     return localTxId;
   }
 
-  public String parentTxId() {
-    return parentTxId;
-  }
-
-  public String type() {
-    return type;
-  }
-
-  public String compensationMethod() {
-    return compensationMethod;
-  }
-
-  public byte[] payloads() {
-    return payloads;
+  String status() {
+    return status;
   }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
new file mode 100644
index 0000000..915d476
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+import java.util.List;
+
+public interface CommandRepository {
+  boolean exists(String globalTxId, String localTxId);
+
+  void saveCompensationCommand(String globalTxId, String localTxId);
+
+  void saveCompensationCommands(String globalTxId);
+
+  void markCommandAsDone(String globalTxId, String localTxId);
+
+  List<Command> findUncompletedCommands(String globalTxId);
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
new file mode 100644
index 0000000..cdf1f6c
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.core;
+
+public enum CommandStatus {
+  NEW,
+  DONE
+}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 5dc5788..560096f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,7 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.Collections.emptySet;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -26,10 +25,8 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -40,6 +37,7 @@ public class TxConsistentService {
   private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
   private final TxEventRepository eventRepository;
+  private final CommandRepository commandRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
     put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
@@ -47,11 +45,13 @@ public class TxConsistentService {
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
-  private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
-  public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+  public TxConsistentService(TxEventRepository eventRepository,
+      CommandRepository commandRepository,
+      OmegaCallback omegaCallback) {
     this.eventRepository = eventRepository;
+    this.commandRepository = commandRepository;
     this.omegaCallback = omegaCallback;
   }
 
@@ -68,7 +68,7 @@ public class TxConsistentService {
 
   private void compensateIfAlreadyAborted(TxEvent event) {
     if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
-      eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+      commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
       TxEvent correspondingStartedEvent = eventRepository
           .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
 
@@ -77,7 +77,7 @@ public class TxConsistentService {
   }
 
   private boolean isCompensationScheduled(TxEvent event) {
-    return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
+    return commandRepository.exists(event.globalTxId(), event.localTxId());
   }
 
   private void compensate(TxEvent event) {
@@ -85,8 +85,7 @@ public class TxConsistentService {
 
     events.removeIf(this::isCompensationScheduled);
 
-    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
-    events.forEach(e -> localTxIds.add(e.localTxId()));
+    commandRepository.saveCompensationCommands(event.globalTxId());
 
     events.forEach(omegaCallback::compensate);
   }
@@ -94,13 +93,10 @@ public class TxConsistentService {
   // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
   // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
-    Set<String> events = eventsToCompensate.get(event.globalTxId());
-    if (events != null) {
-      events.remove(event.localTxId());
-      if (events.isEmpty()) {
-        markGlobalTxEnd(event);
-        eventsToCompensate.remove(event.globalTxId());
-      }
+    commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+    if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
+        && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+      markGlobalTxEnd(event);
     }
   }
 
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 37a29f1..760dd70 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -111,4 +111,18 @@ public class TxEvent {
   public byte[] payloads() {
     return payloads;
   }
+
+  @Override
+  public String toString() {
+    return "TxEvent{" +
+        "serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", creationTime=" + creationTime +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        '}';
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 99667e7..8ae60a3 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -34,16 +35,20 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.junit.Test;
 
 public class TxConsistentServiceTest {
-  private final List<TxEvent> events = new ArrayList<>();
+  private final Deque<TxEvent> events = new ConcurrentLinkedDeque<>();
   private final TxEventRepository eventRepository = new TxEventRepository() {
     @Override
     public void save(TxEvent event) {
@@ -92,6 +97,51 @@ public class TxConsistentServiceTest {
     }
   };
 
+  private final List<Command> commands = new ArrayList<>();
+  private final CommandRepository commandRepository = new CommandRepository() {
+    @Override
+    public boolean exists(String globalTxId, String localTxId) {
+      return commands.stream()
+          .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
+    }
+
+    @Override
+    public void saveCompensationCommand(String globalTxId, String localTxId) {
+      TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
+      commands.add(new Command(event));
+    }
+
+    @Override
+    public void saveCompensationCommands(String globalTxId) {
+      List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
+
+      Map<String, Command> commandMap = new HashMap<>();
+
+      for (TxEvent event : events) {
+        commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
+      }
+
+      commands.addAll(commandMap.values());
+    }
+
+    @Override
+    public void markCommandAsDone(String globalTxId, String localTxId) {
+      for (int i = 0; i < commands.size(); i++) {
+        Command command = commands.get(i);
+        if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
+          commands.set(i, new Command(command, DONE));
+        }
+      }
+    }
+
+    @Override
+    public List<Command> findUncompletedCommands(String globalTxId) {
+      return commands.stream()
+          .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
+          .collect(Collectors.toList());
+    }
+  };
+
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
@@ -104,7 +154,7 @@ public class TxConsistentServiceTest {
   private final OmegaCallback omegaCallback = event ->
       compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
 
   @Test
   public void persistEventOnArrival() throws Exception {
@@ -150,7 +200,7 @@ public class TxConsistentServiceTest {
     consistentService.handle(compensateEvent1);
 
     await().atMost(1, SECONDS).until(() -> events.size() == 8);
-    assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name()));
+    assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
   }
 
   @Test
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index bb4ba89..00dfe27 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
@@ -59,12 +60,18 @@ class AlphaConfig {
   }
 
   @Bean
+  CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, CommandEntityRepository commandRepository) {
+    return new SpringCommandRepository(eventRepo, commandRepository);
+  }
+
+  @Bean
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
+      CommandRepository commandRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
new file mode 100644
index 0000000..3eac681
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.Date;
+
+import javax.persistence.Embedded;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Version;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+
+@Entity
+class CommandEntity {
+  @Id
+  @GeneratedValue
+  private long surrogateId;
+
+  @Embedded
+  private Command command;
+
+  private Date lastModified;
+
+  @Version
+  private int version;
+
+  CommandEntity() {
+  }
+
+  CommandEntity(long id, TxEvent event) {
+    surrogateId = id;
+    lastModified = new Date();
+    command = new Command(event);
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
new file mode 100644
index 0000000..4b7309e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
+  Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
+
+  @Modifying
+  @Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
+      + "SET c.command.status = :status "
+      + "WHERE c.command.globalTxId = :globalTxId "
+      + "AND c.command.localTxId = :localTxId")
+  void updateStatusByGlobalTxIdAndLocalTxId(
+      @Param("status") String status,
+      @Param("globalTxId") String globalTxId,
+      @Param("localTxId") String localTxId);
+
+  List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
new file mode 100644
index 0000000..9281b7e
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+
+public class SpringCommandRepository implements CommandRepository {
+  private final TxEventEnvelopeRepository eventRepository;
+  private final CommandEntityRepository commandRepository;
+
+  SpringCommandRepository(TxEventEnvelopeRepository eventRepository, CommandEntityRepository commandRepository) {
+    this.eventRepository = eventRepository;
+    this.commandRepository = commandRepository;
+  }
+
+  @Override
+  public boolean exists(String globalTxId, String localTxId) {
+    return commandRepository.findByCommandGlobalTxIdAndCommandLocalTxId(globalTxId, localTxId).isPresent();
+  }
+
+  @Override
+  public void saveCompensationCommand(String globalTxId, String localTxId) {
+    TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+        globalTxId,
+        localTxId,
+        TxStartedEvent.name());
+
+    commandRepository.save(new CommandEntity(startedEvent.id(), startedEvent.event()));
+  }
+
+  @Override
+  public void saveCompensationCommands(String globalTxId) {
+    List<TxEventEnvelope> events = eventRepository
+        .findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
+
+    Map<String, CommandEntity> commands = new HashMap<>();
+
+    for (TxEventEnvelope event : events) {
+      commands.computeIfAbsent(event.localTxId(), k -> new CommandEntity(event.id(), event.event()));
+    }
+
+    commandRepository.save(commands.values());
+  }
+
+  @Override
+  public void markCommandAsDone(String globalTxId, String localTxId) {
+    commandRepository.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId);
+  }
+
+  @Override
+  public List<Command> findUncompletedCommands(String globalTxId) {
+    return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index bdf82f1..fcb7c00 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -52,4 +52,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "  AND t2.type = 'TxCompensatedEvent')"
  )
   List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+
+  @Query("FROM TxEventEnvelope t "
+      + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+      + "  FROM TxEventEnvelope t1 "
+      + "  WHERE t1.event.globalTxId = ?1 "
+      + "  AND t1.event.localTxId = t.event.localTxId "
+      + "  AND t1.event.type = 'TxEndedEvent'"
+      + ") AND NOT EXISTS ( "
+      + "  FROM TxEventEnvelope t2 "
+      + "  WHERE t2.event.globalTxId = ?1 "
+      + "  AND t2.event.localTxId = t.event.localTxId "
+      + "  AND t2.event.type = 'TxCompensatedEvent')")
+  List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index cead07a..5a61dc7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -68,6 +68,7 @@ public class TransactionAspect {
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
+    // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha
     scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
 
     try {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.