You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/16 09:45:55 UTC

[incubator-servicecomb-saga] branch master updated (a063058 -> b54587d)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from a063058  SCB-220 fixed rebase conflict
     new 335bf7a  SCB-234 fail fast SagaStartedEvent when all alpha servers are down
     new b54587d  SCB-234 rethrow exception in saga start annotation processor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../grpc/LoadBalancedClusterMessageSender.java     |  9 +--
 .../connector/grpc/RetryableMessageSender.java     | 47 ++++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  2 +-
 .../connector/grpc/RetryableMessageSenderTest.java | 85 ++++++++++++++++++++++
 omega/omega-transaction/pom.xml                    |  4 +
 .../transaction/SagaStartAnnotationProcessor.java  |  8 +-
 .../SagaStartAnnotationProcessorTest.java          | 27 ++++++-
 pom.xml                                            |  6 ++
 8 files changed, 176 insertions(+), 12 deletions(-)
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 01/02: SCB-234 fail fast SagaStartedEvent when all alpha servers are down

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 335bf7a8b3fb1104fa0a5db79a474c7bc129f8ae
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 16 15:42:30 2018 +0800

    SCB-234 fail fast SagaStartedEvent when all alpha servers are down
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 20 ++---
 .../connector/grpc/RetryableMessageSender.java     | 47 ++++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 10 ++-
 .../connector/grpc/RetryableMessageSenderTest.java | 85 ++++++++++++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 ++-
 5 files changed, 160 insertions(+), 12 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 700864a..b518524 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -53,7 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
-  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+  private final BlockingQueue<MessageSender> availableMessageSenders;
+  private final MessageSender retryableMessageSender;
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -61,12 +62,17 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler,
-      int reconnectDelay) {
+      int reconnectDelay,
+      BlockingQueue<MessageSender> availableMessageSenders,
+      MessageSender retryableMessageSender) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address provided");
     }
 
+    this.availableMessageSenders = availableMessageSenders;
+    this.retryableMessageSender = retryableMessageSender;
+
     channels = new ArrayList<>(addresses.length);
     for (String address : addresses) {
       ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
@@ -95,6 +101,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       senders.put(sender, 0L);
     }
     channels = emptyList();
+    availableMessageSenders = new LinkedBlockingQueue<>();
+    retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   }
 
   @Override
@@ -155,13 +163,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
         .filter(entry -> entry.getValue() < Long.MAX_VALUE)
         .min(Comparator.comparingLong(Entry::getValue))
         .map(Entry::getKey)
-        .orElse(event -> {
-          try {
-            return availableMessageSenders.take().send(event);
-          } catch (InterruptedException e) {
-            throw new OmegaException("Failed to send event " + event + " due to interruption", e);
-          }
-        });
+        .orElse(retryableMessageSender);
   }
 
   private void scheduleReconnectTask(int reconnectDelay) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
