You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 00:55:58 UTC
[incubator-servicecomb-saga] 01/06: SCB-227 stop sub tx from running when global tx failed
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 4377256c228e836eed5d4e2e023e43e1920c4aa0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Jan 12 19:06:36 2018 +0800
SCB-227 stop sub tx from running when global tx failed
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../saga/alpha/core/TxConsistentService.java | 12 +++++++++-
.../saga/alpha/core/TxConsistentServiceTest.java | 13 ++++++++++
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 15 ++++++++----
.../saga/alpha/server/AlphaIntegrationTest.java | 19 +++++++++++++++
.../connector/grpc/GrpcClientMessageSender.java | 6 +++--
.../grpc/LoadBalancedClusterMessageSender.java | 7 ++++--
.../grpc/LoadBalancedClusterMessageSenderTest.java | 16 ++++++++++++-
.../omega/transaction/CompensableInterceptor.java | 5 ++--
.../omega/transaction/EventAwareInterceptor.java | 5 ++--
.../saga/omega/transaction/MessageSender.java | 5 ++--
.../transaction/SagaStartAnnotationProcessor.java | 4 ++--
.../omega/transaction/TimeAwareInterceptor.java | 4 ++--
.../saga/omega/transaction/TransactionAspect.java | 7 +++++-
.../transaction/TimeAwareInterceptorTest.java | 3 ++-
.../omega/transaction/TransactionAspectTest.java | 28 ++++++++++++++++++----
.../src/main/proto/GrpcTxEvent.proto | 1 +
16 files changed, 122 insertions(+), 28 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index bddba36..c9544ca 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -54,7 +54,11 @@ public class TxConsistentService {
this.omegaCallback = omegaCallback;
}
- public void handle(TxEvent event) {
+ public boolean handle(TxEvent event) {
+ if (isInvalidTxStarted(event)) {
+ return false;
+ }
+
eventRepository.save(event);
executor.execute(() -> {
@@ -64,6 +68,8 @@ public class TxConsistentService {
eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event);
});
+
+ return true;
}
private void compensate(TxEvent event) {
@@ -100,4 +106,8 @@ public class TxConsistentService {
private boolean isTxEndedEvent(TxEvent event) {
return TxEndedEvent.name().equals(event.type());
}
+
+ private boolean isInvalidTxStarted(TxEvent event) {
+ return TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event);
+ }
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index a44f16d..9318a06 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -135,6 +135,19 @@ public class TxConsistentServiceTest {
));
}
+ @Test
+ public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() {
+ String localTxId1 = UUID.randomUUID().toString();
+ events.add(newEvent(TxStartedEvent));
+ events.add(newEvent(TxAbortedEvent));
+
+ TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+
+ consistentService.handle(event);
+
+ assertThat(events.size(), is(2));
+ }
+
private TxEvent newEvent(EventType eventType) {
return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 4b3ea1b..6ca26ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -39,7 +39,9 @@ import io.grpc.stub.StreamObserver;
class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
- private static final GrpcAck ACK = GrpcAck.newBuilder().build();
+ private static final GrpcAck ALLOW = GrpcAck.newBuilder().setValid(true).build();
+ private static final GrpcAck REJECT = GrpcAck.newBuilder().setValid(false).build();
+
private final TxConsistentService txConsistentService;
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
@@ -67,13 +69,13 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
callback.disconnect();
}
- responseObserver.onNext(ACK);
+ responseObserver.onNext(ALLOW);
responseObserver.onCompleted();
}
@Override
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
- txConsistentService.handle(new TxEvent(
+ boolean ok = txConsistentService.handle(new TxEvent(
message.getServiceName(),
message.getInstanceId(),
new Date(message.getTimestamp()),
@@ -85,7 +87,12 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
message.getPayloads().toByteArray()
));
- responseObserver.onNext(ACK);
+ if (ok) {
+ responseObserver.onNext(ALLOW);
+ } else {
+ responseObserver.onNext(REJECT);
+ }
+
responseObserver.onCompleted();
}
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a501f75..eb857d1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -250,6 +251,24 @@ public class AlphaIntegrationTest {
anotherBlockingStub.onDisconnected(anotherServiceConfig);
}
+ @Test
+ public void doNotStartSubTxOnFailure() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+ blockingStub.onTxEvent(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+
+ blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+
+ await().atMost(1, SECONDS).until(() -> receivedCommands.size() == 1);
+
+ String localTxId1 = UUID.randomUUID().toString();
+ String parentTxId1 = UUID.randomUUID().toString();
+ GrpcAck result = blockingStub
+ .onTxEvent(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+
+ assertThat(result.getValid(), is(false));
+ }
+
private GrpcServiceConfig someServiceConfig() {
return GrpcServiceConfig.newBuilder()
.setServiceName(uniquify("serviceName"))
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index f6b5d76..70870bc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -28,6 +28,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
@@ -83,8 +84,9 @@ public class GrpcClientMessageSender implements MessageSender {
}
@Override
- public void send(TxEvent event) {
- blockingEventService.onTxEvent(convertEvent(event));
+ public boolean send(TxEvent event) {
+ GrpcAck grpcAck = blockingEventService.onTxEvent(convertEvent(event));
+ return grpcAck.getValid();
}
private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 73940ed..1eb6f7b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -124,14 +124,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
}
@Override
- public void send(TxEvent event) {
+ public boolean send(TxEvent event) {
+ boolean result = false;
boolean success = false;
do {
MessageSender messageSender = fastestSender();
try {
long startTime = System.nanoTime();
- messageSender.send(event);
+ result = messageSender.send(event);
senders.put(messageSender, System.nanoTime() - startTime);
success = true;
@@ -142,6 +143,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
senders.put(messageSender, Long.MAX_VALUE);
}
} while (!success && !Thread.currentThread().isInterrupted());
+
+ return result;
}
private MessageSender fastestSender() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1fdfd2e..6c9fdcd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -47,6 +47,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -293,6 +294,14 @@ public class LoadBalancedClusterMessageSenderTest {
await().atMost(2, SECONDS).until(() -> eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1);
}
+ @Test
+ public void forwardSendResult() {
+ assertThat(messageSender.send(event), is(true));
+
+ TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
+ assertThat(messageSender.send(rejectEvent), is(false));
+ }
+
private int killServerReceivedMessage() {
for (int port : eventsMap.keySet()) {
if (!eventsMap.get(port).isEmpty()) {
@@ -341,7 +350,12 @@ public class LoadBalancedClusterMessageSenderTest {
.build());
}
- responseObserver.onNext(GrpcAck.newBuilder().build());
+ if ("TxStartedEvent".equals(request.getType()) && request.getCompensationMethod().equals("reject")) {
+ responseObserver.onNext(GrpcAck.newBuilder().setValid(false).build());
+ } else {
+ responseObserver.onNext(GrpcAck.newBuilder().setValid(true).build());
+ }
+
responseObserver.onCompleted();
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 76193cd..e24b92c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,8 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
}
@Override
- public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
- sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
+ public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ return sender
+ .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 291538f..2263b38 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
public interface EventAwareInterceptor {
EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
@Override
- public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ return true;
}
@Override
@@ -32,7 +33,7 @@ public interface EventAwareInterceptor {
}
};
- void preIntercept(String parentTxId, String compensationMethod, Object... message);
+ boolean preIntercept(String parentTxId, String compensationMethod, Object... message);
void postIntercept(String parentTxId, String compensationMethod);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 2f05394..86cacc2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,8 +18,7 @@
package org.apache.servicecomb.saga.omega.transaction;
public interface MessageSender {
- MessageSender NO_OP_SENDER = event -> {
- };
+ MessageSender NO_OP_SENDER = event -> true;
default void onConnected() {
}
@@ -34,5 +33,5 @@ public interface MessageSender {
return "UNKNOWN";
}
- void send(TxEvent event);
+ boolean send(TxEvent event);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 2dad5ae..b6ef898 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -30,8 +30,8 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
}
@Override
- public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
- sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 80ad03f..ee00565 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -29,8 +29,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
}
@Override
- public void preIntercept(String parentTxId, String signature, Object... args) {
- interceptor.preIntercept(parentTxId, signature, args);
+ public boolean preIntercept(String parentTxId, String signature, Object... args) {
+ return interceptor.preIntercept(parentTxId, signature, args);
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index d3e091c..e506899 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -57,7 +57,12 @@ public class TransactionAspect {
context.newLocalTxId();
TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
- interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+ boolean ok = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+ if (!ok) {
+ LOG.info("Skipped transaction {} due to abort.", context.globalTxId());
+ context.setLocalTxId(localTxId);
+ return null;
+ }
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout());
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index d26f04f..9e4055f 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -45,7 +45,8 @@ public class TimeAwareInterceptorTest {
private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
@Override
- public void preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ public boolean preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ return true;
}
@Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 63cdff0..8f7c73a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -23,6 +23,10 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -40,7 +44,6 @@ import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
public class TransactionAspectTest {
private final List<TxEvent> messages = new ArrayList<>();
@@ -50,12 +53,12 @@ public class TransactionAspectTest {
private final String newLocalTxId = UUID.randomUUID().toString();
private final MessageSender sender = messages::add;
- private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
- private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+ private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+ private final MethodSignature methodSignature = mock(MethodSignature.class);
@SuppressWarnings("unchecked")
- private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
- private final Compensable compensable = Mockito.mock(Compensable.class);
+ private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+ private final Compensable compensable = mock(Compensable.class);
private final OmegaContext omegaContext = new OmegaContext(idGenerator);
private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
@@ -160,6 +163,21 @@ public class TransactionAspectTest {
assertThat(messages.size(), is(2));
}
+ @Test
+ public void returnImmediatelyWhenReceivedRejectResponse() {
+ MessageSender sender = mock(MessageSender.class);
+ when(sender.send(any())).thenReturn(false);
+
+ TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+ try {
+ aspect.advise(joinPoint, compensable);
+ } catch (Throwable throwable) {
+ fail("Unexpected exception: " + throwable.getMessage());
+ }
+
+ verify(sender, times(1)).send(any());
+ }
+
private String doNothing() {
return "doNothing";
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9522b6f..405edff 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -33,6 +33,7 @@ message GrpcServiceConfig {
}
message GrpcAck {
+ bool valid = 1;
}
message GrpcTxEvent {
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.