You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/11 02:48:06 UTC
[incubator-servicecomb-saga] 01/06: SCB-164 reconnected to alpha on
connection loss
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 5948b012363c2bbee300da53a3d0591dde9d5205
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 9 16:13:48 2018 +0800
SCB-164 reconnected to alpha on connection loss
Signed-off-by: seanyinx <se...@huawei.com>
---
.../connector/grpc/GrpcClientMessageSender.java | 5 +-
.../grpc/GrpcCompensateStreamObserver.java | 5 +-
.../grpc/LoadBalancedClusterMessageSender.java | 57 +++++++---
.../connector/grpc/PushBackReconnectRunnable.java | 56 ++++++++++
.../grpc/LoadBalancedClusterMessageSenderTest.java | 116 ++++++++++++---------
.../grpc/PushBackReconnectRunnableTest.java | 75 +++++++++++++
.../saga/omega/spring/OmegaSpringConfig.java | 4 +-
.../saga/omega/transaction/MessageSender.java | 3 +
8 files changed, 255 insertions(+), 66 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 4729db1..5534db9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,6 +20,8 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
+import java.util.function.Function;
+
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
@@ -54,13 +56,14 @@ public class GrpcClientMessageSender implements MessageSender {
MessageSerializer serializer,
MessageDeserializer deserializer,
ServiceConfig serviceConfig,
+ Function<MessageSender, Runnable> errorHandlerFactory,
MessageHandler handler) {
this.target = address;
this.asyncEventService = TxEventServiceGrpc.newStub(channel);
this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
this.serializer = serializer;
- this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, deserializer);
+ this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 7a845eb..3cf46f8 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -35,10 +35,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MessageHandler messageHandler;
+ private final Runnable errorHandler;
private final MessageDeserializer deserializer;
- GrpcCompensateStreamObserver(MessageHandler messageHandler, MessageDeserializer deserializer) {
+ GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) {
this.messageHandler = messageHandler;
+ this.errorHandler = errorHandler;
this.deserializer = deserializer;
}
@@ -58,6 +60,7 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
@Override
public void onError(Throwable t) {
LOG.error("failed to process grpc compensate command.", t);
+ errorHandler.run();
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 14ce340..3cb6677 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -26,7 +27,11 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.function.Consumer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -45,11 +50,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
private final Map<MessageSender, Long> senders = new HashMap<>();
private final Collection<ManagedChannel> channels;
+ private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
public LoadBalancedClusterMessageSender(String[] addresses,
MessageSerializer serializer,
MessageDeserializer deserializer,
ServiceConfig serviceConfig,
- MessageHandler handler) {
+ MessageHandler handler,
+ int reconnectDelay) {
if (addresses.length == 0) {
throw new IllegalArgumentException("No reachable cluster address provided");
@@ -69,11 +78,15 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
serializer,
deserializer,
serviceConfig,
+ errorHandlerFactory(),
handler),
0L);
}
+
+ scheduleReconnectTask(reconnectDelay);
}
+ // this is for test only
LoadBalancedClusterMessageSender(MessageSender... messageSenders) {
for (MessageSender sender : messageSenders) {
senders.put(sender, 0L);
@@ -106,6 +119,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
@Override
public void close() {
+ scheduler.shutdown();
channels.forEach(ManagedChannel::shutdownNow);
}
@@ -113,28 +127,45 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
public void send(TxEvent event) {
boolean success = false;
do {
- try {
- withFastestSender(messageSender -> {
- // very large latency on exception
- senders.put(messageSender, Long.MAX_VALUE);
+ MessageSender messageSender = fastestSender();
- long startTime = System.nanoTime();
- messageSender.send(event);
- senders.put(messageSender, System.nanoTime() - startTime);
- });
+ try {
+ long startTime = System.nanoTime();
+ messageSender.send(event);
+ senders.put(messageSender, System.nanoTime() - startTime);
success = true;
} catch (Exception e) {
log.error("Retry sending event {} due to failure", event, e);
+
+ // very large latency on exception
+ senders.put(messageSender, Long.MAX_VALUE);
}
} while (!success && !Thread.currentThread().isInterrupted());
}
- private void withFastestSender(Consumer<MessageSender> consumer) {
- senders.entrySet()
+ private MessageSender fastestSender() {
+ return senders.entrySet()
.stream()
.min(Comparator.comparingLong(Entry::getValue))
.map(Entry::getKey)
- .ifPresent(consumer);
+ .orElse(NO_OP_SENDER);
+ }
+
+  /**
+   * Starts the background loop that executes queued reconnect tasks on the single scheduler thread.
+   * After each run the scheduler waits {@code reconnectDelay} milliseconds before looking at the
+   * queue again.
+   *
+   * @param reconnectDelay delay in milliseconds between two consecutive runs; also used as the
+   *                       maximum time one run waits for a task to show up in the queue
+   */
+  private void scheduleReconnectTask(int reconnectDelay) {
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        // Timed poll instead of take(): close() only calls scheduler.shutdown(), which does not
+        // interrupt a task that is already running. A task parked forever in take() would keep
+        // the scheduler thread alive after close(); with a bounded wait the run returns and the
+        // executor can terminate.
+        Runnable reconnectTask = pendingTasks.poll(reconnectDelay, MILLISECONDS);
+        if (reconnectTask != null) {
+          reconnectTask.run();
+        }
+      } catch (InterruptedException e) {
+        // restore the flag so the executor sees the interruption
+        Thread.currentThread().interrupt();
+      }
+    }, 0, reconnectDelay, MILLISECONDS);
+  }
+
+  /**
+   * Creates the factory that supplies each sender with its error handler.
+   * For a given sender the factory builds one reconnect task; the returned handler puts that
+   * same task on the pending queue every time it is run.
+   *
+   * @return function mapping a sender to the handler to run when its connection fails
+   */
+  private Function<MessageSender, Runnable> errorHandlerFactory() {
+    return failedSender -> {
+      // built once per sender; the handler below only enqueues it
+      Runnable reconnectTask = new PushBackReconnectRunnable(failedSender, senders, pendingTasks);
+      return () -> pendingTasks.offer(reconnectTask);
+    };
   }
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
new file mode 100644
index 0000000..d21e568
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reconnect task for a single {@link MessageSender}.
+ * Each run disconnects and reconnects the sender; on success the sender's entry in the shared
+ * map is reset to {@code 0L}, on any exception the task puts itself back on the pending queue
+ * so that it is tried again later.
+ */
+class PushBackReconnectRunnable implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final MessageSender messageSender;
+  private final Map<MessageSender, Long> senders;
+  private final BlockingQueue<Runnable> pendingTasks;
+
+  /**
+   * @param messageSender sender whose connection is to be re-established
+   * @param senders map whose value for {@code messageSender} is reset after a successful reconnect
+   * @param pendingTasks queue this task re-enters when a reconnect attempt fails
+   */
+  PushBackReconnectRunnable(
+      MessageSender messageSender,
+      Map<MessageSender, Long> senders,
+      BlockingQueue<Runnable> pendingTasks) {
+    this.messageSender = messageSender;
+    this.senders = senders;
+    this.pendingTasks = pendingTasks;
+  }
+
+  /** Performs one reconnect attempt and re-queues this task if the attempt throws. */
+  @Override
+  public void run() {
+    try {
+      reconnect();
+    } catch (Exception e) {
+      log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+      pendingTasks.offer(this);
+    }
+  }
+
+  /** Disconnects, connects again and, if neither call throws, resets the sender's map entry. */
+  private void reconnect() {
+    log.info("Retry connecting to alpha at {}", messageSender.target());
+    messageSender.onDisconnected();
+    messageSender.onConnected();
+    senders.put(messageSender, 0L);
+    log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 1868f31..42b3162 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -29,15 +29,11 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -53,37 +49,31 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEvent
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
import org.junit.Test;
-import org.junit.runners.MethodSorters;
import org.mockito.Mockito;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class LoadBalancedClusterMessageSenderTest {
private static final int[] ports = {8080, 8090};
- private static final List<Server> servers = new ArrayList<>();
-
- private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>();
- private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>();
+ private static final Map<Integer, Server> servers = new HashMap<>();
private static final Map<Integer, Integer> delays = new HashMap<Integer, Integer>() {{
put(8080, 0);
put(8090, 500);
}};
- private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{
- put(8080, new ConcurrentSkipListSet<>());
- put(8090, new ConcurrentSkipListSet<>());
+ private static final Map<Integer, Queue<String>> connected = new HashMap<Integer, Queue<String>>() {{
+ put(8080, new ConcurrentLinkedQueue<>());
+ put(8090, new ConcurrentLinkedQueue<>());
}};
private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{
- put(8080, eventsOn8080);
- put(8090, eventsOn8090);
+ put(8080, new ConcurrentLinkedQueue<>());
+ put(8090, new ConcurrentLinkedQueue<>());
}};
private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
@@ -108,49 +98,74 @@ public class LoadBalancedClusterMessageSenderTest {
serializer,
deserializer,
new ServiceConfig(serviceName),
- handler);
+ handler,
+ 100);
}
@BeforeClass
public static void beforeClass() throws Exception {
- Arrays.stream(ports).forEach(port -> {
- ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
- serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
- Server server = serverBuilder.build();
+ Arrays.stream(ports).forEach(LoadBalancedClusterMessageSenderTest::startServerOnPort);
+ }
- try {
- server.start();
- servers.add(server);
- } catch (IOException e) {
- fail(e.getMessage());
- }
- });
+  /**
+   * Builds a gRPC server for the given port, backed by a {@code MyTxEventService} wired to that
+   * port's connection log, event queue and delay, starts it and records it in {@code servers}.
+   * An {@link IOException} on start fails the running test.
+   */
+  private static void startServerOnPort(int port) {
+    MyTxEventService service =
+        new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port));
+    Server server = ServerBuilder.forPort(port).addService(service).build();
+
+    try {
+      server.start();
+      // registered only after a successful start
+      servers.put(port, server);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+  }
@AfterClass
public static void tearDown() throws Exception {
- servers.forEach(Server::shutdown);
+ servers.values().forEach(Server::shutdown);
}
@After
public void after() throws Exception {
- eventsOn8080.clear();
- eventsOn8090.clear();
+ eventsMap.values().forEach(Queue::clear);
+ connected.values().forEach(Queue::clear);
}
@Test
public void resendToAnotherServerOnFailure() throws Exception {
messageSender.send(event);
- killServerReceivedMessage();
+ int deadPort = killServerReceivedMessage();
messageSender.send(event);
+ messageSender.send(event);
+
+ assertThat(eventsMap.get(deadPort).size(), is(1));
+ assertThat(eventsMap.get(deadPort).peek().toString(), is(event.toString()));
+
+ int livePort = deadPort == 8080? 8090 : 8080;
+ assertThat(eventsMap.get(livePort).size(), is(2));
+ assertThat(eventsMap.get(livePort).peek().toString(), is(event.toString()));
- assertThat(eventsOn8080.size(), is(1));
- assertThat(eventsOn8080.peek().toString(), is(event.toString()));
+ // restart killed server in order not to affect other tests
+ startServerOnPort(deadPort);
+ }
+
+ @Test
+ public void resetLatencyOnReconnection() throws Exception {
+ messageSender.onConnected();
+ messageSender.send(event);
+
+ int deadPort = killServerReceivedMessage();
+
+ startServerOnPort(deadPort);
+ await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
+
+ messageSender.send(event);
+ messageSender.send(event);
- assertThat(eventsOn8090.size(), is(1));
- assertThat(eventsOn8090.peek().toString(), is(event.toString()));
+ // restarted server gets priority, since it had no traffic
+ assertThat(eventsMap.get(deadPort).size(), is(2));
}
@Test (timeout = 1000)
@@ -174,12 +189,12 @@ public class LoadBalancedClusterMessageSenderTest {
messageSender.onConnected();
await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty());
- assertThat(connected.get(8080), contains(serviceName));
- assertThat(connected.get(8090), contains(serviceName));
+ assertThat(connected.get(8080), contains("Connected " + serviceName));
+ assertThat(connected.get(8090), contains("Connected " + serviceName));
messageSender.onDisconnected();
- assertThat(connected.get(8080).isEmpty(), is(true));
- assertThat(connected.get(8090).isEmpty(), is(true));
+ assertThat(connected.get(8080), contains("Connected " + serviceName, "Disconnected " + serviceName));
+ assertThat(connected.get(8090), contains("Connected " + serviceName, "Disconnected " + serviceName));
}
@Test
@@ -222,8 +237,8 @@ public class LoadBalancedClusterMessageSenderTest {
messageSender.send(event);
messageSender.send(event);
- assertThat(eventsOn8080.size(), is(3));
- assertThat(eventsOn8090.size(), is(1));
+ assertThat(eventsMap.get(8080).size(), is(3));
+ assertThat(eventsMap.get(8090).size(), is(1));
}
@Test
@@ -236,22 +251,23 @@ public class LoadBalancedClusterMessageSenderTest {
}
}
- private void killServerReceivedMessage() {
- int index = 0;
+ private int killServerReceivedMessage() {
for (int port : eventsMap.keySet()) {
if (!eventsMap.get(port).isEmpty()) {
- servers.get(index).shutdownNow();
+ Server serverToKill = servers.get(port);
+ serverToKill.shutdownNow();
+ return port;
}
- index++;
}
+ throw new IllegalStateException("None of the servers received any message");
}
private static class MyTxEventService extends TxEventServiceImplBase {
- private final Set<String> connected;
+ private final Queue<String> connected;
private final Queue<TxEvent> events;
private final int delay;
- private MyTxEventService(Set<String> connected, Queue<TxEvent> events, int delay) {
+ private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
this.connected = connected;
this.events = events;
this.delay = delay;
@@ -259,7 +275,7 @@ public class LoadBalancedClusterMessageSenderTest {
@Override
public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
- connected.add(request.getServiceName());
+ connected.add("Connected " + request.getServiceName());
}
@Override
@@ -287,7 +303,7 @@ public class LoadBalancedClusterMessageSenderTest {
@Override
public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
- connected.remove(request.getServiceName());
+ connected.add("Disconnected " + request.getServiceName());
responseObserver.onNext(GrpcAck.newBuilder().build());
responseObserver.onCompleted();
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
new file mode 100644
index 0000000..a5b51b1
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PushBackReconnectRunnableTest { // checks that a failed reconnect task re-queues itself behind already pending tasks
+ private static final Runnable NO_OP_RUNNABLE = () -> { // placeholder task that is already waiting in the queue
+ };
+
+ private final MessageSender sender = Mockito.mock(MessageSender.class);
+ private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(); // pending-task queue handed to the runnable under test
+ private final Map<MessageSender, Long> senders = new HashMap<>(); // per-sender value that a successful reconnect resets to 0
+ private final PushBackReconnectRunnable pushBack = new PushBackReconnectRunnable(sender, senders, runnables);
+
+ @Before
+ public void setUp() throws Exception {
+ runnables.offer(NO_OP_RUNNABLE); // queue starts with one unrelated task
+ senders.put(sender, Long.MAX_VALUE); // sender starts with the largest possible value
+ }
+
+ @Test
+ public void pushFailedCallbackToEndOfQueue() throws Exception {
+ doThrow(RuntimeException.class).doThrow(RuntimeException.class).doNothing().when(sender).onDisconnected(); // first two attempts throw, third succeeds
+ assertThat(runnables, contains(NO_OP_RUNNABLE));
+
+ pushBack.run(); // attempt 1: throws, so the task offers itself to the queue
+
+ assertThat(runnables, contains(NO_OP_RUNNABLE, pushBack)); // re-queued after the existing task
+ assertThat(runnables.poll(), is(NO_OP_RUNNABLE));
+ assertThat(runnables, contains(pushBack));
+
+ // failed again and pushed back itself to queue
+ runnables.poll().run(); // attempt 2
+ assertThat(runnables, contains(pushBack));
+
+ runnables.poll().run(); // attempt 3: succeeds, nothing is re-queued
+
+ assertThat(runnables.isEmpty(), is(true));
+ assertThat(senders.get(sender), is(0L)); // value reset by the successful attempt
+
+ verify(sender, times(3)).onDisconnected();
+ verify(sender, times(1)).onConnected(); // onConnected is reached only when onDisconnected did not throw
+ }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index b5d118a..e5806ad 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -52,6 +52,7 @@ class OmegaSpringConfig {
@Bean
MessageSender grpcMessageSender(
@Value("${alpha.cluster.address}") String[] addresses,
+ @Value("${omega.connection.reconnectDelay:3000}") int reconnectDelay,
ServiceConfig serviceConfig,
@Lazy MessageHandler handler) {
@@ -61,7 +62,8 @@ class OmegaSpringConfig {
messageFormat,
messageFormat,
serviceConfig,
- handler);
+ handler,
+ reconnectDelay);
sender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 6a4ede6..2f05394 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -18,6 +18,9 @@
package org.apache.servicecomb.saga.omega.transaction;
public interface MessageSender {
+ MessageSender NO_OP_SENDER = event -> {
+ };
+
default void onConnected() {
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.