new file mode 100644
index 0000000..abce82b
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSender.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+
+public class RetryableMessageSender implements MessageSender {
+  private final BlockingQueue<MessageSender> availableMessageSenders;
+
+  public RetryableMessageSender(BlockingQueue<MessageSender> availableMessageSenders) {
+    this.availableMessageSenders = availableMessageSenders;
+  }
+
+  @Override
+  public AlphaResponse send(TxEvent event) {
+    if (event.type() == SagaStartedEvent) {
+      throw new OmegaException("Failed to process subsequent requests because no alpha server is available");
+    }
+    try {
+      return availableMessageSenders.take().send(event);
+    } catch (InterruptedException e) {
+      throw new OmegaException("Failed to send event " + event + " due to interruption", e);
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 93cb854..315c5ae 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -37,7 +37,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -101,6 +103,8 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   private final MessageSender messageSender = newMessageSender(addresses);
 
   private MessageSender newMessageSender(String[] addresses) {
@@ -110,7 +114,9 @@ public class LoadBalancedClusterMessageSenderTest {
         deserializer,
         new ServiceConfig(serviceName),
         handler,
-        100);
+        100,
+        availableMessageSenders,
+        retryableMessageSender);
   }
 
   @BeforeClass
@@ -156,7 +162,7 @@ public class LoadBalancedClusterMessageSenderTest {
     assertThat(eventsMap.get(deadPort).size(), is(1));
     assertThat(eventsMap.get(deadPort).peek().toString(), is(event.toString()));
 
-    int livePort = deadPort == 8080? 8090 : 8080;
+    int livePort = deadPort == 8080 ? 8090 : 8080;
     assertThat(eventsMap.get(livePort).size(), is(2));
     assertThat(eventsMap.get(livePort).peek().toString(), is(event.toString()));
 
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
new file mode 100644
index 0000000..7ffbf9a
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.SagaStartedEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
+import org.junit.Test;
+
+public class RetryableMessageSenderTest {
+  @SuppressWarnings("unchecked")
+  private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class);
+  private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders);
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
+
+  @Test
+  public void sendEventWhenSenderIsAvailable() throws InterruptedException {
+    MessageSender sender = mock(MessageSender.class);
+    when(availableMessageSenders.take()).thenReturn(sender);
+
+    messageSender.send(event);
+
+    verify(sender, times(1)).send(event);
+  }
+
+  @Test
+  public void blowsUpWhenEventIsSagaStarted() {
+    TxEvent event = new SagaStartedEvent(globalTxId, localTxId);
+
+    try {
+      messageSender.send(event);
+      expectFailing(OmegaException.class);
+    } catch (OmegaException e) {
+      assertThat(e.getMessage(),
+          is("Failed to process subsequent requests because no alpha server is available"));
+    }
+  }
+
+  @Test
+  public void blowsUpWhenInterrupted() throws InterruptedException {
+    Thread thread = new Thread(() -> {
+      try {
+        messageSender.send(event);
+        expectFailing(OmegaException.class);
+      } catch (OmegaException e) {
+        assertThat(e.getMessage().endsWith("interruption"), is(true));
+      }
+    });
+
+    thread.start();
+    thread.interrupt();
+    thread.join();
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index fa4027b..78321a4 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,7 +17,11 @@
 
 package org.apache.servicecomb.saga.omega.spring;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
+import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -63,13 +67,17 @@ class OmegaSpringConfig {
       @Lazy MessageHandler handler) {
 
     MessageFormat messageFormat = new KryoMessageFormat();
+    BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+    MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
     MessageSender sender = new LoadBalancedClusterMessageSender(
         addresses,
         messageFormat,
         messageFormat,
         serviceConfig,
         handler,
-        reconnectDelay);
+        reconnectDelay,
+        availableMessageSenders,
+        retryableMessageSender);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/02: SCB-234 rethrow exception in saga start annotation processor

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b54587d3b578e172a77727e2a041e0732364252b
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 16 16:51:48 2018 +0800

    SCB-234 rethrow exception in saga start annotation processor
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 13 +++--------
 .../grpc/LoadBalancedClusterMessageSenderTest.java |  8 +------
 .../connector/grpc/RetryableMessageSenderTest.java |  8 +++----
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 +-------
 omega/omega-transaction/pom.xml                    |  4 ++++
 .../transaction/SagaStartAnnotationProcessor.java  |  8 ++++++-
 .../SagaStartAnnotationProcessorTest.java          | 27 +++++++++++++++++++---
 pom.xml                                            |  6 +++++
 8 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index b518524..9a78a62 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -53,8 +53,8 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
-  private final BlockingQueue<MessageSender> availableMessageSenders;
-  private final MessageSender retryableMessageSender;
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
+  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(String[] addresses,
@@ -62,17 +62,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       MessageDeserializer deserializer,
       ServiceConfig serviceConfig,
       MessageHandler handler,
-      int reconnectDelay,
-      BlockingQueue<MessageSender> availableMessageSenders,
-      MessageSender retryableMessageSender) {
+      int reconnectDelay) {
 
     if (addresses.length == 0) {
       throw new IllegalArgumentException("No reachable cluster address provided");
     }
 
-    this.availableMessageSenders = availableMessageSenders;
-    this.retryableMessageSender = retryableMessageSender;
-
     channels = new ArrayList<>(addresses.length);
     for (String address : addresses) {
       ManagedChannel channel = ManagedChannelBuilder.forTarget(address)
@@ -101,8 +96,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       senders.put(sender, 0L);
     }
     channels = emptyList();
-    availableMessageSenders = new LinkedBlockingQueue<>();
-    retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   }
 
   @Override
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 315c5ae..8062ae9 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -37,9 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
@@ -103,8 +101,6 @@ public class LoadBalancedClusterMessageSenderTest {
 
   private final String serviceName = uniquify("serviceName");
   private final String[] addresses = {"localhost:8080", "localhost:8090"};
-  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
-  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
   private final MessageSender messageSender = newMessageSender(addresses);
 
   private MessageSender newMessageSender(String[] addresses) {
@@ -114,9 +110,7 @@ public class LoadBalancedClusterMessageSenderTest {
         deserializer,
         new ServiceConfig(serviceName),
         handler,
-        100,
-        availableMessageSenders,
-        retryableMessageSender);
+        100);
   }
 
   @BeforeClass
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 7ffbf9a..562c50f 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
@@ -37,7 +37,7 @@ import org.junit.Test;
 
 public class RetryableMessageSenderTest {
   @SuppressWarnings("unchecked")
-  private final BlockingQueue<MessageSender> availableMessageSenders = mock(BlockingQueue.class);
+  private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
   private final MessageSender messageSender = new RetryableMessageSender(availableMessageSenders);
 
   private final String globalTxId = uniquify("globalTxId");
@@ -45,9 +45,9 @@ public class RetryableMessageSenderTest {
   private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
 
   @Test
-  public void sendEventWhenSenderIsAvailable() throws InterruptedException {
+  public void sendEventWhenSenderIsAvailable() {
     MessageSender sender = mock(MessageSender.class);
-    when(availableMessageSenders.take()).thenReturn(sender);
+    availableMessageSenders.add(sender);
 
     messageSender.send(event);
 
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 78321a4..fa4027b 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,11 +17,7 @@
 
 package org.apache.servicecomb.saga.omega.spring;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
-import org.apache.servicecomb.saga.omega.connector.grpc.RetryableMessageSender;
 import org.apache.servicecomb.saga.omega.context.CompensationContext;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -67,17 +63,13 @@ class OmegaSpringConfig {
       @Lazy MessageHandler handler) {
 
     MessageFormat messageFormat = new KryoMessageFormat();
-    BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
-    MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
     MessageSender sender = new LoadBalancedClusterMessageSender(
         addresses,
         messageFormat,
         messageFormat,
         serviceConfig,
         handler,
-        reconnectDelay,
-        availableMessageSenders,
-        retryableMessageSender);
+        reconnectDelay);
 
     sender.onConnected();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index a2bf293..258770c 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -46,6 +46,10 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>javax.transaction-api</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7299b25..7ef021a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import javax.transaction.TransactionalException;
+
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 
 class SagaStartAnnotationProcessor implements EventAwareInterceptor {
@@ -31,7 +33,11 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
 
   @Override
   public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
-    return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    try {
+      return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    } catch (OmegaException e) {
+      throw new TransactionalException(e.getMessage(), e.getCause());
+    }
   }
 
   @Override
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index f8e936d..566a456 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.servicecomb.saga.omega.transaction;
 
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import javax.transaction.TransactionalException;
+
 import org.apache.servicecomb.saga.common.EventType;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -43,12 +49,10 @@ public class SagaStartAnnotationProcessorTest {
 
   private final String globalTxId = UUID.randomUUID().toString();
 
-  private final String localTxId = UUID.randomUUID().toString();
-
   @SuppressWarnings("unchecked")
   private final IdGenerator<String> generator = mock(IdGenerator.class);
-
   private final OmegaContext context = new OmegaContext(generator);
+  private final OmegaException exception = new OmegaException("exception", new RuntimeException("runtime exception"));
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context,
       sender);
@@ -86,4 +90,21 @@ public class SagaStartAnnotationProcessorTest {
     assertThat(event.type(), is(EventType.SagaEndedEvent));
     assertThat(event.payloads().length, is(0));
   }
+
+  @Test
+  public void transformInterceptedException() {
+    MessageSender sender = mock(MessageSender.class);
+    SagaStartAnnotationProcessor sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+
+    doThrow(exception).when(sender).send(any());
+
+    try {
+      sagaStartAnnotationProcessor.preIntercept(null, null);
+      expectFailing(TransactionalException.class);
+    } catch (TransactionalException e) {
+      assertThat(e.getMessage(), is("exception"));
+      assertThat(e.getCause(), instanceOf(RuntimeException.class));
+      assertThat(e.getCause().getMessage(), is("runtime exception"));
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 1d41a45..9338b2f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
     <maven.failsafe.version>2.19.1</maven.failsafe.version>
     <grpc.version>1.8.0</grpc.version>
     <kryo.version>4.0.1</kryo.version>
+    <javax.transaction.version>1.2</javax.transaction.version>
   </properties>
 
   <name>ServiceComb Saga</name>
@@ -347,6 +348,11 @@
         <artifactId>kryo</artifactId>
         <version>${kryo.version}</version>
       </dependency>
+      <dependency>
+        <groupId>javax.transaction</groupId>
+        <artifactId>javax.transaction-api</artifactId>
+        <version>${javax.transaction.version}</version>
+      </dependency>
 
       <!-- test dependencies -->
       <dependency>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.