You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:48:06 UTC

[incubator-servicecomb-saga] 01/06: SCB-164 reconnected to alpha on connection loss

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 5948b012363c2bbee300da53a3d0591dde9d5205
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:13:48 2018 +0800

    SCB-164 reconnected to alpha on connection loss
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../connector/grpc/GrpcClientMessageSender.java    |   5 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   5 +-
 .../grpc/LoadBalancedClusterMessageSender.java     |  57 +++++++---
 .../connector/grpc/PushBackReconnectRunnable.java  |  56 ++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 116 ++++++++++++---------
 .../grpc/PushBackReconnectRunnableTest.java        |  75 +++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       |   4 +-
 .../saga/omega/transaction/MessageSender.java      |   3 +
 8 files changed, 255 insertions(+), 66 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 4729db1..5534db9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,6 +20,8 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
+import java.util.function.Function;
+
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
@@ -54,13 +56,14 @@ public class GrpcClientMessageSender implements MessageSender {
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
+      Function<MessageSender, Runnable> errorHandlerFactory, // builds the compensate stream's onError callback for this sender
       MessageHandler handler) {
     this.target = address;
     this.asyncEventService = TxEventServiceGrpc.newStub(channel);
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, deserializer);
+    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer); // NOTE(review): 'this' escapes before serviceConfig is assigned
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 7a845eb..3cf46f8 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -35,10 +35,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MessageHandler messageHandler;
+  private final Runnable errorHandler; // run after the compensate stream reports an error (see onError)
   private final MessageDeserializer deserializer;
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler, MessageDeserializer deserializer) {
+  GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) {
     this.messageHandler = messageHandler;
+    this.errorHandler = errorHandler;
     this.deserializer = deserializer;
   }
 
@@ -58,6 +60,7 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
   @Override
   public void onError(Throwable t) {
     LOG.error("failed to process grpc compensate command.", t);
+    errorHandler.run(); // lets the owning sender react to the broken stream, e.g. by scheduling a reconnect
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 14ce340..3cb6677 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -26,7 +27,12 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.function.Consumer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -45,11 +51,17 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
-  private final Map<MessageSender, Long> senders = new HashMap<>();
+  // written by send() callers and by the reconnect scheduler thread, so it must be thread-safe
+  private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
+  // reconnect tasks queued by stream error handlers; executed one at a time on the scheduler thread
+  private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
   public LoadBalancedClusterMessageSender(String[] addresses,
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
-      MessageHandler handler) {
+      MessageHandler handler,
+      int reconnectDelay) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address provided");
@@ -69,11 +81,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
               serializer,
               deserializer,
               serviceConfig,
+              errorHandlerFactory(),
               handler),
           0L);
     }
+
+    scheduleReconnectTask(reconnectDelay);
   }
 
