You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:48:07 UTC

[incubator-servicecomb-saga] 02/06: SCB-164 ensured compensation stream can be established after reconnection

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 105cf029d3b3ca8b460e8e7d841bfcc749b3508b
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:29:28 2018 +0800

    SCB-164 ensured compensation stream can be established after reconnection
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 42b3162..22867d1 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -40,6 +42,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -79,7 +82,10 @@ public class LoadBalancedClusterMessageSenderTest {
   private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
 
   private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
+
+  private final List<String> compensated = new ArrayList<>();
   private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
+    compensated.add(globalTxId);
   };
 
   private final String globalTxId = uniquify("globalTxId");
@@ -158,14 +164,21 @@ public class LoadBalancedClusterMessageSenderTest {
 
     int deadPort = killServerReceivedMessage();
 
+    // ensure live message sender has latency greater than 0
+    messageSender.send(event);
+
     startServerOnPort(deadPort);
     await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
 
-    messageSender.send(event);
-    messageSender.send(event);
+    TxEvent abortedEvent = new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+    messageSender.send(abortedEvent);
 
     // restarted server gets priority, since it had no traffic
     assertThat(eventsMap.get(deadPort).size(), is(2));
+    assertThat(eventsMap.get(deadPort).poll().toString(), is(event.toString()));
+    assertThat(eventsMap.get(deadPort).poll().toString(), is(abortedEvent.toString()));
+
+    await().atMost(1, SECONDS).until(() -> compensated.contains(globalTxId));
   }
 
   @Test (timeout = 1000)
@@ -266,6 +279,7 @@ public class LoadBalancedClusterMessageSenderTest {
     private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
+    private StreamObserver<GrpcCompensateCommand> responseObserver;
 
     private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
       this.connected = connected;
@@ -275,6 +289,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+      this.responseObserver = responseObserver;
       connected.add("Connected " + request.getServiceName());
     }
 
@@ -289,6 +304,13 @@ public class LoadBalancedClusterMessageSenderTest {
 
       sleep();
 
+      if ("TxAbortedEvent".equals(request.getType())) {
+        this.responseObserver.onNext(GrpcCompensateCommand
+            .newBuilder()
+            .setGlobalTxId(request.getGlobalTxId())
+            .build());
+      }
+
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.