You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 00:55:57 UTC

[incubator-servicecomb-saga] branch master updated (5f1dce3 -> 9643ad8)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from 5f1dce3  SCB-232 update to introduce the EventType in the pack-common. It should be shared between the omega and alpha.
     new 4377256  SCB-227 stop sub tx from running when global tx failed
     new e932061  SCB-227 proper namings for aborted status
     new 555e018  SCB-227 add OmegaTxAbortedException
     new f9acce0  SCB-227 throw OmegaException when sending get interrupted
     new 0dd9330  SCB-227 use InvalidTransactionException instead of custom exception
     new 9643ad8  SCB-227 minor fix

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../saga/alpha/core/TxConsistentService.java       |  8 ++++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 13 ++++++++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 10 +++---
 .../saga/alpha/server/AlphaIntegrationTest.java    | 19 +++++++++++
 .../connector/grpc/GrpcClientMessageSender.java    |  7 ++--
 .../grpc/LoadBalancedClusterMessageSender.java     | 23 ++++++++------
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 33 ++++++++++++++++++-
 .../spring/TransactionInterceptionTest.java        |  6 +++-
 .../{OmegaException.java => AlphaResponse.java}    | 11 +++++--
 .../omega/transaction/CompensableInterceptor.java  |  5 +--
 .../omega/transaction/EventAwareInterceptor.java   |  5 +--
 .../saga/omega/transaction/MessageSender.java      |  5 ++-
 .../saga/omega/transaction/OmegaException.java     |  3 ++
 .../transaction/SagaStartAnnotationProcessor.java  |  4 +--
 .../omega/transaction/TimeAwareInterceptor.java    |  4 +--
 .../saga/omega/transaction/TransactionAspect.java  | 10 +++++-
 .../transaction/CompensableInterceptorTest.java    |  5 ++-
 .../CompensationMessageHandlerTest.java            |  5 ++-
 .../SagaStartAnnotationProcessorTest.java          |  5 ++-
 .../omega/transaction/SagaStartAspectTest.java     |  5 ++-
 .../transaction/TimeAwareInterceptorTest.java      |  3 +-
 .../omega/transaction/TransactionAspectTest.java   | 37 ++++++++++++++++++----
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 23 files changed, 183 insertions(+), 44 deletions(-)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{OmegaException.java => AlphaResponse.java} (82%)

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/06: SCB-227 stop sub tx from running when global tx failed

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 4377256c228e836eed5d4e2e023e43e1920c4aa0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Jan 12 19:06:36 2018 +0800

    SCB-227 stop sub tx from running when global tx failed
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 12 +++++++++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 13 ++++++++++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 15 ++++++++----
 .../saga/alpha/server/AlphaIntegrationTest.java    | 19 +++++++++++++++
 .../connector/grpc/GrpcClientMessageSender.java    |  6 +++--
 .../grpc/LoadBalancedClusterMessageSender.java     |  7 ++++--
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 16 ++++++++++++-
 .../omega/transaction/CompensableInterceptor.java  |  5 ++--
 .../omega/transaction/EventAwareInterceptor.java   |  5 ++--
 .../saga/omega/transaction/MessageSender.java      |  5 ++--
 .../transaction/SagaStartAnnotationProcessor.java  |  4 ++--
 .../omega/transaction/TimeAwareInterceptor.java    |  4 ++--
 .../saga/omega/transaction/TransactionAspect.java  |  7 +++++-
 .../transaction/TimeAwareInterceptorTest.java      |  3 ++-
 .../omega/transaction/TransactionAspectTest.java   | 28 ++++++++++++++++++----
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 16 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index bddba36..c9544ca 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -54,7 +54,11 @@ public class TxConsistentService {
     this.omegaCallback = omegaCallback;
   }
 
