Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/10 08:01:10 UTC

[incubator-servicecomb-saga] branch SCB-164_reconnect_on_crash updated (40ca544 -> 18f8b36)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    omit 40ca544  SCB-164 fail fast on script error
    omit 997b9d1  SCB-164 added waiting for async update to fix random test failures
    omit b70cc85  SCB-164 reused const ACK to avoid recreate it every time
    omit 5790155  SCB-164 ensured compensation stream can be established after reconnection
    omit 5e69cdc  SCB-164 reconnected to alpha on connection loss
    omit 8dd5fc1  SCB-168 connected to alpha on start
    omit ae023a0  SCB-168 removed default logging from spring boot
    omit 3b528a9  SCB-168 resolved rebase conflict
    omit 32977d6  SCB-168 made latency significant enough to pass test
    omit 3fbf37a  SCB-168 exception free connection & disconnection
    omit ee6b073  SCB-168 load balance based on server latency
    omit 39ac401  SCB-168 load balanced alpha cluster client
     add e5efd41  SCB-168 load balanced alpha cluster client
     add 01c4a1d  SCB-168 load balance based on server latency
     add 2c395da  SCB-168 exception free connection & disconnection
     add 5529689  SCB-168 made latency significant enough to pass test
     add e2828ee  SCB-168 resolved rebase conflict
     add 1e7ff02  SCB-168 removed default logging from spring boot
     add 367b1cc  SCB-168 connected to alpha on start
     new 68af073  SCB-164 reconnected to alpha on connection loss
     new e4e02fd  SCB-164 ensured compensation stream can be established after reconnection
     new 01b7e33  SCB-164 reused const ACK to avoid recreate it every time
     new 343ff59  SCB-164 added waiting for async update to fix random test failures
     new 18f8b36  SCB-164 fail fast on script error

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (40ca544)
            \
             N -- N -- N   refs/heads/SCB-164_reconnect_on_crash (18f8b36)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
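The four labels above come from two questions asked about each revision. First, is it only on the old tip or only on the new tip? Second, is it still (or already) known elsewhere in the repository? The sketch below is a hypothetical Java model of those rules, not git-multimail's actual implementation. `RevisionLabel`, `ForcePushClassifier` and the input sets are names invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical labels mirroring the "omit", "discard", "add" and "new" markers above.
enum RevisionLabel { OMIT, DISCARD, ADD, NEW }

final class ForcePushClassifier {
  private ForcePushClassifier() {
  }

  /**
   * Labels the revisions that differ between the old and new branch tips.
   *
   * @param oldRevs            revisions after the common base B on the old tip (the O revisions)
   * @param newRevs            revisions after the common base B on the new tip
   * @param reachableElsewhere revisions that some other reference still points to
   * @param knownBeforePush    revisions that already existed in the repository before the push
   */
  static Map<String, RevisionLabel> classify(List<String> oldRevs, List<String> newRevs,
      Set<String> reachableElsewhere, Set<String> knownBeforePush) {
    Set<String> oldSet = new LinkedHashSet<>(oldRevs);
    Set<String> newSet = new LinkedHashSet<>(newRevs);
    Map<String, RevisionLabel> labels = new LinkedHashMap<>();

    // Dropped from the branch: kept if another ref still refers to it, otherwise gone for good.
    for (String rev : oldRevs) {
      if (!newSet.contains(rev)) {
        labels.put(rev, reachableElsewhere.contains(rev) ? RevisionLabel.OMIT : RevisionLabel.DISCARD);
      }
    }
    // Added to the branch: "add" if the repository already had it, "new" otherwise.
    for (String rev : newRevs) {
      if (!oldSet.contains(rev)) {
        labels.put(rev, knownBeforePush.contains(rev) ? RevisionLabel.ADD : RevisionLabel.NEW);
      }
    }
    return labels;
  }
}
```

Suppose every old hash listed above is still reachable from another ref, which is what "omit" implies. Suppose also that the seven SCB-168 hashes were already present. Under those assumptions, this model labels the revisions the same way the email does: twelve `OMIT`, seven `ADD` and five `NEW`.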


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/05: SCB-164 fail fast on script error

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 18f8b368b70749a3f78b8f6103f8718852db8834
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 09:37:50 2018 +0800

    SCB-164 fail fast on script error
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .travis.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.travis.yml b/.travis.yml
index 5c1be87..2efd762 100755
--- a/.travis.yml
+++ b/.travis.yml
@@ -23,6 +23,7 @@ language: java
 jdk:
 - oraclejdk8
 script:
+- set -e
 - mvn clean install -Pjacoco -Pdocker coveralls:report
 - mvn clean verify -f saga-demo -Pdemo -Pdocker -Ddocker.useColor=false -Ddocker.showLogs
 after_success:

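As we understand Travis CI's documented behaviour, a command in the `script:` phase that exits non-zero marks the build as failed, but the remaining commands still run. Adding `set -e` to the shell session is meant to make the first failing command abort the phase instead. The hypothetical Java model below contrasts the two modes. Each step stands in for one `script:` entry and returns a shell-style exit code. `ScriptPhase` is an invented name.

```java
import java.util.List;
import java.util.function.IntSupplier;

// Hypothetical model of a CI script phase; each step returns a shell-style exit code.
final class ScriptPhase {
  final int stepsRun;
  final int exitCode;

  private ScriptPhase(int stepsRun, int exitCode) {
    this.stepsRun = stepsRun;
    this.exitCode = exitCode;
  }

  /**
   * Runs the steps in order. With failFast=false every step runs and the first non-zero code is
   * reported at the end; with failFast=true (like `set -e`) the phase stops at the first failure.
   */
  static ScriptPhase run(List<IntSupplier> steps, boolean failFast) {
    int stepsRun = 0;
    int exitCode = 0;
    for (IntSupplier step : steps) {
      int code = step.getAsInt();
      stepsRun++;
      if (code != 0) {
        if (exitCode == 0) {
          exitCode = code; // remember the first failure
        }
        if (failFast) {
          break; // abort the remaining steps
        }
      }
    }
    return new ScriptPhase(stepsRun, exitCode);
  }
}
```

Assume `set -e` stays in effect for the rest of the session. In that case, a failure in the first `mvn clean install` would stop the build before the `saga-demo` verification step runs.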

[incubator-servicecomb-saga] 01/05: SCB-164 reconnected to alpha on connection loss

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 68af073883088c08fd6a559e99a727b5badff93e
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:13:48 2018 +0800

    SCB-164 reconnected to alpha on connection loss
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../connector/grpc/GrpcClientMessageSender.java    |   5 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   5 +-
 .../grpc/LoadBalancedClusterMessageSender.java     |  57 +++++++---
 .../connector/grpc/PushBackReconnectRunnable.java  |  56 ++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 116 ++++++++++++---------
 .../grpc/PushBackReconnectRunnableTest.java        |  75 +++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       |   4 +-
 .../saga/omega/transaction/MessageSender.java      |   3 +
 8 files changed, 255 insertions(+), 66 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 4729db1..5534db9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,6 +20,8 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
+import java.util.function.Function;
+
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
@@ -54,13 +56,14 @@ public class GrpcClientMessageSender implements MessageSender {
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
+      Function<MessageSender, Runnable> errorHandlerFactory,
       MessageHandler handler) {
     this.target = address;
     this.asyncEventService = TxEventServiceGrpc.newStub(channel);
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, deserializer);
+    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 7a845eb..3cf46f8 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -35,10 +35,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MessageHandler messageHandler;
+  private final Runnable errorHandler;
   private final MessageDeserializer deserializer;
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler, MessageDeserializer deserializer) {
+  GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) {
     this.messageHandler = messageHandler;
+    this.errorHandler = errorHandler;
     this.deserializer = deserializer;
   }
 
@@ -58,6 +60,7 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
   @Override
   public void onError(Throwable t) {
     LOG.error("failed to process grpc compensate command.", t);
+    errorHandler.run();
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 14ce340..3cb6677 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -26,7 +27,11 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.function.Consumer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -45,11 +50,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   private final Map<MessageSender, Long> senders = new HashMap<>();
   private final Collection<ManagedChannel> channels;
 
+  private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
   public LoadBalancedClusterMessageSender(String[] addresses,
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
-      MessageHandler handler) {
+      MessageHandler handler,
+      int reconnectDelay) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address provided");
@@ -69,11 +78,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
               serializer,
               deserializer,
               serviceConfig,
+              errorHandlerFactory(),
               handler),
           0L);
     }
+
+    scheduleReconnectTask(reconnectDelay);
   }
 
+  // this is for test only
   LoadBalancedClusterMessageSender(MessageSender... messageSenders) {
     for (MessageSender sender : messageSenders) {
       senders.put(sender, 0L);
@@ -106,6 +119,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public void close() {
+    scheduler.shutdown();
     channels.forEach(ManagedChannel::shutdownNow);
   }
 
@@ -113,28 +127,45 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   public void send(TxEvent event) {
     boolean success = false;
     do {
-      try {
-        withFastestSender(messageSender -> {
-          // very large latency on exception
-          senders.put(messageSender, Long.MAX_VALUE);
+      MessageSender messageSender = fastestSender();
 
-          long startTime = System.nanoTime();
-          messageSender.send(event);
-          senders.put(messageSender, System.nanoTime() - startTime);
-        });
+      try {
+        long startTime = System.nanoTime();
+        messageSender.send(event);
+        senders.put(messageSender, System.nanoTime() - startTime);
 
         success = true;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
+
+        // very large latency on exception
+        senders.put(messageSender, Long.MAX_VALUE);
       }
     } while (!success && !Thread.currentThread().isInterrupted());
   }
 
-  private void withFastestSender(Consumer<MessageSender> consumer) {
-    senders.entrySet()
+  private MessageSender fastestSender() {
+    return senders.entrySet()
         .stream()
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .ifPresent(consumer);
+        .orElse(NO_OP_SENDER);
+  }
+
+  private void scheduleReconnectTask(int reconnectDelay) {
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        pendingTasks.take().run();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }, 0, reconnectDelay, MILLISECONDS);
+  }
+
+  private Function<MessageSender, Runnable> errorHandlerFactory() {
+    return messageSender -> {
+      Runnable runnable = new PushBackReconnectRunnable(messageSender, senders, pendingTasks);
+      return () -> pendingTasks.offer(runnable);
+    };
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
new file mode 100644
index 0000000..d21e568
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PushBackReconnectRunnable implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final MessageSender messageSender;
+  private final Map<MessageSender, Long> senders;
+  private final BlockingQueue<Runnable> pendingTasks;
+
+  PushBackReconnectRunnable(
+      MessageSender messageSender,
+      Map<MessageSender, Long> senders,
+      BlockingQueue<Runnable> pendingTasks) {
+    this.messageSender = messageSender;
+    this.senders = senders;
+    this.pendingTasks = pendingTasks;
+  }
+
+  @Override
+  public void run() {
+    try {
+      log.info("Retry connecting to alpha at {}", messageSender.target());
+      messageSender.onDisconnected();
+      messageSender.onConnected();
+      senders.put(messageSender, 0L);
+      log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+    } catch (Exception e) {
+      log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+      pendingTasks.offer(this);
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1868f31..42b3162 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,15 +29,11 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -53,37 +49,31 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
 import org.junit.Test;
-import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class LoadBalancedClusterMessageSenderTest {
 
   private static final int[] ports = {8080, 8090};
-  private static final List<Server> servers = new ArrayList<>();
-
-  private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>();
-  private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>();
+  private static final Map<Integer, Server> servers = new HashMap<>();
 
   private static final Map<Integer, Integer> delays = new HashMap<Integer, Integer>() {{
     put(8080, 0);
     put(8090, 500);
   }};
 
-  private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{
-    put(8080, new ConcurrentSkipListSet<>());
-    put(8090, new ConcurrentSkipListSet<>());
+  private static final Map<Integer, Queue<String>> connected = new HashMap<Integer, Queue<String>>() {{
+    put(8080, new ConcurrentLinkedQueue<>());
+    put(8090, new ConcurrentLinkedQueue<>());
   }};
 
   private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{
-    put(8080, eventsOn8080);
-    put(8090, eventsOn8090);
+    put(8080, new ConcurrentLinkedQueue<>());
+    put(8090, new ConcurrentLinkedQueue<>());
   }};
 
   private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
@@ -108,49 +98,74 @@ public class LoadBalancedClusterMessageSenderTest {
         serializer,
         deserializer,
         new ServiceConfig(serviceName),
-        handler);
+        handler,
+        100);
   }
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    Arrays.stream(ports).forEach(port -> {
-      ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
-      serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
-      Server server = serverBuilder.build();
+    Arrays.stream(ports).forEach(LoadBalancedClusterMessageSenderTest::startServerOnPort);
+  }
 
-      try {
-        server.start();
-        servers.add(server);
-      } catch (IOException e) {
-        fail(e.getMessage());
-      }
-    });
+  private static void startServerOnPort(int port) {
+    ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
+    serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
+    Server server = serverBuilder.build();
+
+    try {
+      server.start();
+      servers.put(port, server);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    servers.forEach(Server::shutdown);
+    servers.values().forEach(Server::shutdown);
   }
 
   @After
   public void after() throws Exception {
-    eventsOn8080.clear();
-    eventsOn8090.clear();
+    eventsMap.values().forEach(Queue::clear);
+    connected.values().forEach(Queue::clear);
   }
 
   @Test
   public void resendToAnotherServerOnFailure() throws Exception {
     messageSender.send(event);
 
-    killServerReceivedMessage();
+    int deadPort = killServerReceivedMessage();
 
     messageSender.send(event);
+    messageSender.send(event);
+
+    assertThat(eventsMap.get(deadPort).size(), is(1));
+    assertThat(eventsMap.get(deadPort).peek().toString(), is(event.toString()));
+
+    int livePort = deadPort == 8080? 8090 : 8080;
+    assertThat(eventsMap.get(livePort).size(), is(2));
+    assertThat(eventsMap.get(livePort).peek().toString(), is(event.toString()));
 
-    assertThat(eventsOn8080.size(), is(1));
-    assertThat(eventsOn8080.peek().toString(), is(event.toString()));
+    // restart killed server in order not to affect other tests
+    startServerOnPort(deadPort);
+  }
+
+  @Test
+  public void resetLatencyOnReconnection() throws Exception {
+    messageSender.onConnected();
+    messageSender.send(event);
+
+    int deadPort = killServerReceivedMessage();
+
+    startServerOnPort(deadPort);
+    await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
+
+    messageSender.send(event);
+    messageSender.send(event);
 
-    assertThat(eventsOn8090.size(), is(1));
-    assertThat(eventsOn8090.peek().toString(), is(event.toString()));
+    // restarted server gets priority, since it had no traffic
+    assertThat(eventsMap.get(deadPort).size(), is(2));
   }
 
   @Test (timeout = 1000)
@@ -174,12 +189,12 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.onConnected();
     await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty());
 
-    assertThat(connected.get(8080), contains(serviceName));
-    assertThat(connected.get(8090), contains(serviceName));
+    assertThat(connected.get(8080), contains("Connected " + serviceName));
+    assertThat(connected.get(8090), contains("Connected " + serviceName));
 
     messageSender.onDisconnected();
-    assertThat(connected.get(8080).isEmpty(), is(true));
-    assertThat(connected.get(8090).isEmpty(), is(true));
+    assertThat(connected.get(8080), contains("Connected " + serviceName, "Disconnected " + serviceName));
+    assertThat(connected.get(8090), contains("Connected " + serviceName, "Disconnected " + serviceName));
   }
 
   @Test
@@ -222,8 +237,8 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.send(event);
     messageSender.send(event);
 
-    assertThat(eventsOn8080.size(), is(3));
-    assertThat(eventsOn8090.size(), is(1));
+    assertThat(eventsMap.get(8080).size(), is(3));
+    assertThat(eventsMap.get(8090).size(), is(1));
   }
 
   @Test
@@ -236,22 +251,23 @@ public class LoadBalancedClusterMessageSenderTest {
     }
   }
 
-  private void killServerReceivedMessage() {
-    int index = 0;
+  private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
-        servers.get(index).shutdownNow();
+        Server serverToKill = servers.get(port);
+        serverToKill.shutdownNow();
+        return port;
       }
-      index++;
     }
+    throw new IllegalStateException("None of the servers received any message");
   }
 
   private static class MyTxEventService extends TxEventServiceImplBase {
-    private final Set<String> connected;
+    private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
 
-    private MyTxEventService(Set<String> connected, Queue<TxEvent> events, int delay) {
+    private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
       this.connected = connected;
       this.events = events;
       this.delay = delay;
@@ -259,7 +275,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
-      connected.add(request.getServiceName());
+      connected.add("Connected " + request.getServiceName());
     }
 
     @Override
@@ -287,7 +303,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
-      connected.remove(request.getServiceName());
+      connected.add("Disconnected " + request.getServiceName());
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
new file mode 100644
index 0000000..a5b51b1
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PushBackReconnectRunnableTest {
+  private static final Runnable NO_OP_RUNNABLE = () -> {
+  };
+
+  private final MessageSender sender = Mockito.mock(MessageSender.class);
+  private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+  private final Map<MessageSender, Long> senders = new HashMap<>();
+  private final PushBackReconnectRunnable pushBack = new PushBackReconnectRunnable(sender, senders, runnables);
+
+  @Before
+  public void setUp() throws Exception {
+    runnables.offer(NO_OP_RUNNABLE);
+    senders.put(sender, Long.MAX_VALUE);
+  }
+
+  @Test
+  public void pushFailedCallbackToEndOfQueue() throws Exception {
+    doThrow(RuntimeException.class).doThrow(RuntimeException.class).doNothing().when(sender).onDisconnected();
+    assertThat(runnables, contains(NO_OP_RUNNABLE));
+
+    pushBack.run();
+
+    assertThat(runnables, contains(NO_OP_RUNNABLE, pushBack));
+    assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
+    assertThat(runnables, contains(pushBack));
+
+    // failed again and pushed back itself to queue
+    runnables.poll().run();
+    assertThat(runnables, contains(pushBack));
+
+    runnables.poll().run();
+
+    assertThat(runnables.isEmpty(), is(true));
+    assertThat(senders.get(sender), is(0L));
+
+    verify(sender, times(3)).onDisconnected();
+    verify(sender, times(1)).onConnected();
+  }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index b5d118a..e5806ad 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -52,6 +52,7 @@ class OmegaSpringConfig {
   @Bean
   MessageSender grpcMessageSender(
       @Value("${alpha.cluster.address}") String[] addresses,
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
       ServiceConfig serviceConfig,
       @Lazy MessageHandler handler) {
 
@@ -61,7 +62,8 @@ class OmegaSpringConfig {
         messageFormat,
         messageFormat,
         serviceConfig,
-        handler);
+        handler,
+        reconnectDelay);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 6a4ede6..2f05394 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,6 +18,9 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSender {
+  MessageSender NO_OP_SENDER = event -> {
+  };
+
   default void onConnected() {
   }
 

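This commit's reconnection design combines two ideas:

- `LoadBalancedClusterMessageSender` routes each event to the sender with the lowest recorded latency, and records `Long.MAX_VALUE` for a sender that throws.
- A compensation-stream error enqueues a `PushBackReconnectRunnable`. That runnable either resets the sender's latency to `0L` after reconnecting, or offers itself back to `pendingTasks`.

The standalone sketch below reproduces that pattern without gRPC. `Endpoint`, `LatencyBalancer`, `onStreamError`, `drainOnce` and `startWorker` are hypothetical names. For simplicity, the send loop is bounded by `maxAttempts`, whereas the commit retries until the thread is interrupted.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one alpha connection (not the omega MessageSender interface).
interface Endpoint {
  void send(String event) throws Exception;

  void reconnect() throws Exception;
}

final class LatencyBalancer {
  // Last observed send latency in nanoseconds; Long.MAX_VALUE marks an endpoint that just failed.
  final Map<Endpoint, Long> latencies = new ConcurrentHashMap<>();
  // Reconnect attempts waiting to run, oldest first.
  final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();

  LatencyBalancer(Endpoint... endpoints) {
    for (Endpoint endpoint : endpoints) {
      latencies.put(endpoint, 0L);
    }
  }

  Endpoint fastest() {
    return latencies.entrySet().stream()
        .min(Comparator.comparingLong(Entry::getValue))
        .map(Entry::getKey)
        .orElseThrow(() -> new IllegalStateException("no endpoints configured"));
  }

  void send(String event, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Endpoint endpoint = fastest();
      try {
        long start = System.nanoTime();
        endpoint.send(event);
        latencies.put(endpoint, System.nanoTime() - start);
        return;
      } catch (Exception e) {
        latencies.put(endpoint, Long.MAX_VALUE); // route the next attempt elsewhere
      }
    }
    throw new IllegalStateException("could not send " + event + " after " + maxAttempts + " attempts");
  }

  // Plays the role of the error handler invoked from the compensation stream's onError.
  void onStreamError(Endpoint endpoint) {
    pendingTasks.offer(new PushBackReconnect(endpoint));
  }

  // Runs at most one queued reconnect attempt; returns false if nothing was queued.
  boolean drainOnce() {
    Runnable task = pendingTasks.poll();
    if (task == null) {
      return false;
    }
    task.run();
    return true;
  }

  // Drains one task per tick on a single background thread, spaced by delayMillis.
  ScheduledExecutorService startWorker(long delayMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(this::drainOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    return scheduler;
  }

  private final class PushBackReconnect implements Runnable {
    private final Endpoint endpoint;

    PushBackReconnect(Endpoint endpoint) {
      this.endpoint = endpoint;
    }

    @Override
    public void run() {
      try {
        endpoint.reconnect();
        latencies.put(endpoint, 0L); // fresh connection with no traffic yet: preferred again
      } catch (Exception e) {
        pendingTasks.offer(this); // retry later, behind any other queued reconnects
      }
    }
  }
}
```

The sketch keeps latencies in a `ConcurrentHashMap` because both sending threads and the reconnect worker write to the map. The commit's `senders` field is a plain `HashMap`. `drainOnce` uses `poll()` so that a test can call it directly, whereas the commit's scheduled task blocks on `take()`.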

[incubator-servicecomb-saga] 04/05: SCB-164 added waiting for async update to fix random test failures

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 343ff5958e36067886b0b46858a389bc71d2ff86
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Jan 10 08:56:57 2018 +0800

    SCB-164 added waiting for async update to fix random test failures
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java  | 2 +-
 .../saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 21e46cd..4997c00 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -149,7 +149,7 @@ public class AlphaIntegrationTest {
         omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
         is(false));
 
-    assertThat(compensateResponseObserver.isCompleted(), is(true));
+    await().atMost(1, SECONDS).until(compensateResponseObserver::isCompleted);
   }
 
   @Test
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 22867d1..78dce6b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -250,7 +250,7 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.send(event);
     messageSender.send(event);
 
-    assertThat(eventsMap.get(8080).size(), is(3));
+    await().atMost(1, SECONDS).until(() -> eventsMap.get(8080).size() == 3);
     assertThat(eventsMap.get(8090).size(), is(1));
   }
 

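The commit message attributes the random failures to asynchronous updates. The condition being checked (the events recorded for port 8080, or completion of the compensation stream) can become true shortly after the call under test returns, so an immediate `assertThat` sometimes runs too early. Awaitility's `await().atMost(1, SECONDS).until(...)` re-evaluates the condition until it holds or the timeout expires. The sketch below shows the same polling idea in plain Java. `awaitUntil` and its 10 ms poll interval are hypothetical and do not reproduce Awaitility's implementation.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AwaitSketch {

  // Hypothetical helper: re-check the condition until it holds or the deadline passes.
  static void awaitUntil(long timeout, TimeUnit unit, BooleanSupplier condition) {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        throw new AssertionError("condition not met within " + timeout + " " + unit);
      }
      try {
        Thread.sleep(10); // poll interval, chosen arbitrarily for the sketch
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new AssertionError("interrupted while waiting", e);
      }
    }
  }

  public static void main(String[] args) {
    ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Simulates a server that records the third event slightly after the sender returns.
    events.add("e1");
    events.add("e2");
    executor.schedule(() -> {
      events.add("e3");
    }, 100, TimeUnit.MILLISECONDS);

    // An immediate events.size() == 3 check would usually fail here; polling tolerates the delay.
    awaitUntil(1, TimeUnit.SECONDS, () -> events.size() == 3);
    System.out.println(events.size()); // 3
    executor.shutdown();
  }
}
```

Unlike a fixed `Thread.sleep`, polling returns as soon as the condition holds. It spends the full timeout only when the test is actually failing.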

[incubator-servicecomb-saga] 03/05: SCB-164 reused const ACK to avoid recreate it every time

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 01b7e33fab6a0c09b6b2b9931201b56232a69c40
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 17:42:37 2018 +0800

    SCB-164 reused const ACK to avoid recreate it every time
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index fc5f608..4b3ea1b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -67,7 +67,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
       callback.disconnect();
     }
 
-    responseObserver.onNext(GrpcAck.newBuilder().build());
+    responseObserver.onNext(ACK);
     responseObserver.onCompleted();
   }
 

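Generated protobuf messages such as `GrpcAck` are immutable. A single instance can therefore be built once and passed to every `responseObserver.onNext` call, instead of building an identical message per request. The declaration of `ACK` is outside this hunk, so the sketch assumes it is a `static final` field built the same way. `Ack` below is a hypothetical stand-in for the generated class, used only so the example compiles without protobuf.

```java
public class SharedAckSketch {

  // Hypothetical stand-in for a generated protobuf message: no mutable state once built.
  static final class Ack {
    private Ack() {
    }

    static Builder newBuilder() {
      return new Builder();
    }

    static final class Builder {
      Ack build() {
        return new Ack();
      }
    }
  }

  // Built once; safe to share across calls and threads because Ack is immutable.
  private static final Ack ACK = Ack.newBuilder().build();

  static Ack respondFresh() {
    return Ack.newBuilder().build(); // allocates a new, equivalent message on every call
  }

  static Ack respondShared() {
    return ACK; // reuses the constant
  }

  public static void main(String[] args) {
    System.out.println(respondShared() == respondShared()); // true
    System.out.println(respondFresh() == respondFresh()); // false
  }
}
```

With protobuf-java, a generated message built with no fields set is equal to the message's static `getDefaultInstance()`. That method is another way to obtain a shared empty message.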

[incubator-servicecomb-saga] 02/05: SCB-164 ensured compensation stream can be established after reconnection

Posted by se...@apache.org.

seanyinx pushed a commit to branch SCB-164_reconnect_on_crash
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e4e02fd5b16ad53ab705049db28d19b4a517420c
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:29:28 2018 +0800

    SCB-164 ensured compensation stream can be established after reconnection
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 42b3162..22867d1 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -40,6 +42,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -79,7 +82,10 @@ public class LoadBalancedClusterMessageSenderTest {
   private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
 
   private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
+
+  private final List<String> compensated = new ArrayList<>();
   private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
+    compensated.add(globalTxId);
   };
 
   private final String globalTxId = uniquify("globalTxId");
@@ -158,14 +164,21 @@ public class LoadBalancedClusterMessageSenderTest {
 
     int deadPort = killServerReceivedMessage();
 
+    // ensure live message sender has latency greater than 0
+    messageSender.send(event);
+
     startServerOnPort(deadPort);
     await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
 
-    messageSender.send(event);
-    messageSender.send(event);
+    TxEvent abortedEvent = new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
+    messageSender.send(abortedEvent);
 
     // restarted server gets priority, since it had no traffic
     assertThat(eventsMap.get(deadPort).size(), is(2));
+    assertThat(eventsMap.get(deadPort).poll().toString(), is(event.toString()));
+    assertThat(eventsMap.get(deadPort).poll().toString(), is(abortedEvent.toString()));
+
+    await().atMost(1, SECONDS).until(() -> compensated.contains(globalTxId));
   }
 
   @Test (timeout = 1000)
@@ -266,6 +279,7 @@ public class LoadBalancedClusterMessageSenderTest {
     private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
+    private StreamObserver<GrpcCompensateCommand> responseObserver;
 
     private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
       this.connected = connected;
@@ -275,6 +289,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+      this.responseObserver = responseObserver;
       connected.add("Connected " + request.getServiceName());
     }
 
@@ -289,6 +304,13 @@ public class LoadBalancedClusterMessageSenderTest {
 
       sleep();
 
+      if ("TxAbortedEvent".equals(request.getType())) {
+        this.responseObserver.onNext(GrpcCompensateCommand
+            .newBuilder()
+            .setGlobalTxId(request.getGlobalTxId())
+            .build());
+      }
+
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }

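The updated test now exercises the server-to-client direction after a restart. `MyTxEventService` keeps the `StreamObserver<GrpcCompensateCommand>` it receives in `onConnected`. When a `TxAbortedEvent` arrives, it pushes a `GrpcCompensateCommand` on that stored stream. The client's `MessageHandler` records the `globalTxId`, and the test waits until that ID appears. The sketch below models this flow without gRPC. `Observer`, `TxEventMsg`, `CompensateCommand` and `FakeAlpha` are hypothetical stand-ins. The null check on the stored stream is added by the sketch; the test service calls the stream unconditionally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CompensationStreamSketch {

  // Hypothetical minimal analogue of io.grpc.stub.StreamObserver (onNext only).
  interface Observer<T> {
    void onNext(T value);
  }

  // Hypothetical stand-ins for the gRPC messages used in the test.
  static final class TxEventMsg {
    final String type;
    final String globalTxId;

    TxEventMsg(String type, String globalTxId) {
      this.type = type;
      this.globalTxId = globalTxId;
    }
  }

  static final class CompensateCommand {
    final String globalTxId;

    CompensateCommand(String globalTxId) {
      this.globalTxId = globalTxId;
    }
  }

  // Fake server: keeps the long-lived stream opened by onConnected and pushes
  // a compensate command on it whenever an aborted event arrives.
  static final class FakeAlpha {
    private volatile Observer<CompensateCommand> compensationStream;

    void onConnected(Observer<CompensateCommand> stream) {
      this.compensationStream = stream;
    }

    void onTxEvent(TxEventMsg event, Observer<String> ackObserver) {
      Observer<CompensateCommand> stream = compensationStream;
      if ("TxAbortedEvent".equals(event.type) && stream != null) {
        stream.onNext(new CompensateCommand(event.globalTxId));
      }
      ackObserver.onNext("ACK");
    }
  }

  public static void main(String[] args) {
    List<String> compensated = new CopyOnWriteArrayList<>();
    Observer<CompensateCommand> clientHandler = command -> compensated.add(command.globalTxId);

    FakeAlpha restarted = new FakeAlpha(); // stands in for the server restarted on the dead port
    restarted.onTxEvent(new TxEventMsg("TxAbortedEvent", "g0"), ack -> { }); // no stream yet: nothing pushed
    restarted.onConnected(clientHandler); // the client's reconnection re-opens the compensation stream
    restarted.onTxEvent(new TxEventMsg("TxStartedEvent", "g1"), ack -> { });
    restarted.onTxEvent(new TxEventMsg("TxAbortedEvent", "g1"), ack -> { });

    System.out.println(compensated); // [g1]
  }
}
```

The restarted server only knows about streams opened after it came back up. A compensate command therefore reaches the client only if `onConnected` is called again after reconnection, which is the property named in the commit title.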