+  // this is for test only
   LoadBalancedClusterMessageSender(MessageSender... messageSenders) {
     for (MessageSender sender : messageSenders) {
       senders.put(sender, 0L);
@@ -106,6 +122,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public void close() {
+    // shutdownNow (not shutdown) interrupts the scheduler thread, which is normally blocked in pendingTasks.take()
+    scheduler.shutdownNow();
     channels.forEach(ManagedChannel::shutdownNow);
   }
 
@@ -113,28 +131,48 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   public void send(TxEvent event) {
     boolean success = false;
     do {
-      try {
-        withFastestSender(messageSender -> {
-          // very large latency on exception
-          senders.put(messageSender, Long.MAX_VALUE);
+      MessageSender messageSender = fastestSender();
 
-          long startTime = System.nanoTime();
-          messageSender.send(event);
-          senders.put(messageSender, System.nanoTime() - startTime);
-        });
+      try {
+        long startTime = System.nanoTime();
+        messageSender.send(event);
+        senders.put(messageSender, System.nanoTime() - startTime);
 
         success = true;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
+
+        // very large latency on exception
+        senders.put(messageSender, Long.MAX_VALUE);
       }
     } while (!success && !Thread.currentThread().isInterrupted());
   }
 
-  private void withFastestSender(Consumer<MessageSender> consumer) {
-    senders.entrySet()
+  private MessageSender fastestSender() {
+    return senders.entrySet()
         .stream()
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .ifPresent(consumer);
+        .orElse(NO_OP_SENDER);
+  }
+
+  // take() blocks the single scheduler thread until a reconnect task is queued; reconnectDelay (ms)
+  // is the pause after each task finishes before the next one is taken
+  private void scheduleReconnectTask(int reconnectDelay) {
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        pendingTasks.take().run();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }, 0, reconnectDelay, MILLISECONDS);
+  }
+
+  // maps a sender to an error callback that queues a PushBackReconnectRunnable for that sender
+  private Function<MessageSender, Runnable> errorHandlerFactory() {
+    return messageSender -> {
+      Runnable runnable = new PushBackReconnectRunnable(messageSender, senders, pendingTasks);
+      return () -> pendingTasks.offer(runnable);
+    };
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
new file mode 100644
index 0000000..d21e568
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PushBackReconnectRunnable implements Runnable { // reconnects one sender; re-queues itself when the attempt fails
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final MessageSender messageSender;
+  private final Map<MessageSender, Long> senders;
+  private final BlockingQueue<Runnable> pendingTasks;
+
+  PushBackReconnectRunnable(
+      MessageSender messageSender,
+      Map<MessageSender, Long> senders,
+      BlockingQueue<Runnable> pendingTasks) {
+    this.messageSender = messageSender;
+    this.senders = senders;
+    this.pendingTasks = pendingTasks;
+  }
+
+  @Override
+  public void run() {
+    try {
+      log.info("Retry connecting to alpha at {}", messageSender.target());
+      messageSender.onDisconnected(); // presumably tears down the broken stream first — confirm against MessageSender impls
+      messageSender.onConnected();
+      senders.put(messageSender, 0L); // zero latency makes lowest-latency selection prefer this sender again
+      log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+    } catch (Exception e) {
+      log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+      pendingTasks.offer(this); // push back to the tail of the queue so it is retried later
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1868f31..42b3162 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,15 +29,11 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -53,37 +49,31 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
 import org.junit.Test;
-import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class LoadBalancedClusterMessageSenderTest {
 
   private static final int[] ports = {8080, 8090};
-  private static final List<Server> servers = new ArrayList<>();
-
-  private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>();
-  private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>();
+  private static final Map<Integer, Server> servers = new HashMap<>();
 
   private static final Map<Integer, Integer> delays = new HashMap<Integer, Integer>() {{
     put(8080, 0);
     put(8090, 500);
   }};
 
-  private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{
-    put(8080, new ConcurrentSkipListSet<>());
-    put(8090, new ConcurrentSkipListSet<>());
+  private static final Map<Integer, Queue<String>> connected = new HashMap<Integer, Queue<String>>() {{
+    put(8080, new ConcurrentLinkedQueue<>());
+    put(8090, new ConcurrentLinkedQueue<>());
   }};
 
   private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{
-    put(8080, eventsOn8080);
-    put(8090, eventsOn8090);
+    put(8080, new ConcurrentLinkedQueue<>());
+    put(8090, new ConcurrentLinkedQueue<>());
   }};
 
   private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
@@ -108,49 +98,74 @@ public class LoadBalancedClusterMessageSenderTest {
         serializer,
         deserializer,
         new ServiceConfig(serviceName),
-        handler);
+        handler,
+        100); // reconnectDelay in milliseconds
   }
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    Arrays.stream(ports).forEach(port -> {
-      ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
-      serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
-      Server server = serverBuilder.build();
+    Arrays.stream(ports).forEach(LoadBalancedClusterMessageSenderTest::startServerOnPort);
+  }
 
-      try {
-        server.start();
-        servers.add(server);
-      } catch (IOException e) {
-        fail(e.getMessage());
-      }
-    });
+  private static void startServerOnPort(int port) {
+    ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
+    serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
+    Server server = serverBuilder.build();
+
+    try {
+      server.start();
+      servers.put(port, server);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    servers.forEach(Server::shutdown);
+    servers.values().forEach(Server::shutdown);
   }
 
   @After
   public void after() throws Exception {
-    eventsOn8080.clear();
-    eventsOn8090.clear();
+    eventsMap.values().forEach(Queue::clear);
+    connected.values().forEach(Queue::clear);
   }
 
   @Test
   public void resendToAnotherServerOnFailure() throws Exception {
     messageSender.send(event);
 
-    killServerReceivedMessage();
+    int deadPort = killServerReceivedMessage();
 
     messageSender.send(event);
+    messageSender.send(event);
+
+    assertThat(eventsMap.get(deadPort).size(), is(1));
+    assertThat(eventsMap.get(deadPort).peek().toString(), is(event.toString()));
+
+    int livePort = deadPort == 8080? 8090 : 8080; // the other of the two test ports
+    assertThat(eventsMap.get(livePort).size(), is(2));
+    assertThat(eventsMap.get(livePort).peek().toString(), is(event.toString()));
 
-    assertThat(eventsOn8080.size(), is(1));
-    assertThat(eventsOn8080.peek().toString(), is(event.toString()));
+    // restart killed server in order not to affect other tests
+    startServerOnPort(deadPort);
+  }
+
+  @Test
+  public void resetLatencyOnReconnection() throws Exception {
+    messageSender.onConnected();
+    messageSender.send(event);
+
+    int deadPort = killServerReceivedMessage();
+
+    startServerOnPort(deadPort);
+    await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3); // expects "Connected", then "Disconnected" + "Connected" from the reconnect task
+
+    messageSender.send(event);
+    messageSender.send(event);
 
-    assertThat(eventsOn8090.size(), is(1));
-    assertThat(eventsOn8090.peek().toString(), is(event.toString()));
+    // restarted server gets priority, since it had no traffic
+    assertThat(eventsMap.get(deadPort).size(), is(2));
   }
 
   @Test (timeout = 1000)
@@ -174,12 +189,12 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.onConnected();
     await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty());
 
-    assertThat(connected.get(8080), contains(serviceName));
-    assertThat(connected.get(8090), contains(serviceName));
+    assertThat(connected.get(8080), contains("Connected " + serviceName));
+    assertThat(connected.get(8090), contains("Connected " + serviceName));
 
     messageSender.onDisconnected();
-    assertThat(connected.get(8080).isEmpty(), is(true));
-    assertThat(connected.get(8090).isEmpty(), is(true));
+    assertThat(connected.get(8080), contains("Connected " + serviceName, "Disconnected " + serviceName));
+    assertThat(connected.get(8090), contains("Connected " + serviceName, "Disconnected " + serviceName));
   }
 
   @Test
@@ -222,8 +237,8 @@ public class LoadBalancedClusterMessageSenderTest {
     messageSender.send(event);
     messageSender.send(event);
 
-    assertThat(eventsOn8080.size(), is(3));
-    assertThat(eventsOn8090.size(), is(1));
+    assertThat(eventsMap.get(8080).size(), is(3));
+    assertThat(eventsMap.get(8090).size(), is(1));
   }
 
   @Test
@@ -236,22 +251,23 @@ public class LoadBalancedClusterMessageSenderTest {
     }
   }
 
-  private void killServerReceivedMessage() {
-    int index = 0;
+  private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
-        servers.get(index).shutdownNow();
+        Server serverToKill = servers.get(port);
+        serverToKill.shutdownNow();
+        return port;
       }
-      index++;
     }