-  public void handle(TxEvent event) {
+  public boolean handle(TxEvent event) {
+    if (isInvalidTxStarted(event)) {
+      return false;
+    }
+
     eventRepository.save(event);
 
     executor.execute(() -> {
@@ -64,6 +68,8 @@ public class TxConsistentService {
 
       eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
     });
+
+    return true;
   }
 
   private void compensate(TxEvent event) {
@@ -100,4 +106,8 @@ public class TxConsistentService {
   private boolean isTxEndedEvent(TxEvent event) {
     return TxEndedEvent.name().equals(event.type());
   }
+
+  private boolean isInvalidTxStarted(TxEvent event) {
+    return TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event);
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index a44f16d..9318a06 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -135,6 +135,19 @@ public class TxConsistentServiceTest {
     ));
   }
 
+  @Test
+  public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    assertThat(events.size(), is(2));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 4b3ea1b..6ca26ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -39,7 +39,9 @@ import io.grpc.stub.StreamObserver;
 
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
-  private static final GrpcAck ACK = GrpcAck.newBuilder().build();
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setValid(true).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setValid(false).build();
+
   private final TxConsistentService txConsistentService;
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
@@ -67,13 +69,13 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
       callback.disconnect();
     }
 
-    responseObserver.onNext(ACK);
+    responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
   }
 
   @Override
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
-    txConsistentService.handle(new TxEvent(
+    boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
         message.getInstanceId(),
         new Date(message.getTimestamp()),
@@ -85,7 +87,12 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getPayloads().toByteArray()
     ));
 
-    responseObserver.onNext(ACK);
+    if (ok) {
+      responseObserver.onNext(ALLOW);
+    } else {
+      responseObserver.onNext(REJECT);
+    }
+
     responseObserver.onCompleted();
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a501f75..eb857d1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -250,6 +251,24 @@ public class AlphaIntegrationTest {
     anotherBlockingStub.onDisconnected(anotherServiceConfig);
   }
 
+  @Test
+  public void doNotStartSubTxOnFailure() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+    await().atMost(1, SECONDS).until(() -> receivedCommands.size() == 1);
+
+    String localTxId1 = UUID.randomUUID().toString();
+    String parentTxId1 = UUID.randomUUID().toString();
+    GrpcAck result = blockingStub
+        .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+
+    assertThat(result.getValid(), is(false));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index f6b5d76..70870bc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -28,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
@@ -83,8 +84,9 @@ public class GrpcClientMessageSender implements MessageSender {
   }
 
   @Override
-  public void send(TxEvent event) {
-    blockingEventService.onTxEvent(convertEvent(event));
+  public boolean send(TxEvent event) {
+    GrpcAck grpcAck = blockingEventService.onTxEvent(convertEvent(event));
+    return grpcAck.getValid();
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 73940ed..1eb6f7b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -124,14 +124,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   }
 
   @Override
-  public void send(TxEvent event) {
+  public boolean send(TxEvent event) {
+    boolean result = false;
     boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
 
       try {
         long startTime = System.nanoTime();
-        messageSender.send(event);
+        result = messageSender.send(event);
         senders.put(messageSender, System.nanoTime() - startTime);
 
         success = true;
@@ -142,6 +143,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         senders.put(messageSender, Long.MAX_VALUE);
       }
     } while (!success && !Thread.currentThread().isInterrupted());
+
+    return result;
   }
 
   private MessageSender fastestSender() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1fdfd2e..6c9fdcd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -47,6 +47,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -293,6 +294,14 @@ public class LoadBalancedClusterMessageSenderTest {
     await().atMost(2, SECONDS).until(() -> eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1);
   }
 
+  @Test
+  public void forwardSendResult() {
+    assertThat(messageSender.send(event), is(true));
+
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
+    assertThat(messageSender.send(rejectEvent), is(false));
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
@@ -341,7 +350,12 @@ public class LoadBalancedClusterMessageSenderTest {
             .build());
       }
 
-      responseObserver.onNext(GrpcAck.newBuilder().build());
+      if ("TxStartedEvent".equals(request.getType()) && request.getCompensationMethod().equals("reject")) {
+        responseObserver.onNext(GrpcAck.newBuilder().setValid(false).build());
+      } else {
+        responseObserver.onNext(GrpcAck.newBuilder().setValid(true).build());
+      }
+
       responseObserver.onCompleted();
     }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 76193cd..e24b92c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,8 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
+  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    return sender
+        .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 291538f..2263b38 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return true;
     }
 
     @Override
