Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 01:04:43 UTC

[incubator-servicecomb-saga] branch SCB-220_avoid_redundant_compensation updated (6fdbdc2 -> d6a8f1b)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    omit 6fdbdc2  SCB-220 aborted tx was not supposed to be compensated
    omit 56c54c6  SCB-220 compensated only ended events
     add 4377256  SCB-227 stop sub tx from running when global tx failed
     add e932061  SCB-227 proper namings for aborted status
     add 555e018  SCB-227 add OmegaTxAbortedException
     add f9acce0  SCB-227 throw OmegaException when sending get interrupted
     add 0dd9330  SCB-227 use InvalidTransactionException instead of custom exception
     add 9643ad8  SCB-227 minor fix
     new 61b0dc1  SCB-220 compensated only ended events
     new 3662c5a  SCB-220 aborted tx was not supposed to be compensated
     new d6a8f1b  SCB-220 fixed rebase conflict

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs when a user
force-pushes a change (git push --force), leaving the repository with
a history like this:

 * -- * -- B -- O -- O -- O   (6fdbdc2)
            \
             N -- N -- N   refs/heads/SCB-220_avoid_redundant_compensation (d6a8f1b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
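
The O and N sets described above can be illustrated with a small Java
sketch. It is not how the notification script is implemented; the class
name, method names, and the toy commit graph are all hypothetical. It
only assumes that each commit lists its parents, and computes "revisions
reachable from one tip but not from the other" in both directions.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of git or the ASF tooling.
final class ForcePushSketch {

  // All commits reachable from `tip` by following parent links (tip included).
  static Set<String> reachable(Map<String, List<String>> parents, String tip) {
    Set<String> seen = new LinkedHashSet<>();
    Deque<String> stack = new ArrayDeque<>();
    stack.push(tip);
    while (!stack.isEmpty()) {
      String commit = stack.pop();
      if (seen.add(commit)) {
        parents.getOrDefault(commit, Collections.emptyList()).forEach(stack::push);
      }
    }
    return seen;
  }

  // Commits reachable from `from` that are not reachable from `excluded`.
  static Set<String> onlyIn(Map<String, List<String>> parents, String from, String excluded) {
    Set<String> result = reachable(parents, from);
    result.removeAll(reachable(parents, excluded));
    return result;
  }

  public static void main(String[] args) {
    // Toy history: B is the common base, O1..O2 the old branch, N1..N2 the new one.
    Map<String, List<String>> parents = new HashMap<>();
    parents.put("B", Collections.emptyList());
    parents.put("O1", Arrays.asList("B"));
    parents.put("O2", Arrays.asList("O1"));
    parents.put("N1", Arrays.asList("B"));
    parents.put("N2", Arrays.asList("N1"));

    System.out.println("O revisions: " + onlyIn(parents, "O2", "N2"));
    System.out.println("N revisions: " + onlyIn(parents, "N2", "O2"));
  }
}
```

In the terms of this email, the O set corresponds to the revisions
listed as "omit", and the N set to those listed as "add" or "new",
depending on whether the repository already knew them through another
reference.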


Summary of changes:
 .../saga/alpha/core/TxConsistentService.java       |  7 +++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 13 ++++++++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 10 +++---
 .../saga/alpha/server/AlphaIntegrationTest.java    | 20 ++++++++++++
 .../connector/grpc/GrpcClientMessageSender.java    |  7 ++--
 .../grpc/LoadBalancedClusterMessageSender.java     | 23 ++++++++------
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 33 ++++++++++++++++++-
 .../spring/TransactionInterceptionTest.java        |  6 +++-
 .../{OmegaException.java => AlphaResponse.java}    | 11 +++++--
 .../omega/transaction/CompensableInterceptor.java  |  5 +--
 .../omega/transaction/EventAwareInterceptor.java   |  5 +--
 .../saga/omega/transaction/MessageSender.java      |  5 ++-
 .../saga/omega/transaction/OmegaException.java     |  3 ++
 .../transaction/SagaStartAnnotationProcessor.java  |  4 +--
 .../omega/transaction/TimeAwareInterceptor.java    |  4 +--
 .../saga/omega/transaction/TransactionAspect.java  | 10 +++++-
 .../transaction/CompensableInterceptorTest.java    |  5 ++-
 .../CompensationMessageHandlerTest.java            |  5 ++-
 .../SagaStartAnnotationProcessorTest.java          |  5 ++-
 .../omega/transaction/SagaStartAspectTest.java     |  5 ++-
 .../transaction/TimeAwareInterceptorTest.java      |  3 +-
 .../omega/transaction/TransactionAspectTest.java   | 37 ++++++++++++++++++----
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 23 files changed, 183 insertions(+), 44 deletions(-)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{OmegaException.java => AlphaResponse.java} (82%)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/03: SCB-220 fixed rebase conflict

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit d6a8f1b422dc0f59bbcdd46dae1a7bc1d5a1ffc8
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 09:04:27 2018 +0800

    SCB-220 fixed rebase conflict
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../org/apache/servicecomb/saga/alpha/core/TxConsistentService.java    | 3 +--
 .../org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java | 1 +
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 9c592d8..5dc5788 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -63,6 +63,7 @@ public class TxConsistentService {
     eventRepository.save(event);
 
     executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+    return true;
   }
 
   private void compensateIfAlreadyAborted(TxEvent event) {
@@ -77,8 +78,6 @@ public class TxConsistentService {
 
   private boolean isCompensationScheduled(TxEvent event) {
     return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
-
-    return true;
   }
 
   private void compensate(TxEvent event) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index ab9c7ec..0048311 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -277,6 +277,7 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
 
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 


[incubator-servicecomb-saga] 02/03: SCB-220 aborted tx was not supposed to be compensated

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 3662c5a62bc80184e1ae09c81ca06917def56784
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 13 15:33:21 2018 +0800

    SCB-220 aborted tx was not supposed to be compensated
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/integration/pack/tests/PackIT.java | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 7ed1b88..0a340db 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -136,14 +136,14 @@ public class PackIT {
 
     assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
 
-    await().atMost(2, SECONDS).until(() -> repository.count() == 8);
+    await().atMost(2, SECONDS).until(() -> repository.count() == 7);
 
     List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
     assertThat(distinctGlobalTxIds.size(), is(1));
 
     String globalTxId = distinctGlobalTxIds.get(0);
     List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
-    assertThat(envelopes.size(), is(8));
+    assertThat(envelopes.size(), is(7));
 
     TxEventEnvelope sagaStartedEvent = envelopes.get(0);
     assertThat(sagaStartedEvent.type(), is("SagaStartedEvent"));
@@ -170,18 +170,9 @@ public class PackIT {
     assertThat(txCompensatedEvent1.serviceName(), is(serviceName));
     assertThat(txCompensatedEvent1.instanceId(), is(txStartedEvent1.instanceId()));
 
-    TxEventEnvelope txCompensatedEvent2 = envelopes.get(6);
-    assertThat(txCompensatedEvent2.type(), is("TxCompensatedEvent"));
-    assertThat(txCompensatedEvent2.localTxId(), is(txStartedEvent2.localTxId()));
-    assertThat(txCompensatedEvent2.parentTxId(), is(globalTxId));
-    assertThat(txCompensatedEvent2.serviceName(), is(serviceName));
-    assertThat(txCompensatedEvent2.instanceId(), is(txStartedEvent2.instanceId()));
+    assertThat(envelopes.get(6).type(), is("SagaEndedEvent"));
 
-    assertThat(envelopes.get(7).type(), is("SagaEndedEvent"));
-
-    assertThat(compensatedMessages, contains(
-        "Goodbye, " + TRESPASSER,
-        "My bad, please take the window instead, " + TRESPASSER));
+    assertThat(compensatedMessages, contains("Goodbye, " + TRESPASSER));
   }
 
   @Test(timeout = 5000)
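
The expectation change in PackIT above (7 events instead of 8, and a
single compensation message) follows from one rule: a sub-transaction
that aborted never ended, so it has nothing to compensate. Below is a
minimal Java sketch of that rule, assuming a simplified, hypothetical
Event type that carries only the three fields the rule needs. It is an
in-memory illustration and not the project's repository code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration; Event is a simplified stand-in for TxEvent.
final class CompensationSelectionSketch {

  static final class Event {
    final String globalTxId;
    final String localTxId;
    final String type;

    Event(String globalTxId, String localTxId, String type) {
      this.globalTxId = globalTxId;
      this.localTxId = localTxId;
      this.type = type;
    }
  }

  // True if an event of the given type exists for this global/local tx pair.
  static boolean has(List<Event> events, String globalTxId, String localTxId, String type) {
    return events.stream().anyMatch(e -> e.globalTxId.equals(globalTxId)
        && e.localTxId.equals(localTxId)
        && e.type.equals(type));
  }

  // Local tx ids that started, ended, and have not been compensated yet.
  static List<String> localTxIdsToCompensate(List<Event> events, String globalTxId) {
    return events.stream()
        .filter(e -> e.globalTxId.equals(globalTxId) && e.type.equals("TxStartedEvent"))
        .map(e -> e.localTxId)
        .distinct()
        .filter(id -> has(events, globalTxId, id, "TxEndedEvent"))
        .filter(id -> !has(events, globalTxId, id, "TxCompensatedEvent"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event("g1", "tx-1", "TxStartedEvent"),
        new Event("g1", "tx-1", "TxEndedEvent"),
        new Event("g1", "tx-2", "TxStartedEvent"),
        new Event("g1", "tx-2", "TxAbortedEvent"));

    // tx-2 aborted without ending, so only tx-1 is selected.
    System.out.println(localTxIdsToCompensate(events, "g1"));
  }
}
```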


[incubator-servicecomb-saga] 01/03: SCB-220 compensated only ended events

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-220_avoid_redundant_compensation
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 61b0dc1460006d6a0b7ef437cffdaf50d109e6ac
Author: seanyinx <se...@huawei.com>
AuthorDate: Sat Jan 13 09:47:39 2018 +0800

    SCB-220 compensated only ended events
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 46 +++++++++-------
 .../servicecomb/saga/alpha/core/TxEvent.java       | 12 +++++
 .../saga/alpha/core/TxEventRepository.java         |  4 ++
 .../saga/alpha/core/TxConsistentServiceTest.java   | 39 ++++++++++++--
 .../saga/alpha/server/SpringTxEventRepository.java | 16 ++++--
 .../alpha/server/TxEventEnvelopeRepository.java    | 24 +++++++--
 .../saga/alpha/server/AlphaIntegrationTest.java    | 62 ++++++++++++++++++----
 7 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index fa93752..9c592d8 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.core;
 
+import static java.util.Collections.emptySet;
 import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -25,11 +26,10 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -42,11 +42,12 @@ public class TxConsistentService {
   private final TxEventRepository eventRepository;
   private final OmegaCallback omegaCallback;
   private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
+    put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
     put(TxAbortedEvent.name(), (event) -> compensate(event));
     put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
   }};
 
-  private final Map<String, Set<String>> eventsToCompensate = new ConcurrentHashMap<>();
+  private final Map<String, Set<String>> eventsToCompensate = new HashMap<>();
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
   public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
@@ -61,27 +62,38 @@ public class TxConsistentService {
 
     eventRepository.save(event);
 
-    executor.execute(() -> {
-      if (isTxEndedEvent(event) && isGlobalTxAborted(event)) {
-        omegaCallback.compensate(event);
-      }
+    executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+  }
+
+  private void compensateIfAlreadyAborted(TxEvent event) {
+    if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
+      eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>()).add(event.localTxId());
+      TxEvent correspondingStartedEvent = eventRepository
+          .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
+
+      omegaCallback.compensate(correspondingStartedEvent);
+    }
+  }
 
-      eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
-    });
+  private boolean isCompensationScheduled(TxEvent event) {
+    return eventsToCompensate.getOrDefault(event.globalTxId(), emptySet()).contains(event.localTxId());
 
     return true;
   }
 
   private void compensate(TxEvent event) {
-    List<TxEvent> events = eventRepository.findTransactions(event.globalTxId(), TxStartedEvent.name());
-    eventsToCompensate.computeIfAbsent(event.globalTxId(), (v) -> {
-      Set<String> eventSet = new ConcurrentSkipListSet<>();
-      events.forEach(e -> eventSet.add(e.localTxId()));
-      return eventSet;
-    });
+    List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
+
+    events.removeIf(this::isCompensationScheduled);
+
+    Set<String> localTxIds = eventsToCompensate.computeIfAbsent(event.globalTxId(), k -> new HashSet<>());
+    events.forEach(e -> localTxIds.add(e.localTxId()));
+
     events.forEach(omegaCallback::compensate);
   }
 
+  // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
+  // unless we ask user to specify a name for each participant in the global TX in @Compensable
   private void updateCompensateStatus(TxEvent event) {
     Set<String> events = eventsToCompensate.get(event.globalTxId());
     if (events != null) {
@@ -102,8 +114,4 @@ public class TxConsistentService {
   private boolean isGlobalTxAborted(TxEvent event) {
     return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
   }
-
-  private boolean isTxEndedEvent(TxEvent event) {
-    return TxEndedEvent.name().equals(event.type());
-  }
 }
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 9a2cea4..ebea44c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -36,6 +36,18 @@ public class TxEvent {
   public TxEvent(
       String serviceName,
       String instanceId,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+  }
+
+  public TxEvent(
+      String serviceName,
+      String instanceId,
       Date creationTime,
       String globalTxId,
       String localTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 3a8387b..cf5706b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,4 +23,8 @@ public interface TxEventRepository {
   void save(TxEvent event);
 
   List<TxEvent> findTransactions(String globalTxId, String type);
+
+  TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+
+  List<TxEvent> findTransactionsToCompensate(String globalTxId);
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 9318a06..99667e7 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -56,6 +56,40 @@ public class TxConsistentServiceTest {
           .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
           .collect(Collectors.toList());
     }
+
+    @Override
+    public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
+          .findFirst()
+          .get();
+    }
+
+    @Override
+    public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId())
+              && event.type().equals(TxStartedEvent.name())
+              && isCompleted(globalTxId, event)
+              && !isCompensated(globalTxId, event))
+          .collect(Collectors.toList());
+    }
+
+    private boolean isCompleted(String globalTxId, TxEvent event) {
+      return events.stream()
+          .filter(e -> globalTxId.equals(e.globalTxId())
+              && e.localTxId().equals(event.localTxId())
+              && e.type().equals(TxEndedEvent.name()))
+          .count() > 0;
+    }
+
+    private boolean isCompensated(String globalTxId, TxEvent event) {
+      return events.stream()
+          .filter(e -> globalTxId.equals(e.globalTxId())
+              && e.localTxId().equals(event.localTxId())
+              && e.type().equals(TxCompensatedEvent.name()))
+          .count() > 0;
+    }
   };
 
   private final String globalTxId = UUID.randomUUID().toString();
@@ -121,17 +155,16 @@ public class TxConsistentServiceTest {
 
   @Test
   public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
-    String localTxId1 = UUID.randomUUID().toString();
     events.add(newEvent(TxStartedEvent));
     events.add(newEvent(TxAbortedEvent));
 
-    TxEvent event = eventOf(TxEndedEvent, "service x".getBytes(), localTxId1, "method x");
+    TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
 
     consistentService.handle(event);
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
     assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, localTxId1, "method x", "service x".getBytes())
+        new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
     ));
   }
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 3bf6e03..00d10d2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,7 +18,6 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
@@ -37,9 +36,16 @@ class SpringTxEventRepository implements TxEventRepository {
 
   @Override
   public List<TxEvent> findTransactions(String globalTxId, String type) {
-    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
-        .stream()
-        .map(TxEventEnvelope::event)
-        .collect(Collectors.toList());
+    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
+  }
+
+  @Override
+  public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
+    return eventRepo.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(globalTxId, localTxId, type).event();
+  }
+
+  @Override
+  public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
+    return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fe05c1e..1e1859c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -19,15 +19,33 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import java.util.List;
 
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
-  TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+  List<TxEventEnvelope> findByEventGlobalTxId(String globalTxId);
 
-  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.server.TxEventEnvelope("
+  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
       + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
-  List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+  List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
+
+  TxEventEnvelope findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(String globalTxId, String localTxId, String type);
+
+  @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+      + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+      + ") FROM TxEventEnvelope t "
+      + "WHERE t.event.globalTxId = ?1 AND t.event.type = 'TxStartedEvent' AND EXISTS ( "
+      + "  FROM TxEventEnvelope t1 "
+      + "  WHERE t1.event.globalTxId = ?1 "
+      + "  AND t1.event.localTxId = t.event.localTxId "
+      + "  AND t1.event.type = 'TxEndedEvent'"
+      + ") AND NOT EXISTS ( "
+      + "  FROM TxEventEnvelope t2 "
+      + "  WHERE t2.event.globalTxId = ?1 "
+      + "  AND t2.event.localTxId = t.event.localTxId "
+      + "  AND t2.event.type = 'TxCompensatedEvent')")
+  List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 6cf1893..ab9c7ec 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
@@ -119,11 +120,11 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
-    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
 
     assertThat(receivedCommands.isEmpty(), is(true));
 
-    TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
+    TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId).get(0);
 
     assertThat(envelope.serviceName(), is(serviceName));
     assertThat(envelope.instanceId(), is(instanceId));
@@ -179,6 +180,7 @@ public class AlphaIntegrationTest {
   public void removeCallbackOnClientDown() throws Exception {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     omegaCallbacks.get(serviceName).get(instanceId).disconnect();
 
@@ -188,17 +190,34 @@ public class AlphaIntegrationTest {
   }
 
   @Test
+  public void compensateImmediatelyWhenGlobalTxAlreadyAborted() throws Exception {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], compensationMethod));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    GrpcCompensateCommand command = receivedCommands.poll();
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getLocalTxId(), is(localTxId));
+    assertThat(command.getParentTxId(), is(parentTxId));
+    assertThat(command.getCompensateMethod(), is(compensationMethod));
+    assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
+  }
+
+  @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method a"));
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId, parentTxId, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
     blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
-    blockingStub.onTxEvent(eventOf(TxEndedEvent, new byte[0], "method b"));
+    blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
@@ -215,7 +234,8 @@ public class AlphaIntegrationTest {
   public void getCompensateCommandOnFailure() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
-    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+    await().atMost(1, SECONDS).until(() -> !eventRepo.findByEventGlobalTxId(globalTxId).isEmpty());
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -232,6 +252,7 @@ public class AlphaIntegrationTest {
   public void compensateOnlyFailedGlobalTransaction() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
 
     // simulates connection from another service with different globalTxId
     GrpcServiceConfig anotherServiceConfig = someServiceConfig();
@@ -240,7 +261,7 @@ public class AlphaIntegrationTest {
     TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
     anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
 
-    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -269,6 +290,29 @@ public class AlphaIntegrationTest {
     assertThat(result.getAborted(), is(true));
   }
 
+  @Test
+  public void compensateOnlyCompletedTransactions() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+    String anotherLocalTxId1 = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId1));
+    blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId1));
+    blockingStub.onTxEvent(someGrpcEvent(TxCompensatedEvent, globalTxId, anotherLocalTxId1));
+
+    String anotherLocalTxId2 = UUID.randomUUID().toString();
+    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
+
+    await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    assertThat(receivedCommands.size(), is(1));
+    assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
@@ -294,11 +338,11 @@ public class AlphaIntegrationTest {
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
-    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+    return someGrpcEvent(type, globalTxId, localTxId);
   }
 
-  private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
-    return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+  private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
+    return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
   private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
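
The scheduling idea in the TxConsistentService diff above (compensate a
sub-transaction at most once, whether its TxEndedEvent arrives before or
after the abort) can be sketched in isolation. The class below is a
hypothetical, simplified model: it records issued commands as strings
instead of calling OmegaCallback, and it tracks aborted global
transactions in memory instead of querying the event repository. Like
the plain HashMap in the diff, it is not thread-safe and assumes all
calls happen on a single thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the project's TxConsistentService.
final class CompensationSchedulerSketch {
  // globalTxId -> local tx ids for which compensation was already scheduled.
  private final Map<String, Set<String>> scheduled = new HashMap<>();
  private final Set<String> abortedGlobalTxIds = new HashSet<>();

  // Stand-in for the compensate commands sent to omegas.
  final List<String> issuedCommands = new ArrayList<>();

  // Abort arrives: compensate every sub-transaction that has already ended.
  void onAborted(String globalTxId, List<String> endedLocalTxIds) {
    abortedGlobalTxIds.add(globalTxId);
    endedLocalTxIds.forEach(localTxId -> schedule(globalTxId, localTxId));
  }

  // A sub-transaction ends: compensate it right away if the global tx already aborted.
  void onEnded(String globalTxId, String localTxId) {
    if (abortedGlobalTxIds.contains(globalTxId)) {
      schedule(globalTxId, localTxId);
    }
  }

  private void schedule(String globalTxId, String localTxId) {
    // Set.add returns false for an id that is already present, so duplicates are skipped.
    if (scheduled.computeIfAbsent(globalTxId, k -> new HashSet<>()).add(localTxId)) {
      issuedCommands.add(globalTxId + "/" + localTxId);
    }
  }

  public static void main(String[] args) {
    CompensationSchedulerSketch sketch = new CompensationSchedulerSketch();
    List<String> alreadyEnded = new ArrayList<>();
    alreadyEnded.add("tx-1");

    sketch.onAborted("g1", alreadyEnded); // tx-1 ended before the abort
    sketch.onEnded("g1", "tx-1");         // duplicate end event, ignored
    sketch.onEnded("g1", "tx-2");         // late end event, compensated immediately
    System.out.println(sketch.issuedCommands);
  }
}
```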
