You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/10 03:31:27 UTC

[incubator-servicecomb-saga] 03/07: SCB-168 exception free connection & disconnection

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 2c395dab2f19125738d5132ac51ba27b45559ae8
Author: seanyinx <se...@huawei.com>
AuthorDate: Mon Jan 8 16:22:21 2018 +0800

    SCB-168 exception free connection & disconnection
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../connector/grpc/GrpcClientMessageSender.java    | 11 +++++++-
 .../grpc/LoadBalancedClusterMessageSender.java     | 21 ++++++++++++---
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 31 ++++++++++++++++++++++
 .../saga/omega/transaction/MessageSender.java      |  4 +++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 59fbce1..4729db1 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -39,6 +39,7 @@ import io.grpc.ManagedChannel;
 
 public class GrpcClientMessageSender implements MessageSender {
 
+  private final String target;
   private final TxEventServiceStub asyncEventService;
 
   private final MessageSerializer serializer;
@@ -47,11 +48,14 @@ public class GrpcClientMessageSender implements MessageSender {
   private final GrpcCompensateStreamObserver compensateStreamObserver;
   private final GrpcServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel channel,
+  public GrpcClientMessageSender(
+      String address,
+      ManagedChannel channel,
       MessageSerializer serializer,
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler) {
+    this.target = address;
     this.asyncEventService = TxEventServiceGrpc.newStub(channel);
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
@@ -71,6 +75,11 @@ public class GrpcClientMessageSender implements MessageSender {
   }
 
   @Override
+  public String target() {
+    return target;
+  }
+
+  @Override
   public void send(TxEvent event) {
     blockingEventService.onTxEvent(convertEvent(event));
   }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index e5c9d19..14ce340 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -63,7 +63,9 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
       channels.add(channel);
       senders.put(
-          new GrpcClientMessageSender(channel,
+          new GrpcClientMessageSender(
+              address,
+              channel,
               serializer,
               deserializer,
               serviceConfig,
@@ -81,12 +83,25 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public void onConnected() {
-    senders.keySet().forEach(MessageSender::onConnected);
+    senders.keySet().forEach(sender -> {
+      try {
+        sender.onConnected();
+      } catch (Exception e) {
+        log.error("Failed connecting to alpha at {}", sender.target(), e);
+      }
+    });
   }
 
   @Override
   public void onDisconnected() {
-    senders.keySet().forEach(MessageSender::onDisconnected);
+    senders.keySet().forEach(sender -> {
+      try {
+        sender.onDisconnected();
+      } catch (Exception e) {
+        log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+      }
+    });
+
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 6f2d90b..dc75663 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -184,6 +185,36 @@ public class LoadBalancedClusterMessageSenderTest {
   }
 
   @Test
+  public void swallowException_UntilAllSendersConnected() throws Exception {
+    MessageSender underlying1 = Mockito.mock(MessageSender.class);
+    doThrow(RuntimeException.class).when(underlying1).onConnected();
+
+    MessageSender underlying2 = Mockito.mock(MessageSender.class);
+
+    MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2);
+
+    sender.onConnected();
+
+    verify(underlying1).onConnected();
+    verify(underlying2).onConnected();
+  }
+
+  @Test
+  public void swallowException_UntilAllSendersDisconnected() throws Exception {
+    MessageSender underlying1 = Mockito.mock(MessageSender.class);
+    doThrow(RuntimeException.class).when(underlying1).onDisconnected();
+
+    MessageSender underlying2 = Mockito.mock(MessageSender.class);
+
+    MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2);
+
+    sender.onDisconnected();
+
+    verify(underlying1).onDisconnected();
+    verify(underlying2).onDisconnected();
+  }
+
+  @Test
   public void considerFasterServerFirst() throws Exception {
     // we don't know which server is selected at first
     messageSender.send(event);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 39b4d62..6a4ede6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -27,5 +27,9 @@ public interface MessageSender {
   default void close() {
   }
 
+  default String target() {
+    return "UNKNOWN";
+  }
+
   void send(TxEvent event);
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.