@@ -32,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  void preIntercept(String parentTxId, String compensationMethod, Object... message);
+  boolean preIntercept(String parentTxId, String compensationMethod, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 2f05394..86cacc2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,8 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSender {
-  MessageSender NO_OP_SENDER = event -> {
-  };
+  MessageSender NO_OP_SENDER = event -> true;
 
   default void onConnected() {
   }
@@ -34,5 +33,5 @@ public interface MessageSender {
     return "UNKNOWN";
   }
 
-  void send(TxEvent event);
+  boolean send(TxEvent event);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 2dad5ae..b6ef898 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -30,8 +30,8 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 80ad03f..ee00565 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,8 +29,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String signature, Object... args) {
-    interceptor.preIntercept(parentTxId, signature, args);
+  public boolean preIntercept(String parentTxId, String signature, Object... args) {
+    return interceptor.preIntercept(parentTxId, signature, args);
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index d3e091c..e506899 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -57,7 +57,12 @@ public class TransactionAspect {
     context.newLocalTxId();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
-    interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    boolean ok = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    if (!ok) {
+      LOG.info("Skipped transaction {} due to abort.", context.globalTxId());
+      context.setLocalTxId(localTxId);
+      return null;
+    }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index d26f04f..9e4055f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -45,7 +45,8 @@ public class TimeAwareInterceptorTest {
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
     @Override
-    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return true;
     }
 
     @Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 63cdff0..8f7c73a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -23,6 +23,10 @@ import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -40,7 +44,6 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
@@ -50,12 +53,12 @@ public class TransactionAspectTest {
   private final String newLocalTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
-  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
   @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
-  private final Compensable compensable = Mockito.mock(Compensable.class);
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+  private final Compensable compensable = mock(Compensable.class);
 
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
@@ -160,6 +163,21 @@ public class TransactionAspectTest {
     assertThat(messages.size(), is(2));
   }
 
+  @Test
+  public void returnImmediatelyWhenReceivedRejectResponse() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(false);
+
+    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+    try {
+      aspect.advise(joinPoint, compensable);
+    } catch (Throwable throwable) {
+      fail("Unexpected exception: " + throwable.getMessage());
+    }
+
+    verify(sender, times(1)).send(any());
+  }
+
   private String doNothing() {
     return "doNothing";
   }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9522b6f..405edff 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,7 @@ message GrpcServiceConfig {
 }
 
 message GrpcAck {
+  bool valid = 1;
 }
 
 message GrpcTxEvent {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/06: SCB-227 throw OmegaException when sending get interrupted

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit f9acce026b7ec1c9b012f1d1c8ffdc3624ca6183
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 17:58:06 2018 +0800

    SCB-227 throw OmegaException when sending get interrupted
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java    |  7 +------
 .../grpc/LoadBalancedClusterMessageSender.java        | 19 +++++++++----------
 .../grpc/LoadBalancedClusterMessageSenderTest.java    | 15 +++++++++++++++
 .../saga/omega/transaction/OmegaException.java        |  3 +++
 .../saga/omega/transaction/TransactionAspectTest.java |  4 +---
 5 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 65d3e90..eced7f9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -87,12 +87,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getPayloads().toByteArray()
     ));
 
-    if (ok) {
-      responseObserver.onNext(ALLOW);
-    } else {
-      responseObserver.onNext(REJECT);
-    }
-
+    responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9c7ab6b..b5e461b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -39,6 +39,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,26 +127,24 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public AlphaResponse send(TxEvent event) {
-    AlphaResponse response = null;
-    boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
 
       try {
         long startTime = System.nanoTime();
-        response = messageSender.send(event);
+        AlphaResponse response = messageSender.send(event);
         senders.put(messageSender, System.nanoTime() - startTime);
 
-        success = true;
+        return response;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
 
         // very large latency on exception
         senders.put(messageSender, Long.MAX_VALUE);
       }
-    } while (!success && !Thread.currentThread().isInterrupted());
+    } while (!Thread.currentThread().isInterrupted());
 
-    return response;
+    throw new OmegaException("Failed to send event " + event + " due to interruption");
   }
 
   private MessageSender fastestSender() {
@@ -154,14 +153,14 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse((event -> {
+        .orElse(event -> {
           try {
             return availableMessageSenders.take().send(event);
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ignored) {
             Thread.currentThread().interrupt();
           }
-          return new AlphaResponse(true);
-        }));
+          throw new OmegaException("Failed to send event " + event + " due to interruption");
+        });
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index c9549cf..b62dbfc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -45,6 +45,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
@@ -302,6 +303,20 @@ public class LoadBalancedClusterMessageSenderTest {
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
+  @Test
+  public void blowsUpWhenServerIsInterrupted() {
+    Thread thread = new Thread(() -> {
+      try {
+        messageSender.send(event);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        assertThat(e.getMessage().endsWith("interruption"), is(true));
+      }
+    });
+    thread.start();
+    thread.interrupt();
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
index 89eb67f..4488e44 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
@@ -18,6 +18,9 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public class OmegaException extends RuntimeException {
+  public OmegaException(String message) {
+    super(message);
+  }
 
   public OmegaException(String cause, Throwable throwable) {
     super(cause, throwable);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index a1bab98..aa79da7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -167,7 +167,7 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() {
+  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
     MessageSender sender = mock(MessageSender.class);
     when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
@@ -177,8 +177,6 @@ public class TransactionAspectTest {
       expectFailing(OmegaTxAbortedException.class);
     } catch (OmegaTxAbortedException e) {
       assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
-    } catch (Throwable throwable) {
-      expectFailing(OmegaTxAbortedException.class);
     }
 
     verify(sender, times(1)).send(any());

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/06: SCB-227 add OmegaTxAbortedException

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 555e0188ade4605713e26698ceb328ebc1a9aab3
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 14:43:08 2018 +0800

    SCB-227 add OmegaTxAbortedException
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSender.java     |  5 +++--
 .../omega/transaction/OmegaTxAbortedException.java | 24 ++++++++++++++++++++++
 .../saga/omega/transaction/TransactionAspect.java  |  2 +-
 .../omega/transaction/TransactionAspectTest.java   |  6 +++---
 4 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 635ac5e..9c7ab6b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -126,7 +126,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public AlphaResponse send(TxEvent event) {
-    AlphaResponse response = new AlphaResponse(false);
+    AlphaResponse response = null;
     boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
@@ -156,10 +156,11 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         .map(Entry::getKey)
         .orElse((event -> {
           try {
-            availableMessageSenders.take().send(event);
+            return availableMessageSenders.take().send(event);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
+          return new AlphaResponse(true);
         }));
   }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java
new file mode 100644
index 0000000..d7417cc
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+public class OmegaTxAbortedException extends RuntimeException {
+  public OmegaTxAbortedException(String message) {
+    super(message);
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index c57ee0e..6166ca3 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -61,7 +61,7 @@ public class TransactionAspect {
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
-      throw new IllegalStateException("Abort local sub transaction " + abortedLocalTxId +
+      throw new OmegaTxAbortedException("Abort local sub transaction " + abortedLocalTxId +
           " due to global transaction " + context.globalTxId() + " has already aborted.");
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 0d8fcd2..a1bab98 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -174,11 +174,11 @@ public class TransactionAspectTest {
     TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
-      expectFailing(IllegalStateException.class);
-    } catch (IllegalStateException e) {
+      expectFailing(OmegaTxAbortedException.class);
+    } catch (OmegaTxAbortedException e) {
       assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
     } catch (Throwable throwable) {
-      expectFailing(IllegalStateException.class);
+      expectFailing(OmegaTxAbortedException.class);
     }
 
     verify(sender, times(1)).send(any());

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/06: SCB-227 use InvalidTransactionException instead of custom exception

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 0dd933099796e4b590c2a6e70b991cbec746234e
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 18:30:00 2018 +0800

    SCB-227 use InvalidTransactionException instead of custom exception
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../omega/transaction/OmegaTxAbortedException.java | 24 ----------------------
 .../saga/omega/transaction/TransactionAspect.java  |  4 +++-
 .../omega/transaction/TransactionAspectTest.java   |  6 ++++--
 3 files changed, 7 insertions(+), 27 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java
deleted file mode 100644
index d7417cc..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxAbortedException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-public class OmegaTxAbortedException extends RuntimeException {
-  public OmegaTxAbortedException(String message) {
-    super(message);
-  }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 6166ca3..73a0372 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import javax.transaction.InvalidTransactionException;
+
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.aspectj.lang.ProceedingJoinPoint;
@@ -61,7 +63,7 @@ public class TransactionAspect {
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
-      throw new OmegaTxAbortedException("Abort local sub transaction " + abortedLocalTxId +
+      throw new InvalidTransactionException("Abort local sub transaction " + abortedLocalTxId +
           " due to global transaction " + context.globalTxId() + " has already aborted.");
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index aa79da7..ec08134 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import javax.transaction.InvalidTransactionException;
+
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -174,8 +176,8 @@ public class TransactionAspectTest {
     TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
-      expectFailing(OmegaTxAbortedException.class);
-    } catch (OmegaTxAbortedException e) {
+      expectFailing(InvalidTransactionException.class);
+    } catch (InvalidTransactionException e) {
       assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/06: SCB-227 minor fix

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9643ad8779be73d345842a5f6b6d1f6953714625
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 22:37:18 2018 +0800

    SCB-227 minor fix
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../omega/connector/grpc/LoadBalancedClusterMessageSender.java     | 7 ++++---
 .../omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java | 4 +++-
 .../servicecomb/saga/omega/transaction/TransactionAspect.java      | 4 ++--
 .../servicecomb/saga/omega/transaction/TransactionAspectTest.java  | 3 ++-
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index b5e461b..700864a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -136,6 +136,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         senders.put(messageSender, System.nanoTime() - startTime);
 
         return response;
+      } catch (OmegaException e) {
+        throw e;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
 
@@ -156,10 +158,9 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         .orElse(event -> {
           try {
             return availableMessageSenders.take().send(event);
-          } catch (InterruptedException ignored) {
-            Thread.currentThread().interrupt();
+          } catch (InterruptedException e) {
+            throw new OmegaException("Failed to send event " + event + " due to interruption", e);
           }
-          throw new OmegaException("Failed to send event " + event + " due to interruption");
         });
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index b62dbfc..93cb854 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -304,7 +304,7 @@ public class LoadBalancedClusterMessageSenderTest {
   }
 
   @Test
-  public void blowsUpWhenServerIsInterrupted() {
+  public void blowsUpWhenServerIsInterrupted() throws InterruptedException {
     Thread thread = new Thread(() -> {
       try {
         messageSender.send(event);
@@ -313,8 +313,10 @@ public class LoadBalancedClusterMessageSenderTest {
         assertThat(e.getMessage().endsWith("interruption"), is(true));
       }
     });
+
     thread.start();
     thread.interrupt();
+    thread.join();
   }
 
   private int killServerReceivedMessage() {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 73a0372..cead07a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -63,8 +63,8 @@ public class TransactionAspect {
     if (response.aborted()) {
       String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
-      throw new InvalidTransactionException("Abort local sub transaction " + abortedLocalTxId +
-          " due to global transaction " + context.globalTxId() + " has already aborted.");
+      throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
+          " because global transaction " + context.globalTxId() + " has already aborted.");
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index ec08134..a3db940 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -178,7 +178,8 @@ public class TransactionAspectTest {
       aspect.advise(joinPoint, compensable);
       expectFailing(InvalidTransactionException.class);
     } catch (InvalidTransactionException e) {
-      assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
+      System.out.println(e.getMessage());
+      assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
     }
 
     verify(sender, times(1)).send(any());

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/06: SCB-227 proper namings for aborted status

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e932061164ff07d39f613ceab5b7e3887638cc75
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 10:40:24 2018 +0800

    SCB-227 proper namings for aborted status
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java          |  6 +-----
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java    |  4 ++--
 .../saga/alpha/server/AlphaIntegrationTest.java       |  2 +-
 .../omega/connector/grpc/GrpcClientMessageSender.java |  5 +++--
 .../grpc/LoadBalancedClusterMessageSender.java        |  9 +++++----
 .../grpc/LoadBalancedClusterMessageSenderTest.java    |  8 ++++----
 .../spring/TransactionInterceptionTest.java           |  6 +++++-
 .../{MessageSender.java => AlphaResponse.java}        | 19 ++++++-------------
 .../omega/transaction/CompensableInterceptor.java     |  2 +-
 .../saga/omega/transaction/EventAwareInterceptor.java |  6 +++---
 .../saga/omega/transaction/MessageSender.java         |  4 ++--
 .../transaction/SagaStartAnnotationProcessor.java     |  2 +-
 .../saga/omega/transaction/TimeAwareInterceptor.java  |  2 +-
 .../saga/omega/transaction/TransactionAspect.java     |  9 +++++----
 .../omega/transaction/CompensableInterceptorTest.java |  5 ++++-
 .../transaction/CompensationMessageHandlerTest.java   |  5 ++++-
 .../transaction/SagaStartAnnotationProcessorTest.java |  5 ++++-
 .../saga/omega/transaction/SagaStartAspectTest.java   |  5 ++++-
 .../omega/transaction/TimeAwareInterceptorTest.java   |  4 ++--
 .../saga/omega/transaction/TransactionAspectTest.java | 12 +++++++++---
 .../src/main/proto/GrpcTxEvent.proto                  |  2 +-
 21 files changed, 68 insertions(+), 54 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index c9544ca..fa93752 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -55,7 +55,7 @@ public class TxConsistentService {
   }
 
   public boolean handle(TxEvent event) {
-    if (isInvalidTxStarted(event)) {
+    if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
       return false;
     }
 
@@ -106,8 +106,4 @@ public class TxConsistentService {
   private boolean isTxEndedEvent(TxEvent event) {
     return TxEndedEvent.name().equals(event.type());
   }
-
-  private boolean isInvalidTxStarted(TxEvent event) {
-    return TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event);
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 6ca26ee..65d3e90 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -39,8 +39,8 @@ import io.grpc.stub.StreamObserver;
 
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
-  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setValid(true).build();
-  private static final GrpcAck REJECT = GrpcAck.newBuilder().setValid(false).build();
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   private final TxConsistentService txConsistentService;
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index eb857d1..6cf1893 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -266,7 +266,7 @@ public class AlphaIntegrationTest {
     GrpcAck result = blockingStub
         .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
 
-    assertThat(result.getValid(), is(false));
+    assertThat(result.getAborted(), is(true));
   }
 
   private GrpcServiceConfig someServiceConfig() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 70870bc..4af0773 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -23,6 +23,7 @@ package org.apache.servicecomb.saga.omega.connector.grpc;
 import java.util.function.Function;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
@@ -84,9 +85,9 @@ public class GrpcClientMessageSender implements MessageSender {
   }
 
   @Override
-  public boolean send(TxEvent event) {
+  public AlphaResponse send(TxEvent event) {
     GrpcAck grpcAck = blockingEventService.onTxEvent(convertEvent(event));
-    return grpcAck.getValid();
+    return new AlphaResponse(grpcAck.getAborted());
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 1eb6f7b..635ac5e 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
@@ -124,15 +125,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   }
 
   @Override
-  public boolean send(TxEvent event) {
-    boolean result = false;
+  public AlphaResponse send(TxEvent event) {
+    AlphaResponse response = new AlphaResponse(false);
     boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
 
       try {
         long startTime = System.nanoTime();
-        result = messageSender.send(event);
+        response = messageSender.send(event);
         senders.put(messageSender, System.nanoTime() - startTime);
 
         success = true;
@@ -144,7 +145,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       }
     } while (!success && !Thread.currentThread().isInterrupted());
 
-    return result;
+    return response;
   }
 
   private MessageSender fastestSender() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 6c9fdcd..c9549cf 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -296,10 +296,10 @@ public class LoadBalancedClusterMessageSenderTest {
 
   @Test
   public void forwardSendResult() {
-    assertThat(messageSender.send(event), is(true));
+    assertThat(messageSender.send(event).aborted(), is(false));
 
     TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
-    assertThat(messageSender.send(rejectEvent), is(false));
+    assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
   private int killServerReceivedMessage() {
@@ -351,9 +351,9 @@ public class LoadBalancedClusterMessageSenderTest {
       }
 
       if ("TxStartedEvent".equals(request.getType()) && request.getCompensationMethod().equals("reject")) {
-        responseObserver.onNext(GrpcAck.newBuilder().setValid(false).build());
+        responseObserver.onNext(GrpcAck.newBuilder().setAborted(true).build());
       } else {
-        responseObserver.onNext(GrpcAck.newBuilder().setValid(true).build());
+        responseObserver.onNext(GrpcAck.newBuilder().setAborted(false).build());
       }
 
       responseObserver.onCompleted();
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index c30953f..7daf954 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
@@ -310,7 +311,10 @@ public class TransactionInterceptionTest {
 
     @Bean
     MessageSender sender() {
-      return (event) -> messages.add(event.toString());
+      return (event) -> {
+        messages.add(event.toString());
+        return new AlphaResponse(false);
+      };
     }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/AlphaResponse.java
similarity index 76%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/AlphaResponse.java
index 86cacc2..07df793 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/AlphaResponse.java
@@ -17,21 +17,14 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
-public interface MessageSender {
-  MessageSender NO_OP_SENDER = event -> true;
+public class AlphaResponse {
+  private final boolean aborted;
 
-  default void onConnected() {
+  public AlphaResponse(boolean aborted) {
+    this.aborted = aborted;
   }
 
-  default void onDisconnected() {
+  public boolean aborted() {
+    return aborted;
   }
-
-  default void close() {
-  }
-
-  default String target() {
-    return "UNKNOWN";
-  }
-
-  boolean send(TxEvent event);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index e24b92c..074a5ec 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,7 +29,7 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
     return sender
         .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
   }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 2263b38..5f8165f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
-      return true;
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return new AlphaResponse(false);
     }
 
     @Override
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  boolean preIntercept(String parentTxId, String compensationMethod, Object... message);
+  AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 86cacc2..5271b82 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,7 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSender {
-  MessageSender NO_OP_SENDER = event -> true;
+  MessageSender NO_OP_SENDER = event -> new AlphaResponse(false);
 
   default void onConnected() {
   }
@@ -33,5 +33,5 @@ public interface MessageSender {
     return "UNKNOWN";
   }
 
-  boolean send(TxEvent event);
+  AlphaResponse send(TxEvent event);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b6ef898..7299b25 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -30,7 +30,7 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+  public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
     return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
   }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index ee00565..ca14551 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,7 +29,7 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public boolean preIntercept(String parentTxId, String signature, Object... args) {
+  public AlphaResponse preIntercept(String parentTxId, String signature, Object... args) {
     return interceptor.preIntercept(parentTxId, signature, args);
   }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index e506899..c57ee0e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -57,11 +57,12 @@ public class TransactionAspect {
     context.newLocalTxId();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
-    boolean ok = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
-    if (!ok) {
-      LOG.info("Skipped transaction {} due to abort.", context.globalTxId());
+    AlphaResponse response = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    if (response.aborted()) {
+      String abortedLocalTxId = context.localTxId();
       context.setLocalTxId(localTxId);
-      return null;
+      throw new IllegalStateException("Abort local sub transaction " + abortedLocalTxId +
+          " due to global transaction " + context.globalTxId() + " has already aborted.");
     }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index d4bfef6..21af7e6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -41,7 +41,10 @@ public class CompensableInterceptorTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
 
-  private final MessageSender sender = messages::add;
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
 
   private final String message = uniquify("message");
   private final String compensationMethod = getClass().getCanonicalName();
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index bc126ae..0b33d4b 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -33,7 +33,10 @@ import org.junit.Test;
 public class CompensationMessageHandlerTest {
 
   private final List<TxEvent> events = new ArrayList<>();
-  private final MessageSender sender = events::add;
+  private final MessageSender sender = e -> {
+    events.add(e);
+    return new AlphaResponse(false);
+  };
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index c146f5b..f8e936d 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -36,7 +36,10 @@ public class SagaStartAnnotationProcessorTest {
 
   private final List<TxEvent> messages = new ArrayList<>();
 
-  private final MessageSender sender = messages::add;
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
 
   private final String globalTxId = UUID.randomUUID().toString();
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index 6d2e1a6..7316161 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -47,7 +47,10 @@ public class SagaStartAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
   private final String globalTxId = UUID.randomUUID().toString();
 
-  private final MessageSender sender = messages::add;
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
   private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
   private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 9e4055f..9ff0214 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -45,8 +45,8 @@ public class TimeAwareInterceptorTest {
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
     @Override
-    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
-      return true;
+    public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return new AlphaResponse(false);
     }
 
     @Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 8f7c73a..0d8fcd2 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -52,7 +52,10 @@ public class TransactionAspectTest {
 
   private final String newLocalTxId = UUID.randomUUID().toString();
 
-  private final MessageSender sender = messages::add;
+  private final MessageSender sender = e -> {
+    messages.add(e);
+    return new AlphaResponse(false);
+  };
   private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
   private final MethodSignature methodSignature = mock(MethodSignature.class);
 
@@ -166,13 +169,16 @@ public class TransactionAspectTest {
   @Test
   public void returnImmediatelyWhenReceivedRejectResponse() {
     MessageSender sender = mock(MessageSender.class);
-    when(sender.send(any())).thenReturn(false);
+    when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
     TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
     try {
       aspect.advise(joinPoint, compensable);
+      expectFailing(IllegalStateException.class);
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
     } catch (Throwable throwable) {
-      fail("Unexpected exception: " + throwable.getMessage());
+      expectFailing(IllegalStateException.class);
     }
 
     verify(sender, times(1)).send(any());
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 405edff..2636881 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,7 +33,7 @@ message GrpcServiceConfig {
 }
 
 message GrpcAck {
-  bool valid = 1;
+  bool aborted = 1;
 }
 
 message GrpcTxEvent {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.