You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 09:45:57 UTC
[incubator-servicecomb-saga] 02/02: SCB-234 rethrow exception in saga start annotation processor
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit b54587d3b578e172a77727e2a041e0732364252b
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 16 16:51:48 2018 +0800
SCB-234 rethrow exception in saga start annotation processor
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../grpc/LoadBalancedClusterMessageSender.java | 13 +++--------
.../grpc/LoadBalancedClusterMessageSenderTest.java | 8 +------
.../connector/grpc/RetryableMessageSenderTest.java | 8 +++----
.../saga/omega/spring/OmegaSpringConfig.java | 10 +-------
omega/omega-transaction/pom.xml | 4 ++++
.../transaction/SagaStartAnnotationProcessor.java | 8 ++++++-
.../SagaStartAnnotationProcessorTest.java | 27 +++++++++++++++++++---
pom.xml | 6 +++++
8 files changed, 50 insertions(+), 34 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index b518524..9a78a62 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -53,8 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
private final Collection<ManagedChannel> channels;
private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
- private final BlockingQueue<MessageSender> availableMessageSenders;
- private final MessageSender retryableMessageSender;
+ private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+ private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public LoadBalancedClusterMessageSender(String[] addresses,
@@ -62,17 +62,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
MessageDeserializer deserializer,
ServiceConfig serviceConfig,
MessageHandler handler,
- int reconnectDelay,
- BlockingQueue<MessageSender> availableMessageSenders,
- MessageSender retryableMessageSender) {
+ int reconnectDelay) {
if (addresses.length == 0) {
throw new IllegalArgumentException("No reachable cluster address provided");
}
- this.availableMessageSenders = availableMessageSenders;
- this.retryableMessageSender = retryableMessageSender;
-
channels = new ArrayList<>(addresses.length);
for (String address : addresses) {
ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
@@ -101,8 +96,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
senders.put(sender, 0L);
}
channels = emptyList();
- availableMessageSenders = new LinkedBlockingQueue<>();
- retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
}
@Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 315c5ae..8062ae9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -37,9 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -103,8 +101,6 @@ public class LoadBalancedClusterMessageSenderTest {
private final String serviceName = uniquify("serviceName");
private final String[] addresses = {"localhost:8080", "localhost:8090"};
- private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
- private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
private final MessageSender messageSender = newMessageSender(addresses);
private MessageSender newMessageSender(String[] addresses) {
@@ -114,9 +110,7 @@ public class LoadBalancedClusterMessageSenderTest {
deserializer,
new ServiceConfig(serviceName),
handler,
- 100,
- availableMessageSenders,
- retryableMessageSender);
+ 100);
}
@BeforeClass
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 7ffbf9a..562c50f 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -37,7 +37,7 @@ import org.junit.Test;
public class RetryableMessageSenderTest {
@SuppressWarnings("unchecked")
- private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class);
+ private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders);
private final String globalTxId = uniquify("globalTxId");
@@ -45,9 +45,9 @@ public class RetryableMessageSenderTest {
private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
@Test
- public void sendEventWhenSenderIsAvailable() throws InterruptedException {
+ public void sendEventWhenSenderIsAvailable() {
MessageSender sender = mock(MessageSender.class);
- when(availableMessageSenders.take()).thenReturn(sender);
+ availableMessageSenders.add(sender);
messageSender.send(event);
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 78321a4..fa4027b 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,11 +17,7 @@
package org.apache.servicecomb.saga.omega.spring;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
import org.apache.servicecomb.saga.omega.context.CompensationContext;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -67,17 +63,13 @@ class OmegaSpringConfig {
@Lazy MessageHandler handler) {
MessageFormat messageFormat = new KryoMessageFormat();
- BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
- MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
MessageSender sender = new LoadBalancedClusterMessageSender(
addresses,
messageFormat,
messageFormat,
serviceConfig,
handler,
- reconnectDelay,
- availableMessageSenders,
- retryableMessageSender);
+ reconnectDelay);
sender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index a2bf293..258770c 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -46,6 +46,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>javax.transaction</groupId>
+ <artifactId>javax.transaction-api</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7299b25..7ef021a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -17,6 +17,8 @@
package org.apache.servicecomb.saga.omega.transaction;
+import javax.transaction.TransactionalException;
+
import org.apache.servicecomb.saga.omega.context.OmegaContext;
class SagaStartAnnotationProcessor implements EventAwareInterceptor {
@@ -31,7 +33,11 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
- return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ try {
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ } catch (OmegaException e) {
+ throw new TransactionalException(e.getMessage(), e.getCause());
+ }
}
@Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index f8e936d..566a456 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -17,15 +17,21 @@
package org.apache.servicecomb.saga.omega.transaction;
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import javax.transaction.TransactionalException;
+
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -43,12 +49,10 @@ public class SagaStartAnnotationProcessorTest {
private final String globalTxId = UUID.randomUUID().toString();
- private final String localTxId = UUID.randomUUID().toString();
-
@SuppressWarnings("unchecked")
private final IdGenerator<String> generator = mock(IdGenerator.class);
-
private final OmegaContext context = new OmegaContext(generator);
+ private final OmegaException exception = new OmegaException("exception", new RuntimeException("runtime exception"));
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
sender);
@@ -86,4 +90,21 @@ public class SagaStartAnnotationProcessorTest {
assertThat(event.type(), is(EventType.SagaEndedEvent));
assertThat(event.payloads().length, is(0));
}
+
+ @Test
+ public void transformInterceptedException() {
+ MessageSender sender = mock(MessageSender.class);
+ SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+
+ doThrow(exception).when(sender).send(any());
+
+ try {
+ sagaStartAnnotationProcessor.preIntercept(null, null);
+ expectFailing(TransactionalException.class);
+ } catch (TransactionalException e) {
+ assertThat(e.getMessage(), is("exception"));
+ assertThat(e.getCause(), instanceOf(RuntimeException.class));
+ assertThat(e.getCause().getMessage(), is("runtime exception"));
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 1d41a45..9338b2f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
<maven.failsafe.version>2.19.1</maven.failsafe.version>
<grpc.version>1.8.0</grpc.version>
<kryo.version>4.0.1</kryo.version>
+ <javax.transaction.version>1.2</javax.transaction.version>
</properties>
<name>ServiceComb Saga</name>
@@ -347,6 +348,11 @@
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
+ <dependency>
+ <groupId>javax.transaction</groupId>
+ <artifactId>javax.transaction-api</artifactId>
+ <version>${javax.transaction.version}</version>
+ </dependency>
<!-- test dependencies -->
<dependency>
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.