+    throw new IllegalStateException("None of the servers received any message");
   }
 
   private static class MyTxEventService extends TxEventServiceImplBase {
-    private final Set<String> connected;
+    private final Queue<String> connected;
     private final Queue<TxEvent> events;
     private final int delay;
 
-    private MyTxEventService(Set<String> connected, Queue<TxEvent> events, int delay) {
+    private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
       this.connected = connected;
       this.events = events;
       this.delay = delay;
@@ -259,7 +275,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
-      connected.add(request.getServiceName());
+      connected.add("Connected " + request.getServiceName());
     }
 
     @Override
@@ -287,7 +303,7 @@ public class LoadBalancedClusterMessageSenderTest {
 
     @Override
     public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
-      connected.remove(request.getServiceName());
+      connected.add("Disconnected " + request.getServiceName());
       responseObserver.onNext(GrpcAck.newBuilder().build());
       responseObserver.onCompleted();
     }
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
new file mode 100644
index 0000000..a5b51b1
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PushBackReconnectRunnableTest {
+  private static final Runnable NO_OP_RUNNABLE = () -> {
+  };
+
+  private final MessageSender sender = Mockito.mock(MessageSender.class);
+  private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+  private final Map<MessageSender, Long> senders = new HashMap<>();
+  private final PushBackReconnectRunnable pushBack = new PushBackReconnectRunnable(sender, senders, runnables);
+
+  @Before
+  public void setUp() throws Exception {
+    runnables.offer(NO_OP_RUNNABLE);
+    senders.put(sender, Long.MAX_VALUE);
+  }
+
+  @Test
+  public void pushFailedCallbackToEndOfQueue() throws Exception {
+    doThrow(RuntimeException.class).doThrow(RuntimeException.class).doNothing().when(sender).onDisconnected(); // fail twice, then succeed
+    assertThat(runnables, contains(NO_OP_RUNNABLE));
+
+    pushBack.run();
+
+    assertThat(runnables, contains(NO_OP_RUNNABLE, pushBack));
+    assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
+    assertThat(runnables, contains(pushBack));
+
+    // failed again and pushed back itself to queue
+    runnables.poll().run();
+    assertThat(runnables, contains(pushBack));
+
+    runnables.poll().run();
+
+    assertThat(runnables.isEmpty(), is(true));
+    assertThat(senders.get(sender), is(0L));
+
+    verify(sender, times(3)).onDisconnected();
+    verify(sender, times(1)).onConnected();
+  }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index b5d118a..e5806ad 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -52,6 +52,7 @@ class OmegaSpringConfig {
   @Bean
   MessageSender grpcMessageSender(
       @Value("${alpha.cluster.address}") String[] addresses,
+      @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay, // milliseconds between reconnect attempts to alpha; defaults to 3000
       ServiceConfig serviceConfig,
       @Lazy MessageHandler handler) {
 
@@ -61,7 +62,8 @@ class OmegaSpringConfig {
         messageFormat,
         messageFormat,
         serviceConfig,
-        handler);
+        handler,
+        reconnectDelay);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 6a4ede6..2f05394 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,6 +18,9 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSender {
+  MessageSender NO_OP_SENDER = event -> { // discards every event; used as a fallback when no real sender is available
+  };
+
   default void onConnected() {
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.