You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 00:56:01 UTC

[incubator-servicecomb-saga] 04/06: SCB-227 throw OmegaException when sending gets interrupted

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit f9acce026b7ec1c9b012f1d1c8ffdc3624ca6183
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Jan 15 17:58:06 2018 +0800

    SCB-227 throw OmegaException when sending gets interrupted
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java    |  7 +------
 .../grpc/LoadBalancedClusterMessageSender.java        | 19 +++++++++----------
 .../grpc/LoadBalancedClusterMessageSenderTest.java    | 15 +++++++++++++++
 .../saga/omega/transaction/OmegaException.java        |  3 +++
 .../saga/omega/transaction/TransactionAspectTest.java |  4 +---
 5 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 65d3e90..eced7f9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -87,12 +87,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getPayloads().toByteArray()
     ));
 
-    if (ok) {
-      responseObserver.onNext(ALLOW);
-    } else {
-      responseObserver.onNext(REJECT);
-    }
-
+    responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9c7ab6b..b5e461b 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -39,6 +39,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,26 +127,24 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public AlphaResponse send(TxEvent event) {
-    AlphaResponse response = null;
-    boolean success = false;
     do {
       MessageSender messageSender = fastestSender();
 
       try {
         long startTime = System.nanoTime();
-        response = messageSender.send(event);
+        AlphaResponse response = messageSender.send(event);
         senders.put(messageSender, System.nanoTime() - startTime);
 
-        success = true;
+        return response;
       } catch (Exception e) {
         log.error("Retry sending event {} due to failure", event, e);
 
         // very large latency on exception
         senders.put(messageSender, Long.MAX_VALUE);
       }
-    } while (!success && !Thread.currentThread().isInterrupted());
+    } while (!Thread.currentThread().isInterrupted());
 
-    return response;
+    throw new OmegaException("Failed to send event " + event + " due to interruption");
   }
 
   private MessageSender fastestSender() {
@@ -154,14 +153,14 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse((event -> {
+        .orElse(event -> {
           try {
             return availableMessageSenders.take().send(event);
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ignored) {
             Thread.currentThread().interrupt();
           }
-          return new AlphaResponse(true);
-        }));
+          throw new OmegaException("Failed to send event " + event + " due to interruption");
+        });
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index c9549cf..b62dbfc 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -45,6 +45,7 @@ import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
@@ -302,6 +303,20 @@ public class LoadBalancedClusterMessageSenderTest {
     assertThat(messageSender.send(rejectEvent).aborted(), is(true));
   }
 
+  @Test
+  public void blowsUpWhenServerIsInterrupted() {
+    Thread thread = new Thread(() -> {
+      try {
+        messageSender.send(event);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        assertThat(e.getMessage().endsWith("interruption"), is(true));
+      }
+    });
+    thread.start();
+    thread.interrupt();
+  }
+
   private int killServerReceivedMessage() {
     for (int port : eventsMap.keySet()) {
       if (!eventsMap.get(port).isEmpty()) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
index 89eb67f..4488e44 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaException.java
@@ -18,6 +18,9 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public class OmegaException extends RuntimeException {
+  public OmegaException(String message) {
+    super(message);
+  }
 
   public OmegaException(String cause, Throwable throwable) {
     super(cause, throwable);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index a1bab98..aa79da7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -167,7 +167,7 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void returnImmediatelyWhenReceivedRejectResponse() {
+  public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
     MessageSender sender = mock(MessageSender.class);
     when(sender.send(any())).thenReturn(new AlphaResponse(true));
 
@@ -177,8 +177,6 @@ public class TransactionAspectTest {
       expectFailing(OmegaTxAbortedException.class);
     } catch (OmegaTxAbortedException e) {
       assertThat(e.getMessage().contains("Abort local sub transaction"), is(true));
-    } catch (Throwable throwable) {
-      expectFailing(OmegaTxAbortedException.class);
     }
 
     verify(sender, times(1)).send(any());

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.