You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/15 06:37:36 UTC

[GitHub] seanyinx closed pull request #113: SCB-211 wait until connection recover instead of try sending

seanyinx closed pull request #113: SCB-211 wait until connection recover instead of try sending
URL: https://github.com/apache/incubator-servicecomb-saga/pull/113
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 6eae8a34..73940ed7 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -24,10 +24,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -47,10 +47,11 @@
 
 public class LoadBalancedClusterMessageSender implements MessageSender {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final Map<MessageSender, Long> senders = new HashMap<>();
+  private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -146,9 +147,16 @@ public void send(TxEvent event) {
   private MessageSender fastestSender() {
     return senders.entrySet()
         .stream()
+        .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse(NO_OP_SENDER);
+        .orElse((event -> {
+          try {
+            availableMessageSenders.take().send(event);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }));
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
@@ -163,7 +171,7 @@ private void scheduleReconnectTask(int reconnectDelay) {
 
   private Function<MessageSender, Runnable> errorHandlerFactory() {
     return messageSender -> {
-      Runnable runnable = new PushBackReconnectRunnable(messageSender, senders, pendingTasks);
+      Runnable runnable = new PushBackReconnectRunnable(messageSender, senders, pendingTasks, availableMessageSenders);
       return () -> pendingTasks.offer(runnable);
     };
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index d21e5682..f019d107 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -31,13 +31,17 @@
   private final Map<MessageSender, Long> senders;
   private final BlockingQueue<Runnable> pendingTasks;
 
+  private final BlockingQueue<MessageSender> connectedSenders;
+
   PushBackReconnectRunnable(
       MessageSender messageSender,
       Map<MessageSender, Long> senders,
-      BlockingQueue<Runnable> pendingTasks) {
+      BlockingQueue<Runnable> pendingTasks,
+      BlockingQueue<MessageSender> connectedSenders) {
     this.messageSender = messageSender;
     this.senders = senders;
     this.pendingTasks = pendingTasks;
+    this.connectedSenders = connectedSenders;
   }
 
   @Override
@@ -47,6 +51,7 @@ public void run() {
       messageSender.onDisconnected();
       messageSender.onConnected();
       senders.put(messageSender, 0L);
+      connectedSenders.offer(messageSender);
       log.info("Retry connecting to alpha at {} is successful", messageSender.target());
     } catch (Exception e) {
       log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8fe5247f..d5f432fb 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -19,6 +19,7 @@
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.lang.Thread.State.WAITING;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
@@ -26,6 +27,7 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
@@ -84,9 +86,9 @@
   private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
 
   private final List<String> compensated = new ArrayList<>();
-  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
-    compensated.add(globalTxId);
-  };
+
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) ->
+      compensated.add(globalTxId);
 
   private final String globalTxId = uniquify("globalTxId");
   private final String localTxId = uniquify("localTxId");
@@ -170,7 +172,7 @@ public void resetLatencyOnReconnection() throws Exception {
     messageSender.send(event);
 
     startServerOnPort(deadPort);
-    await().atMost(1, SECONDS).until(() -> connected.get(deadPort).size() == 3);
+    await().atMost(2, SECONDS).until(() -> connected.get(deadPort).size() == 3);
 
     TxEvent abortedEvent = new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, new RuntimeException("oops"));
     messageSender.send(abortedEvent);
@@ -195,6 +197,9 @@ public void stopSendingOnInterruption() throws Exception {
 
     Thread.sleep(300);
 
+    // stop trying to send message out on exception
+    verify(underlying, times(1)).send(event);
+
     thread.interrupt();
     thread.join();
   }
@@ -266,6 +271,27 @@ public void blowsUpWhenNoServerAddressProvided() throws Exception {
     }
   }
 
+  @Test
+  public void stopSendingWhenClusterIsDown() throws Exception {
+    servers.values().forEach(Server::shutdownNow);
+    messageSender.onConnected();
+
+    Thread thread = new Thread(() -> messageSender.send(event));
+    thread.start();
+
+    // we don't want to keep sending on cluster down
+    await().atMost(2, SECONDS).until(() -> thread.isAlive() && thread.getState().equals(WAITING));
+
+    assertThat(eventsMap.get(8080).isEmpty(), is(true));
+    assertThat(eventsMap.get(8090).isEmpty(), is(true));
+
+    startServerOnPort(8080);
+    startServerOnPort(8090);
+
+    await().atMost(2, SECONDS).until(() -> connected.get(8080).size() == 2 || connected.get(8090).size() == 2);
+    await().atMost(2, SECONDS).until(() -> eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1);
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
index a5b51b1c..e6589466 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnableTest.java
@@ -19,8 +19,9 @@
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -32,16 +33,17 @@
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class PushBackReconnectRunnableTest {
   private static final Runnable NO_OP_RUNNABLE = () -> {
   };
 
-  private final MessageSender sender = Mockito.mock(MessageSender.class);
+  private final MessageSender sender = mock(MessageSender.class);
   private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> connectedSenders = new LinkedBlockingQueue<>();
   private final Map<MessageSender, Long> senders = new HashMap<>();
-  private final PushBackReconnectRunnable pushBack = new PushBackReconnectRunnable(sender, senders, runnables);
+
+  private final PushBackReconnectRunnable pushBack = new PushBackReconnectRunnable(sender, senders, runnables, connectedSenders);
 
   @Before
   public void setUp() throws Exception {
@@ -68,6 +70,7 @@ public void pushFailedCallbackToEndOfQueue() throws Exception {
 
     assertThat(runnables.isEmpty(), is(true));
     assertThat(senders.get(sender), is(0L));
+    assertThat(connectedSenders, contains(sender));
 
     verify(sender, times(3)).onDisconnected();
     verify(sender, times(1)).onConnected();
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
index e2a1c623..23703256 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java
@@ -29,8 +29,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -56,6 +57,8 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
 
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
@@ -122,7 +125,7 @@ public void sendsAbortEventOnTimeout() throws Throwable {
       return null;
     });
 
-    CompletableFuture.runAsync(() -> {
+    executor.execute(() -> {
       try {
         aspect.advise(joinPoint, sagaStart);
       } catch (Throwable throwable) {
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index dabfbc72..2d92d0f0 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -28,8 +28,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -58,6 +59,8 @@
   private final OmegaContext omegaContext = new OmegaContext(idGenerator);
   private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
 
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(newLocalTxId);
@@ -127,7 +130,7 @@ public void sendsAbortEventOnTimeout() throws Throwable {
       return null;
     });
 
-    CompletableFuture.runAsync(() -> {
+    executor.execute(() -> {
       try {
         // need to setup the thread local for it
         omegaContext.setGlobalTxId(globalTxId);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services