You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 09:45:57 UTC

[incubator-servicecomb-saga] 02/02: SCB-234 rethrow exception in saga start annotation processor

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b54587d3b578e172a77727e2a041e0732364252b
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 16 16:51:48 2018 +0800

    SCB-234 rethrow exception in saga start annotation processor
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 13 +++--------
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  8 +------
 .../connector/grpc/RetryableMessageSenderTest.java |  8 +++----
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 +-------
 omega/omega-transaction/pom.xml                    |  4 ++++
 .../transaction/SagaStartAnnotationProcessor.java  |  8 ++++++-
 .../SagaStartAnnotationProcessorTest.java          | 27 +++++++++++++++++++---
 pom.xml                                            |  6 +++++
 8 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index b518524..9a78a62 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -53,8 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
-  private final BlockingQueue<MessageSender> availableMessageSenders;
-  private final MessageSender retryableMessageSender;
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -62,17 +62,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler,
-      int reconnectDelay,
-      BlockingQueue<MessageSender> availableMessageSenders,
-      MessageSender retryableMessageSender) {
+      int reconnectDelay) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address provided");
     }
 
-    this.availableMessageSenders = availableMessageSenders;
-    this.retryableMessageSender = retryableMessageSender;
-
     channels = new ArrayList<>(addresses.length);
     for (String address : addresses) {
       ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
@@ -101,8 +96,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       senders.put(sender, 0L);
     }
     channels = emptyList();
-    availableMessageSenders = new LinkedBlockingQueue<>();
-    retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 315c5ae..8062ae9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -37,9 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -103,8 +101,6 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
-  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
-  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   private final MessageSender messageSender = newMessageSender(addresses);
 
   private MessageSender newMessageSender(String[] addresses) {
@@ -114,9 +110,7 @@ public class LoadBalancedClusterMessageSenderTest {
         deserializer,
         new ServiceConfig(serviceName),
         handler,
-        100,
-        availableMessageSenders,
-        retryableMessageSender);
+        100);
   }
 
   @BeforeClass
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 7ffbf9a..562c50f 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -37,7 +37,7 @@ import org.junit.Test;
 
 public class RetryableMessageSenderTest {
   @SuppressWarnings("unchecked")
-  private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class);
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
   private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders);
 
   private final String globalTxId = uniquify("globalTxId");
@@ -45,9 +45,9 @@ public class RetryableMessageSenderTest {
   private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
 
   @Test
-  public void sendEventWhenSenderIsAvailable() throws InterruptedException {
+  public void sendEventWhenSenderIsAvailable() {
     MessageSender sender = mock(MessageSender.class);
-    when(availableMessageSenders.take()).thenReturn(sender);
+    availableMessageSenders.add(sender);
 
     messageSender.send(event);
 
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 78321a4..fa4027b 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,11 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.spring;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -67,17 +63,13 @@ class OmegaSpringConfig {
       @Lazy MessageHandler handler) {
 
     MessageFormat messageFormat = new KryoMessageFormat();
-    BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
-    MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
     MessageSender sender = new LoadBalancedClusterMessageSender(
         addresses,
         messageFormat,
         messageFormat,
         serviceConfig,
         handler,
-        reconnectDelay,
-        availableMessageSenders,
-        retryableMessageSender);
+        reconnectDelay);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index a2bf293..258770c 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -46,6 +46,10 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>javax.transaction-api</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7299b25..7ef021a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import javax.transaction.TransactionalException;
+
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 class SagaStartAnnotationProcessor implements EventAwareInterceptor {
@@ -31,7 +33,11 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   @Override
   public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    try {
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    } catch (OmegaException e) {
+      throw new TransactionalException(e.getMessage(), e.getCause());
+    }
   }
 
   @Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index f8e936d..566a456 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import javax.transaction.TransactionalException;
+
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -43,12 +49,10 @@ public class SagaStartAnnotationProcessorTest {
 
   private final String globalTxId = UUID.randomUUID().toString();
 
-  private final String localTxId = UUID.randomUUID().toString();
-
   @SuppressWarnings("unchecked")
   private final IdGenerator<String> generator = mock(IdGenerator.class);
-
   private final OmegaContext context = new OmegaContext(generator);
+  private final OmegaException exception = new OmegaException("exception", new RuntimeException("runtime exception"));
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
       sender);
@@ -86,4 +90,21 @@ public class SagaStartAnnotationProcessorTest {
     assertThat(event.type(), is(EventType.SagaEndedEvent));
     assertThat(event.payloads().length, is(0));
   }
+
+  @Test
+  public void transformInterceptedException() {
+    MessageSender sender = mock(MessageSender.class);
+    SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+
+    doThrow(exception).when(sender).send(any());
+
+    try {
+      sagaStartAnnotationProcessor.preIntercept(null, null);
+      expectFailing(TransactionalException.class);
+    } catch (TransactionalException e) {
+      assertThat(e.getMessage(), is("exception"));
+      assertThat(e.getCause(), instanceOf(RuntimeException.class));
+      assertThat(e.getCause().getMessage(), is("runtime exception"));
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 1d41a45..9338b2f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
     <maven.failsafe.version>2.19.1</maven.failsafe.version>
     <grpc.version>1.8.0</grpc.version>
     <kryo.version>4.0.1</kryo.version>
+    <javax.transaction.version>1.2</javax.transaction.version>
   </properties>
 
   <name>ServiceComb Saga</name>
@@ -347,6 +348,11 @@
         <artifactId>kryo</artifactId>
         <version>${kryo.version}</version>
       </dependency>
+      <dependency>
+        <groupId>javax.transaction</groupId>
+        <artifactId>javax.transaction-api</artifactId>
+        <version>${javax.transaction.version}</version>
+      </dependency>
 
       <!-- test dependencies -->
       <dependency>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.