You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 00:55:58 UTC

[incubator-servicecomb-saga] 01/06: SCB-227 stop sub tx from running when global tx failed

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 4377256c228e836eed5d4e2e023e43e1920c4aa0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Jan 12 19:06:36 2018 +0800

    SCB-227 stop sub tx from running when global tx failed
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/core/TxConsistentService.java       | 12 +++++++++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 13 ++++++++++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 15 ++++++++----
 .../saga/alpha/server/AlphaIntegrationTest.java    | 19 +++++++++++++++
 .../connector/grpc/GrpcClientMessageSender.java    |  6 +++--
 .../grpc/LoadBalancedClusterMessageSender.java     |  7 ++++--
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 16 ++++++++++++-
 .../omega/transaction/CompensableInterceptor.java  |  5 ++--
 .../omega/transaction/EventAwareInterceptor.java   |  5 ++--
 .../saga/omega/transaction/MessageSender.java      |  5 ++--
 .../transaction/SagaStartAnnotationProcessor.java  |  4 ++--
 .../omega/transaction/TimeAwareInterceptor.java    |  4 ++--
 .../saga/omega/transaction/TransactionAspect.java  |  7 +++++-
 .../transaction/TimeAwareInterceptorTest.java      |  3 ++-
 .../omega/transaction/TransactionAspectTest.java   | 28 ++++++++++++++++++----
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 16 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index bddba36..c9544ca 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -54,7 +54,11 @@ public class TxConsistentService {
     this.omegaCallback = omegaCallback;
   }
 
-  public void handle(TxEvent event) {
+  public boolean handle(TxEvent event) {
+    if (isInvalidTxStarted(event)) {
+      return false;
+    }
+
     eventRepository.save(event);
 
     executor.execute(() -> {
@@ -64,6 +68,8 @@ public class TxConsistentService {
 
       eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
     });
+
+    return true;
   }
 
   private void compensate(TxEvent event) {
@@ -100,4 +106,8 @@ public class TxConsistentService {
   private boolean isTxEndedEvent(TxEvent event) {
     return TxEndedEvent.name().equals(event.type());
   }
+
+  private boolean isInvalidTxStarted(TxEvent event) {
+    return TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event);
+  }
 }
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index a44f16d..9318a06 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -135,6 +135,19 @@ public class TxConsistentServiceTest {
     ));
   }
 
+  @Test
+  public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(newEvent(TxStartedEvent));
+    events.add(newEvent(TxAbortedEvent));
+
+    TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+
+    consistentService.handle(event);
+
+    assertThat(events.size(), is(2));
+  }
+
   private TxEvent newEvent(EventType eventType) {
     return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 4b3ea1b..6ca26ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -39,7 +39,9 @@ import io.grpc.stub.StreamObserver;
 
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
-  private static final GrpcAck ACK = GrpcAck.newBuilder().build();
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setValid(true).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setValid(false).build();
+
   private final TxConsistentService txConsistentService;
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
@@ -67,13 +69,13 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
       callback.disconnect();
     }
 
