You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:48:07 UTC
[incubator-servicecomb-saga] 02/06: SCB-164 ensured compensation
stream can be established after reconnection
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 105cf029d3b3ca8b460e8e7d841bfcc749b3508b
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:29:28 2018 +0800
SCB-164 ensured compensation stream can be established after reconnection
Signed-off-by: seanyinx <se...@huawei.com>
---
.../grpc/LoadBalancedClusterMessageSenderTest.java | 26 ++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 42b3162..22867d1 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -40,6 +42,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -79,7 +82,10 @@ public class LoadBalancedClusterMessageSenderTest {
private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
+
+ private final List<String> compensated = new ArrayList<>();
private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
+ compensated.add(globalTxId);
};
private final String globalTxId = uniquify("globalTxId");
@@ -158,14 +164,21 @@ public class LoadBalancedClusterMessageSenderTest {
int deadPort = killServerReceivedMessage();
+ // ensure live message sender has latency greater than 0
+ messageSender.send(event);
+
startServerOnPort(deadPort);
await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
- messageSender.send(event);
- messageSender.send(event);
+ TxEvent abortedEvent = new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+ messageSender.send(abortedEvent);
// restarted server gets priority, since it had no traffic
assertThat(eventsMap.get(deadPort).size(), is(2));
+ assertThat(eventsMap.get(deadPort).poll().toString(), is(event.toString()));
+ assertThat(eventsMap.get(deadPort).poll().toString(), is(abortedEvent.toString()));
+
+ await().atMost(1, SECONDS).until(() -> compensated.contains(globalTxId));
}
@Test (timeout = 1000)
@@ -266,6 +279,7 @@ public class LoadBalancedClusterMessageSenderTest {
private final Queue<String> connected;
private final Queue<TxEvent> events;
private final int delay;
+ private StreamObserver<GrpcCompensateCommand> responseObserver;
private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
this.connected = connected;
@@ -275,6 +289,7 @@ public class LoadBalancedClusterMessageSenderTest {
@Override
public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+ this.responseObserver = responseObserver;
connected.add("Connected " + request.getServiceName());
}
@@ -289,6 +304,13 @@ public class LoadBalancedClusterMessageSenderTest {
sleep();
+ if ("TxAbortedEvent".equals(request.getType())) {
+ this.responseObserver.onNext(GrpcCompensateCommand
+ .newBuilder()
+ .setGlobalTxId(request.getGlobalTxId())
+ .build());
+ }
+
responseObserver.onNext(GrpcAck.newBuilder().build());
responseObserver.onCompleted();
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.