You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/02/08 08:50:01 UTC

[incubator-servicecomb-saga] 06/06: SCB-239 handle timeout in EventScanner

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit bc3879f204e41dc6bb9bc9f290a12eb524202f51
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Feb 5 23:02:29 2018 +0800

    SCB-239 handle timeout in EventScanner
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/core/EventScanner.java  | 58 +++++++++----
 .../saga/alpha/core/TxConsistentService.java       | 35 +-------
 .../servicecomb/saga/alpha/core/TxEvent.java       | 39 ++++++---
 .../saga/alpha/core/TxEventRepository.java         |  4 +
 .../servicecomb/saga/alpha/core/TxTimeout.java     | 50 ++++++++---
 .../saga/alpha/core/TxTimeoutRepository.java       |  6 +-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 81 ++++--------------
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  2 +-
 .../saga/alpha/server/SpringTxEventRepository.java | 10 +++
 .../alpha/server/SpringTxTimeoutRepository.java    | 26 ++----
 .../alpha/server/TxEventEnvelopeRepository.java    | 16 ++++
 .../alpha/server/TxTimeoutEntityRepository.java    | 36 ++++----
 .../src/main/resources/schema-postgresql.sql       | 13 ++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  7 +-
 alpha/alpha-server/src/test/resources/schema.sql   | 11 ++-
 .../omega/transaction/OnceAwareInterceptor.java    | 49 -----------
 .../saga/omega/transaction/TransactionAspect.java  |  1 -
 .../transaction/OnceAwareInterceptorTest.java      | 98 ----------------------
 18 files changed, 202 insertions(+), 340 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 4f72a1c..a52ebe5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.alpha.core;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -66,7 +67,9 @@ public class EventScanner implements Runnable {
   private void pollEvents() {
     scheduler.scheduleWithFixedDelay(
         () -> {
-          abortTimeoutEvent();
+          updateTimeoutStatus();
+          findTimeoutEvents();
+          abortTimeoutEvents();
           saveUncompensatedEventsToCommands();
           compensate();
           updateCompensatedCommands();
@@ -78,6 +81,18 @@ public class EventScanner implements Runnable {
         MILLISECONDS);
   }
 
+  private void findTimeoutEvents() {
+    eventRepository.findTimeoutEvents()
+        .forEach(event -> {
+          log.info("Found timeout event {}", event);
+          timeoutRepository.save(txTimeoutOf(event));
+        });
+  }
+
+  private void updateTimeoutStatus() {
+    timeoutRepository.markTimeoutAsDone();
+  }
+
   private void saveUncompensatedEventsToCommands() {
     eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
         .forEach(event -> {
@@ -113,15 +128,15 @@ public class EventScanner implements Runnable {
     markSagaEnded(event);
   }
 
-  private void abortTimeoutEvent() {
-    timeoutRepository.findFirstTimeoutTxToAbort().forEach(event -> {
-      log.info("Found timeout event {}", event);
+  private void abortTimeoutEvents() {
+    timeoutRepository.findFirstTimeout().forEach(timeout -> {
+      log.info("Found timeout event {} to abort", timeout);
 
-      eventRepository.save(toTxAbortedEvent(event));
-      timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
+      eventRepository.save(toTxAbortedEvent(timeout));
 
-      if (event.type().equals(TxStartedEvent.name())) {
-        omegaCallback.compensate(event);
+      if (timeout.type().equals(TxStartedEvent.name())) {
+        eventRepository.findTxStartedEventToCompensate(timeout.globalTxId(), timeout.localTxId())
+            .ifPresent(omegaCallback::compensate);
       }
     });
   }
@@ -138,17 +153,16 @@ public class EventScanner implements Runnable {
 
   private void markGlobalTxEnd(TxEvent event) {
     eventRepository.save(toSagaEndedEvent(event));
-    timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
     log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
   }
 
-  private TxEvent toTxAbortedEvent(TxEvent event) {
+  private TxEvent toTxAbortedEvent(TxTimeout timeout) {
     return new TxEvent(
-        event.serviceName(),
-        event.instanceId(),
-        event.globalTxId(),
-        event.localTxId(),
-        event.parentTxId(),
+        timeout.serviceName(),
+        timeout.instanceId(),
+        timeout.globalTxId(),
+        timeout.localTxId(),
+        timeout.parentTxId(),
         TxAbortedEvent.name(),
         "",
         ("Transaction timeout").getBytes());
@@ -189,4 +203,18 @@ public class EventScanner implements Runnable {
         command.payloads()
     );
   }
+
+  private TxTimeout txTimeoutOf(TxEvent event) {
+    return new TxTimeout(
+        event.id(),
+        event.serviceName(),
+        event.instanceId(),
+        event.globalTxId(),
+        event.localTxId(),
+        event.parentTxId(),
+        event.type(),
+        event.expiryTime(),
+        NEW.name()
+    );
+  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 541d54f..c55090a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,18 +17,10 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.concurrent.CompletableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,12 +29,9 @@ public class TxConsistentService {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final TxEventRepository eventRepository;
-  private final TxTimeoutRepository timeoutRepository;
 
-  public TxConsistentService(TxEventRepository eventRepository,
-      TxTimeoutRepository timeoutRepository) {
+  public TxConsistentService(TxEventRepository eventRepository) {
     this.eventRepository = eventRepository;
-    this.timeoutRepository = timeoutRepository;
   }
 
   public boolean handle(TxEvent event) {
@@ -51,33 +40,11 @@ public class TxConsistentService {
       return false;
     }
 
-    if (isEventWithTimeout(event)) {
-      saveTxTimeout(event);
-    }
-
     eventRepository.save(event);
 
-    if (Arrays.asList(TxEndedEvent.name(), SagaEndedEvent.name(), TxAbortedEvent.name()).contains(event.type())) {
-      CompletableFuture.runAsync(() -> timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()));
-    }
-
     return true;
   }
 
-  private boolean isEventWithTimeout(TxEvent event) {
-    return Arrays.asList(TxStartedEvent.name(), SagaStartedEvent.name()).contains(event.type()) && event.timeout() != 0;
-  }
-
-  private void saveTxTimeout(TxEvent event) {
-    Date expireTime = new Date(event.creationTime().getTime() + SECONDS.toMillis(event.timeout()));
-    timeoutRepository.save(
-        new TxTimeout(
-            event.globalTxId(),
-            event.localTxId(),
-            expireTime,
-            NEW.name()));
-  }
-
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index e34b7c6..42a202f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.util.Date;
 
 import javax.persistence.Entity;
@@ -24,10 +26,12 @@ import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Transient;
-import javax.persistence.Version;
 
 @Entity
 public class TxEvent {
+  @Transient
+  private static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00
+
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long surrogateId;
@@ -40,14 +44,9 @@ public class TxEvent {
   private String parentTxId;
   private String type;
   private String compensationMethod;
+  private Date expiryTime;
   private byte[] payloads;
 
-  @Version
-  private long version;
-
-  @Transient
-  private int timeout;
-
   private TxEvent() {
   }
 
@@ -61,7 +60,7 @@ public class TxEvent {
         event.parentTxId,
         event.type,
         event.compensationMethod,
-        0,
+        event.expiryTime,
         event.payloads);
   }
 
@@ -117,7 +116,23 @@ public class TxEvent {
       String compensationMethod,
       int timeout,
       byte[] payloads) {
+    this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
+        compensationMethod,
+        timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+        payloads);
+  }
 
+  TxEvent(Long surrogateId,
+      String serviceName,
+      String instanceId,
+      Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      Date expiryTime,
+      byte[] payloads) {
     this.surrogateId = surrogateId;
     this.serviceName = serviceName;
     this.instanceId = instanceId;
@@ -127,8 +142,8 @@ public class TxEvent {
     this.parentTxId = parentTxId;
     this.type = type;
     this.compensationMethod = compensationMethod;
+    this.expiryTime = expiryTime;
     this.payloads = payloads;
-    this.timeout = timeout;
   }
 
   public String serviceName() {
@@ -171,8 +186,8 @@ public class TxEvent {
     return surrogateId;
   }
 
-  public int timeout() {
-    return timeout;
+  public Date expiryTime() {
+    return expiryTime;
   }
 
   @Override
@@ -187,7 +202,7 @@ public class TxEvent {
         ", parentTxId='" + parentTxId + '\'' +
         ", type='" + type + '\'' +
         ", compensationMethod='" + compensationMethod + '\'' +
-        ", timeout='" + timeout + '\'' +
+        ", expiryTime='" + expiryTime + '\'' +
         '}';
   }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b974bd9..0af6fb5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -25,6 +25,10 @@ public interface TxEventRepository {
 
   Optional<TxEvent> findFirstAbortedGlobalTransaction();
 
+  List<TxEvent> findTimeoutEvents();
+
+  Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId);
+
   List<TxEvent> findTransactions(String globalTxId, String type);
 
   List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
index dc365e3..00ca2ec 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
@@ -31,9 +31,14 @@ public class TxTimeout {
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long surrogateId;
 
+  private long eventId;
+  private String serviceName;
+  private String instanceId;
   private String globalTxId;
   private String localTxId;
-  private Date expireTime;
+  private String parentTxId;
+  private String type;
+  private Date expiryTime;
   private String status;
 
   @Version
@@ -42,13 +47,27 @@ public class TxTimeout {
   TxTimeout() {
   }
 
-  public TxTimeout(String globalTxId, String localTxId, Date expireTime, String status) {
+  TxTimeout(long eventId, String serviceName, String instanceId, String globalTxId, String localTxId,
+      String parentTxId, String type, Date expiryTime, String status) {
+    this.eventId = eventId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
-    this.expireTime = expireTime;
+    this.parentTxId = parentTxId;
+    this.type = type;
+    this.expiryTime = expiryTime;
     this.status = status;
   }
 
+  public String serviceName() {
+    return serviceName;
+  }
+
+  public String instanceId() {
+    return instanceId;
+  }
+
   public String globalTxId() {
     return globalTxId;
   }
@@ -57,24 +76,33 @@ public class TxTimeout {
     return localTxId;
   }
 
-  public Date expireTime() {
-    return expireTime;
+  public String parentTxId() {
+    return parentTxId;
   }
 
-  public String status() {
-    return status;
+  public String type() {
+    return type;
   }
 
-  public void setStatus(String status) {
-    this.status = status;
+  public Date expiryTime() {
+    return expiryTime;
+  }
+
+  public String status() {
+    return status;
   }
 
   @Override
   public String toString() {
     return "TxTimeout{" +
-        "globalTxId='" + globalTxId + '\'' +
+        "eventId=" + eventId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", globalTxId='" + globalTxId + '\'' +
         ", localTxId='" + localTxId + '\'' +
-        ", expireTime=" + expireTime +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", expiryTime=" + expiryTime +
         ", status=" + status +
         '}';
   }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
index 88758c7..97387a3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
@@ -20,9 +20,9 @@ package org.apache.servicecomb.saga.alpha.core;
 import java.util.List;
 
 public interface TxTimeoutRepository {
-  void save(TxTimeout event);
+  void save(TxTimeout timeout);
 
-  void markTxTimeoutAsDone(String globalTxId, String localTxId);
+  void markTimeoutAsDone();
 
-  List<TxEvent> findFirstTimeoutTxToAbort();
+  List<TxTimeout> findFirstTimeout();
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 5467368..d220994 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,15 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.Collections.emptyList;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -57,6 +54,18 @@ public class TxConsistentServiceTest {
     }
 
     @Override
+    public List<TxEvent> findTimeoutEvents() {
+      return emptyList();
+    }
+
+    @Override
+    public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()))
+          .findFirst();
+    }
+
+    @Override
     public List<TxEvent> findTransactions(String globalTxId, String type) {
       return events.stream()
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
@@ -78,29 +87,6 @@ public class TxConsistentServiceTest {
     }
   };
 
-  private final Deque<TxTimeout> timeouts = new ConcurrentLinkedDeque<>();
-  private final TxTimeoutRepository timeoutRepository = new TxTimeoutRepository() {
-    @Override
-    public void save(TxTimeout timeout) {
-      timeouts.add(timeout);
-    }
-
-    @Override
-    public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
-      for (TxTimeout timeout : timeouts) {
-        if (timeout.globalTxId().equals(globalTxId) && timeout.localTxId().equals(localTxId)) {
-          timeout.setStatus(DONE.name());
-          break;
-        }
-      }
-    }
-
-    @Override
-    public List<TxEvent> findFirstTimeoutTxToAbort() {
-      return null;
-    }
-  };
-
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
@@ -109,13 +95,12 @@ public class TxConsistentServiceTest {
 
   private final String compensationMethod = getClass().getCanonicalName();
 
-  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
   private final byte[] payloads = "yeah".getBytes();
 
   @Before
   public void setUp() throws Exception {
     events.clear();
-    timeouts.clear();
   }
 
   @Test
@@ -132,7 +117,6 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
-    assertThat(timeouts.isEmpty(), is(true));
   }
 
   @Test
@@ -146,46 +130,11 @@ public class TxConsistentServiceTest {
     consistentService.handle(event);
 
     assertThat(events.size(), is(2));
-    assertThat(timeouts.isEmpty(), is(true));
-  }
-
-  @Test
-  public void persistTimeoutEventOnArrival() {
-    TxEvent[] events = {
-        newEventWithTimeout(SagaStartedEvent, globalTxId,2),
-        newEventWithTimeout(TxStartedEvent, 1),
-        newEvent(TxEndedEvent),
-        newEvent(TxCompensatedEvent),
-        eventOf(SagaEndedEvent, globalTxId)};
-
-    for (TxEvent event : events) {
-      consistentService.handle(event);
-    }
-
-    assertThat(this.events, contains(events));
-    assertThat(timeouts.size(), is(2));
-    await().atMost(1, SECONDS).until(this::allTimeoutIsDone);
-  }
-
-  private boolean allTimeoutIsDone() {
-    for (TxTimeout timeout : timeouts) {
-      if (!timeout.status().equals(DONE.name())) {
-        return false;
-      }
-    }
-    return true;
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return newEventWithTimeout(eventType, 0);
-  }
-
-  private TxEvent newEventWithTimeout(EventType eventType, int timeout) {
-    return newEventWithTimeout(eventType, localTxId, timeout);
-  }
-
-  private TxEvent newEventWithTimeout(EventType eventType, String localTxId, int timeout) {
-    return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, timeout, payloads);
+    return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod,
+        payloads);
   }
 
   private TxEvent eventOf(EventType eventType, String localTxId) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 9472d0d..6889c9f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -94,7 +94,7 @@ class AlphaConfig {
         eventRepository, commandRepository, timeoutRepository,
         omegaCallback, eventPollingInterval).run();
 
-    TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository);
 
     ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 5531d8f..d6ea21c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -43,6 +43,16 @@ class SpringTxEventRepository implements TxEventRepository {
   }
 
   @Override
+  public List<TxEvent> findTimeoutEvents() {
+    return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST);
+  }
+
+  @Override
+  public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) {
+    return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, localTxId);
+  }
+
+  @Override
   public List<TxEvent> findTransactions(String globalTxId, String type) {
     return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 71c808d..ee75496 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -17,15 +17,12 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
 import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import javax.transaction.Transactional;
 
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxTimeout;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.springframework.data.domain.PageRequest;
@@ -38,26 +35,21 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository {
   }
 
   @Override
-  public void save(TxTimeout event) {
-    timeoutRepo.save(event);
+  public void save(TxTimeout timeout) {
+    timeoutRepo.save(timeout);
   }
 
   @Override
-  public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
-    timeoutRepo.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId);
+  public void markTimeoutAsDone() {
+    timeoutRepo.updateStatusOfFinishedTx();
   }
 
   @Transactional
   @Override
-  public List<TxEvent> findFirstTimeoutTxToAbort() {
-    List<TxEvent> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
-    List<TxEvent> pendingTimeoutEvents = new ArrayList<>();
-    timeoutEvents.forEach(event -> {
-      if (timeoutRepo.updateStatusFromNewByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())
-          != 0) {
-        pendingTimeoutEvents.add(event);
-      }
-    });
-    return pendingTimeoutEvents;
+  public List<TxTimeout> findFirstTimeout() {
+    List<TxTimeout> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
+    timeoutEvents.forEach(event -> timeoutRepo
+        .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId()));
+    return timeoutEvents;
   }
 }
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index c4984f9..0eaf089 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -38,6 +38,22 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
       + "    AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
   Optional<TxEvent> findFirstAbortedGlobalTxByType();
 
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
+      + "  AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS( "
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type"
+      + ")")
+  List<TxEvent> findTimeoutEvents(Pageable pageable);
+
+  @Query("SELECT t FROM TxEvent t "
+      + "WHERE t.globalTxId = ?1 "
+      + "  AND t.localTxId = ?2 "
+      + "  AND t.type = 'TxStartedEvent'")
+  Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
+
   @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
       + "t.type, t.compensationMethod, t.payloads"
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
index cc39397..f0e264a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
@@ -22,7 +22,6 @@ import java.util.List;
 import javax.persistence.LockModeType;
 import javax.transaction.Transactional;
 
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxTimeout;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Lock;
@@ -44,25 +43,22 @@ interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> {
       @Param("globalTxId") String globalTxId,
       @Param("localTxId") String localTxId);
 
+  @Lock(LockModeType.OPTIMISTIC)
+  @Query("SELECT t FROM TxTimeout AS t "
+      + "WHERE t.status = 'NEW' "
+      + "  AND t.expiryTime < CURRENT_TIMESTAMP "
+      + "ORDER BY t.expiryTime ASC")
+  List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+
   @Transactional
   @Modifying(clearAutomatically = true)
-  @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t "
-      + "SET t.status = :status "
-      + "WHERE t.globalTxId = :globalTxId "
-      + "  AND t.localTxId = :localTxId "
-      + "  AND t.status = 'NEW'")
-  int updateStatusFromNewByGlobalTxIdAndLocalTxId(
-      @Param("status") String status,
-      @Param("globalTxId") String globalTxId,
-      @Param("localTxId") String localTxId);
-
-  @Lock(LockModeType.OPTIMISTIC)
-  @Query("SELECT te FROM TxEvent AS te "
-      + "INNER JOIN TxTimeout AS tt "
-      + "ON te.globalTxId = tt.globalTxId "
-      + "  AND te.localTxId = tt.localTxId "
-      + "  AND tt.status = 'NEW' "
-      + "  AND tt.expireTime < CURRENT_TIMESTAMP "
-      + "ORDER BY tt.expireTime ASC")
-  List<TxEvent> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+  @Query("UPDATE TxTimeout t "
+      + "SET t.status = 'DONE' "
+      + "WHERE t.status != 'DONE' AND EXISTS ("
+      + "  SELECT t1.globalTxId FROM TxEvent t1 "
+      + "  WHERE t1.globalTxId = t.globalTxId "
+      + "    AND t1.localTxId = t.localTxId "
+      + "    AND t1.type != t.type"
+      + ")")
+  void updateStatusOfFinishedTx();
 }
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 484c5e3..e7f774b 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
-  payloads bytea,
-  version bigint NOT NULL
+  expiryTime timestamp(6) NOT NULL,
+  payloads bytea
 );
 
 CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type);
@@ -35,11 +35,16 @@ CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId,
 
 CREATE TABLE IF NOT EXISTS TxTimeout (
   surrogateId BIGSERIAL PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(16) NOT NULL,
+  instanceId varchar(36) NOT NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
-  expireTime TIMESTAMP NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  expiryTime TIMESTAMP NOT NULL,
   status varchar(12),
   version bigint NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expireTime, globalTxId, localTxId, status);
+CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expiryTime, globalTxId, localTxId, status);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 225c194..497c244 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -383,10 +382,6 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
 
-    await().atMost(1, SECONDS).until(() -> timeoutEntityRepository.count() == 1L);
-    Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
-    timeouts.forEach(timeout -> assertThat(timeout.status(), is(NEW.name())));
-
     await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
@@ -395,7 +390,7 @@ public class AlphaIntegrationTest {
     assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
 
     assertThat(timeoutEntityRepository.count(), is(1L));
-    timeouts = timeoutEntityRepository.findAll();
+    Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
     timeouts.forEach(timeout -> {
       assertThat(timeout.status(), is(DONE.name()));
       assertThat(timeout.globalTxId(), is(globalTxId));
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 0958389..929c69f 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
   parentTxId varchar(36) DEFAULT NULL,
   type varchar(50) NOT NULL,
   compensationMethod varchar(256) NOT NULL,
-  payloads varbinary(10240),
-  version bigint NOT NULL
+  expiryTime TIMESTAMP NOT NULL,
+  payloads varbinary(10240)
 );
 
 CREATE TABLE IF NOT EXISTS Command (
@@ -29,9 +29,14 @@ CREATE TABLE IF NOT EXISTS Command (
 
 CREATE TABLE IF NOT EXISTS TxTimeout (
   surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+  eventId bigint NOT NULL UNIQUE,
+  serviceName varchar(36) NOT NULL,
+  instanceId varchar(36) NOT NULL,
   globalTxId varchar(36) NOT NULL,
   localTxId varchar(36) NOT NULL,
-  expireTime TIMESTAMP NOT NULL,
+  parentTxId varchar(36) DEFAULT NULL,
+  type varchar(50) NOT NULL,
+  expiryTime TIMESTAMP NOT NULL,
   status varchar(12),
   version bigint NOT NULL
 );
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
deleted file mode 100644
index 3015a01..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-class OnceAwareInterceptor implements EventAwareInterceptor {
-  private final EventAwareInterceptor interceptor;
-  private final AtomicReference<EventAwareInterceptor> interceptorRef;
-
-  OnceAwareInterceptor(EventAwareInterceptor interceptor) {
-    this.interceptor = interceptor;
-    this.interceptorRef = new AtomicReference<>(interceptor);
-  }
-
-  @Override
-  public AlphaResponse preIntercept(String parentTxId, String signature, int timeout, Object... args) {
-    return interceptor.preIntercept(parentTxId, signature, timeout, args);
-  }
-
-  @Override
-  public void postIntercept(String parentTxId, String signature) {
-    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.postIntercept(parentTxId, signature);
-    }
-  }
-
-  @Override
-  public void onError(String parentTxId, String signature, Throwable throwable) {
-    if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
-      interceptor.onError(parentTxId, signature, throwable);
-    }
-  }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 090fe2e..932b990 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -54,7 +54,6 @@ public class TransactionAspect {
     String localTxId = context.localTxId();
     context.newLocalTxId();
 
-    OnceAwareInterceptor interceptor = new OnceAwareInterceptor(this.interceptor);
     AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
deleted file mode 100644
index 90a133b..0000000
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-public class OnceAwareInterceptorTest {
-  private static final int runningCounts = 1000;
-
-  private final String localTxId = uniquify("localTxId");
-  private final String signature = uniquify("signature");
-
-  private final AtomicInteger postInterceptInvoked = new AtomicInteger();
-  private final AtomicInteger onErrorInvoked = new AtomicInteger();
-
-  private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
-    @Override
-    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
-      return new AlphaResponse(false);
-    }
-
-    @Override
-    public void postIntercept(String parentTxId, String compensationMethod) {
-      postInterceptInvoked.incrementAndGet();
-    }
-
-    @Override
-    public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
-      onErrorInvoked.incrementAndGet();
-    }
-  };
-
-  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
-
-  @Test
-  public void invokePostIntercept() throws Exception {
-    List<Future<?>> futures = new LinkedList<>();
-
-    for (int i = 0; i < runningCounts; i++) {
-      OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
-
-      futures.add(executorService.submit(() -> interceptor.postIntercept(localTxId, signature)));
-    }
-
-    waitTillAllDone(futures);
-
-    assertThat(postInterceptInvoked.get(), is(runningCounts));
-  }
-
-  @Test
-  public void invokeOnErrorConcurrently() throws Exception {
-    RuntimeException oops = new RuntimeException("oops");
-    List<Future<?>> futures = new LinkedList<>();
-
-    for (int i = 0; i < runningCounts; i++) {
-      OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
-
-      futures.add(executorService.submit(() -> interceptor.onError(localTxId, signature, oops)));
-    }
-
-    waitTillAllDone(futures);
-
-    assertThat(onErrorInvoked.get(), is(runningCounts));
-  }
-
-  private void waitTillAllDone(List<Future<?>> futures)
-      throws InterruptedException, java.util.concurrent.ExecutionException {
-    for (Future<?> future : futures) {
-      future.get();
-    }
-  }
-}

-- 
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.