You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/10 03:31:27 UTC
[incubator-servicecomb-saga] 03/07: SCB-168 exception free
connection & disconnection
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 2c395dab2f19125738d5132ac51ba27b45559ae8
Author: seanyinx <se...@huawei.com>
AuthorDate: Mon Jan 8 16:22:21 2018 +0800
SCB-168 exception free connection & disconnection
Signed-off-by: seanyinx <se...@huawei.com>
---
.../connector/grpc/GrpcClientMessageSender.java | 11 +++++++-
.../grpc/LoadBalancedClusterMessageSender.java | 21 ++++++++++++---
.../grpc/LoadBalancedClusterMessageSenderTest.java | 31 ++++++++++++++++++++++
.../saga/omega/transaction/MessageSender.java | 4 +++
4 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 59fbce1..4729db1 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -39,6 +39,7 @@ import io.grpc.ManagedChannel;
public class GrpcClientMessageSender implements MessageSender {
+ private final String target;
private final TxEventServiceStub asyncEventService;
private final MessageSerializer serializer;
@@ -47,11 +48,14 @@ public class GrpcClientMessageSender implements MessageSender {
private final GrpcCompensateStreamObserver compensateStreamObserver;
private final GrpcServiceConfig serviceConfig;
- public GrpcClientMessageSender(ManagedChannel channel,
+ public GrpcClientMessageSender(
+ String address,
+ ManagedChannel channel,
MessageSerializer serializer,
MessageDeserializer deserializer,
ServiceConfig serviceConfig,
MessageHandler handler) {
+ this.target = address;
this.asyncEventService = TxEventServiceGrpc.newStub(channel);
this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
this.serializer = serializer;
@@ -71,6 +75,11 @@ public class GrpcClientMessageSender implements MessageSender {
}
@Override
+ public String target() {
+ return target;
+ }
+
+ @Override
public void send(TxEvent event) {
blockingEventService.onTxEvent(convertEvent(event));
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index e5c9d19..14ce340 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -63,7 +63,9 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
channels.add(channel);
senders.put(
- new GrpcClientMessageSender(channel,
+ new GrpcClientMessageSender(
+ address,
+ channel,
serializer,
deserializer,
serviceConfig,
@@ -81,12 +83,25 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
@Override
public void onConnected() {
- senders.keySet().forEach(MessageSender::onConnected);
+ senders.keySet().forEach(sender -> {
+ try {
+ sender.onConnected();
+ } catch (Exception e) {
+ log.error("Failed connecting to alpha at {}", sender.target(), e);
+ }
+ });
}
@Override
public void onDisconnected() {
- senders.keySet().forEach(MessageSender::onDisconnected);
+ senders.keySet().forEach(sender -> {
+ try {
+ sender.onDisconnected();
+ } catch (Exception e) {
+ log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+ }
+ });
+
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 6f2d90b..dc75663 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
@@ -184,6 +185,36 @@ public class LoadBalancedClusterMessageSenderTest {
}
@Test
+ public void swallowException_UntilAllSendersConnected() throws Exception {
+ MessageSender underlying1 = Mockito.mock(MessageSender.class);
+ doThrow(RuntimeException.class).when(underlying1).onConnected();
+
+ MessageSender underlying2 = Mockito.mock(MessageSender.class);
+
+ MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2);
+
+ sender.onConnected();
+
+ verify(underlying1).onConnected();
+ verify(underlying2).onConnected();
+ }
+
+ @Test
+ public void swallowException_UntilAllSendersDisconnected() throws Exception {
+ MessageSender underlying1 = Mockito.mock(MessageSender.class);
+ doThrow(RuntimeException.class).when(underlying1).onDisconnected();
+
+ MessageSender underlying2 = Mockito.mock(MessageSender.class);
+
+ MessageSender sender = new LoadBalancedClusterMessageSender(underlying1, underlying2);
+
+ sender.onDisconnected();
+
+ verify(underlying1).onDisconnected();
+ verify(underlying2).onDisconnected();
+ }
+
+ @Test
public void considerFasterServerFirst() throws Exception {
// we don't know which server is selected at first
messageSender.send(event);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 39b4d62..6a4ede6 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -27,5 +27,9 @@ public interface MessageSender {
default void close() {
}
+ default String target() {
+ return "UNKNOWN";
+ }
+
void send(TxEvent event);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.