-    responseObserver.onNext(ACK);
+    responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
   }
 
   @Override
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
-    txConsistentService.handle(new TxEvent(
+    boolean ok = txConsistentService.handle(new TxEvent(
         message.getServiceName(),
         message.getInstanceId(),
         new Date(message.getTimestamp()),
@@ -85,7 +87,12 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getPayloads().toByteArray()
     ));
 
-    responseObserver.onNext(ACK);
+    if (ok) {
+      responseObserver.onNext(ALLOW);
+    } else {
+      responseObserver.onNext(REJECT);
+    }
+
     responseObserver.onCompleted();
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a501f75..eb857d1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -250,6 +251,24 @@ public class AlphaIntegrationTest {
     anotherBlockingStub.onDisconnected(anotherServiceConfig);
   }
 
+  @Test
+  public void doNotStartSubTxOnFailure() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+
+    blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+    await().atMost(1, SECONDS).until(() -> receivedCommands.size() == 1);
+
+    String localTxId1 = UUID.randomUUID().toString();
+    String parentTxId1 = UUID.randomUUID().toString();
+    GrpcAck result = blockingStub
+        .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+
+    assertThat(result.getValid(), is(false));
+  }
+
   private GrpcServiceConfig someServiceConfig() {
     return GrpcServiceConfig.newBuilder()
         .setServiceName(uniquify("serviceName"))
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index f6b5d76..70870bc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -28,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
@@ -83,8 +84,9 @@ public class GrpcClientMessageSender implements MessageSender {
   }
 
   @Override
-  public void send(TxEvent event) {
-    blockingEventService.onTxEvent(convertEvent(event));
+  public boolean send(TxEvent event) {
+    GrpcAck grpcAck = blockingEventService.onTxEvent(convertEvent(event));
+    return grpcAck.getValid();
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 73940ed..1eb6f7b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -124,14 +124,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   }
 
   @Override
-  public void send(TxEvent event) {
+  public boolean send(TxEvent event) {
+    boolean result = false;
     boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
 
       try {
         long startTime = System.nanoTime();
-        messageSender.send(event);
+        result = messageSender.send(event);
         senders.put(messageSender, System.nanoTime() - startTime);
 
         success = true;
@@ -142,6 +143,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         senders.put(messageSender, Long.MAX_VALUE);
       }
     } while (!success && !Thread.currentThread().isInterrupted());
+
+    return result;
   }
 
   private MessageSender fastestSender() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1fdfd2e..6c9fdcd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -47,6 +47,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -293,6 +294,14 @@ public class LoadBalancedClusterMessageSenderTest {
     await().atMost(2, SECONDS).until(() -> eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1);
   }
 
+  @Test
+  public void forwardSendResult() {
+    assertThat(messageSender.send(event), is(true));
+
+    TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
+    assertThat(messageSender.send(rejectEvent), is(false));
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
@@ -341,7 +350,12 @@ public class LoadBalancedClusterMessageSenderTest {
             .build());
       }
 
-      responseObserver.onNext(GrpcAck.newBuilder().build());
+      if ("TxStartedEvent".equals(request.getType()) && request.getCompensationMethod().equals("reject")) {
+        responseObserver.onNext(GrpcAck.newBuilder().setValid(false).build());
+      } else {
+        responseObserver.onNext(GrpcAck.newBuilder().setValid(true).build());
+      }
+
       responseObserver.onCompleted();
     }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 76193cd..e24b92c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,8 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
+  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    return sender
+        .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 291538f..2263b38 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
 public interface EventAwareInterceptor {
   EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
     @Override
-    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return true;
     }
 
     @Override
@@ -32,7 +33,7 @@ public interface EventAwareInterceptor {
     }
   };
 
-  void preIntercept(String parentTxId, String compensationMethod, Object... message);
+  boolean preIntercept(String parentTxId, String compensationMethod, Object... message);
 
   void postIntercept(String parentTxId, String compensationMethod);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 2f05394..86cacc2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,8 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSender {
-  MessageSender NO_OP_SENDER = event -> {
-  };
+  MessageSender NO_OP_SENDER = event -> true;
 
   default void onConnected() {
   }
@@ -34,5 +33,5 @@ public interface MessageSender {
     return "UNKNOWN";
   }
 
-  void send(TxEvent event);
+  boolean send(TxEvent event);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 2dad5ae..b6ef898 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -30,8 +30,8 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+  public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 80ad03f..ee00565 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,8 +29,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
   }
 
   @Override
-  public void preIntercept(String parentTxId, String signature, Object... args) {
-    interceptor.preIntercept(parentTxId, signature, args);
+  public boolean preIntercept(String parentTxId, String signature, Object... args) {
+    return interceptor.preIntercept(parentTxId, signature, args);
   }
 
   @Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index d3e091c..e506899 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -57,7 +57,12 @@ public class TransactionAspect {
     context.newLocalTxId();
 
     TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
-    interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    boolean ok = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+    if (!ok) {
+      LOG.info("Skipped transaction {} due to abort.", context.globalTxId());
+      context.setLocalTxId(localTxId);
+      return null;
+    }
     LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
 
     scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index d26f04f..9e4055f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -45,7 +45,8 @@ public class TimeAwareInterceptorTest {
 
   private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
     @Override
-    public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+    public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+      return true;
     }
 
     @Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 63cdff0..8f7c73a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -23,6 +23,10 @@ import static org.awaitility.Awaitility.await;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -40,7 +44,6 @@ import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TransactionAspectTest {
   private final List<TxEvent> messages = new ArrayList<>();
@@ -50,12 +53,12 @@ public class TransactionAspectTest {
   private final String newLocalTxId = UUID.randomUUID().toString();
 
   private final MessageSender sender = messages::add;
-  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
-  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+  private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = mock(MethodSignature.class);
 
   @SuppressWarnings("unchecked")
-  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
-  private final Compensable compensable = Mockito.mock(Compensable.class);
+  private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+  private final Compensable compensable = mock(Compensable.class);
 
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
@@ -160,6 +163,21 @@ public class TransactionAspectTest {
     assertThat(messages.size(), is(2));
   }
 
+  @Test
+  public void returnImmediatelyWhenReceivedRejectResponse() {
+    MessageSender sender = mock(MessageSender.class);
+    when(sender.send(any())).thenReturn(false);
+
+    TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+    try {
+      aspect.advise(joinPoint, compensable);
+    } catch (Throwable throwable) {
+      fail("Unexpected exception: " + throwable.getMessage());
+    }
+
+    verify(sender, times(1)).send(any());
+  }
+
   private String doNothing() {
     return "doNothing";
   }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9522b6f..405edff 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,7 @@ message GrpcServiceConfig {
 }
 
 message GrpcAck {
+  bool valid = 1;
 }
 
 message GrpcTxEvent {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.