You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 00:56:01 UTC
[incubator-servicecomb-saga] 04/06: SCB-227 throw OmegaException
when sending get interrupted
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit f9acce026b7ec1c9b012f1d1c8ffdc3624ca6183
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 17:58:06 2018 +0800
SCB-227 throw OmegaException when sending get interrupted
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 7 +------
.../grpc/LoadBalancedClusterMessageSender.java | 19 +++++++++----------
.../grpc/LoadBalancedClusterMessageSenderTest.java | 15 +++++++++++++++
.../saga/omega/transaction/OmegaException.java | 3 +++
.../saga/omega/transaction/TransactionAspectTest.java | 4 +---
5 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 65d3e90..eced7f9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -87,12 +87,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
message.getPayloads().toByteArray()
));
- if (ok) {
- responseObserver.onNext(ALLOW);
- } else {
- responseObserver.onNext(REJECT);
- }
-
+ responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
}
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9c7ab6b..b5e461b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -39,6 +39,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,26 +127,24 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
@Override
public AlphaResponse send(TxEvent event) {
- AlphaResponse response = null;
- boolean success = false;
do {
MessageSender messageSender = fastestSender();
try {
long startTime = System.nanoTime();
- response = messageSender.send(event);
+ AlphaResponse response = messageSender.send(event);
senders.put(messageSender, System.nanoTime() - startTime);
- success = true;
+ return response;
} catch (Exception e) {
log.error("Retry sending event {} due to failure", event, e);
// very large latency on exception
senders.put(messageSender, Long.MAX_VALUE);
}
- } while (!success && !Thread.currentThread().isInterrupted());
+ } while (!Thread.currentThread().isInterrupted());
- return response;
+ throw new OmegaException("Failed to send event " + event + " due to interruption");
}
private MessageSender fastestSender() {
@@ -154,14 +153,14 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
.filter(entry -> entry.getValue() < Long.MAX_VALUE)
.min(Comparator.comparingLong(Entry::getValue))
.map(Entry::getKey)
- .orElse((event -> {
+ .orElse(event -> {
try {
return availableMessageSenders.take().send(event);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
- return new AlphaResponse(true);
- }));
+ throw new OmegaException("Failed to send event " + event + " due to interruption");
+ });
}
private void scheduleReconnectTask(int reconnectDelay) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index c9549cf..b62dbfc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -45,6 +45,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
@@ -302,6 +303,20 @@ public class LoadBalancedClusterMessageSenderTest {
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
+ @Test
+ public void blowsUpWhenServerIsInterrupted() {
+ Thread thread = new Thread(() -> {
+ try {
+ messageSender.send(event);
+ expectFailing(OmegaException.class);
+ } catch (OmegaException e) {
+ assertThat(e.getMessage().endsWith("interruption"), is(true));
+ }
+ });
+ thread.start();
+ thread.interrupt();
+ }
+
private int killServerReceivedMessage() {
for (int port : eventsMap.keySet()) {
if (!eventsMap.get(port).isEmpty()) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
index 89eb67f..4488e44 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
@@ -18,6 +18,9 @@
package org.apache.servicecomb.saga.omega.transaction;
public class OmegaException extends RuntimeException {
+ public OmegaException(String message) {
+ super(message);
+ }
public OmegaException(String cause, Throwable throwable) {
super(cause, throwable);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index a1bab98..aa79da7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -167,7 +167,7 @@ public class TransactionAspectTest {
}
@Test
- public void returnImmediatelyWhenReceivedRejectResponse() {
+ public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
MessageSender sender = mock(MessageSender.class);
when(sender.send(any())).thenReturn(new AlphaResponse(true));
@@ -177,8 +177,6 @@ public class TransactionAspectTest {
expectFailing(OmegaTxAbortedException.class);
} catch (OmegaTxAbortedException e) {
assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
- } catch (Throwable throwable) {
- expectFailing(OmegaTxAbortedException.class);
}
verify(sender, times(1)).send(any());
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.