You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/08 12:19:03 UTC

[servicecomb-pack] 07/10: SCB-1321 Refactoring interrupt Omega timeout thread

This is an automated email from the ASF dual-hosted git repository.

zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit ea7c70aaf254f6ba6b2198f2bc48b1e8b393af71
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jul 8 20:14:00 2019 +0800

    SCB-1321 Refactoring interrupt Omega timeout thread
---
 .../omega/transaction/AbstractRecoveryPolicy.java  |   5 +-
 .../omega/transaction/CompensableInterceptor.java  |   2 +-
 .../transaction/RecoveryPolicyTimeoutWrapper.java  | 226 ---------------------
 .../pack/omega/transaction/SagaAbortedEvent.java   |  42 ++++
 .../transaction/SagaStartAnnotationProcessor.java  |  22 +-
 .../pack/omega/transaction/SagaStartAspect.java    |  35 +---
 .../wrapper/RecoveryPolicyTimeoutWrapper.java      | 222 ++++++++++++++++++++
 ...SagaStartAnnotationProcessorTimeoutWrapper.java | 107 ++++++++++
 .../SagaStartAnnotationProcessorWrapper.java}      |  32 +--
 .../omega/transaction/wrapper/TimeoutProb.java     |  92 +++++++++
 .../transaction/wrapper/TimeoutProbManager.java    | 100 +++++++++
 .../omega/transaction/SagaStartAspectTest.java     |  44 +++-
 12 files changed, 639 insertions(+), 290 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index 7de6470..8526581 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction;
 
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.RecoveryPolicyTimeoutWrapper;
 import org.aspectj.lang.ProceedingJoinPoint;
 
 public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
@@ -32,8 +33,8 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
       CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
       throws Throwable {
     if(compensable.timeout()>0){
-      return RecoveryPolicyTimeoutWrapper
-          .getInstance().wrapper(this).applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+      RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
+      return wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
     }else{
       return this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 7186147..08ad7f7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction;
 
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 
-class CompensableInterceptor implements EventAwareInterceptor {
+public class CompensableInterceptor implements EventAwareInterceptor {
   private final OmegaContext context;
   private final SagaMessageSender sender;
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java
deleted file mode 100644
index 3fe60ae..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.omega.transaction;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import org.apache.servicecomb.pack.omega.context.OmegaContext;
-import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * RecoveryPolicy Wrapper
- * 1.Use this wrapper to send a request if the @Compensable timeout>0
- * 2.Terminate thread execution if execution time is greater than the timeout of @Compensable
- *
- * Exception
- * 1.If the interrupt succeeds, a TransactionTimeoutException is thrown and the local transaction is rollback
- * 2.If the interrupt fails, throw an OmegaException
- *
- * Note: Omega end thread coding advice
- * 1.add short sleep to while true loop. Otherwise, the thread may not be able to terminate.
- * 2.Replace the synchronized with ReentrantLock, Otherwise, the thread may not be able to terminate.
- * */
-
-public class RecoveryPolicyTimeoutWrapper {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static RecoveryPolicyTimeoutWrapper instance = new RecoveryPolicyTimeoutWrapper(100);
-  private AbstractRecoveryPolicy recoveryPolicy;
-  private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>();
-
-  public static RecoveryPolicyTimeoutWrapper getInstance() {
-    return instance;
-  }
-
-  public RecoveryPolicyTimeoutWrapper(int delay) {
-    this.interrupter.scheduleWithFixedDelay(
-        new Runnable() {
-          @Override
-          public void run() {
-            try {
-              RecoveryPolicyTimeoutWrapper.this.interrupt();
-            } catch (Exception e) {
-              LOG.error("The overtime thread interrupt fail",e);
-            }
-          }
-        },
-        0, delay, TimeUnit.MICROSECONDS
-    );
-  }
-
-  /**
-   * Configuration timeout probe thread
-   */
-  private final transient ScheduledExecutorService interrupter =
-      Executors.newSingleThreadScheduledExecutor(
-          new TimeoutProbeThreadFactory()
-      );
-
-  /**
-   * Loop detection of all thread timeout probes, remove probe if the thread has terminated
-   */
-  private void interrupt() {
-    synchronized (this.interrupter) {
-      for (TimeoutProb timeoutProb : this.timeoutProbs) {
-        if (timeoutProb.interruptFailureException == null) {
-          if (timeoutProb.expired()) {
-            if (timeoutProb.interrupted()) {
-              this.timeoutProbs.remove(timeoutProb);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  public RecoveryPolicyTimeoutWrapper wrapper(AbstractRecoveryPolicy recoveryPolicy) {
-    this.recoveryPolicy = recoveryPolicy;
-    return this;
-  }
-
-  public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable,
-      CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
-      throws Throwable {
-    final TimeoutProb timeoutProb = new TimeoutProb(joinPoint, compensable);
-    this.timeoutProbs.add(timeoutProb);
-    Object output;
-    try {
-      output = this.recoveryPolicy
-          .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
-      if (timeoutProb.getInterruptFailureException() != null) {
-        throw new OmegaException(timeoutProb.interruptFailureException);
-      }
-    } catch (InterruptedException e) {
-      if (timeoutProb.getInterruptFailureException() != null) {
-        throw new OmegaException(timeoutProb.interruptFailureException);
-      }else{
-        throw new TransactionTimeoutException(e.getMessage(),e);
-      }
-    } catch (IllegalMonitorStateException e) {
-      if (timeoutProb.getInterruptFailureException() != null) {
-        throw new OmegaException(timeoutProb.interruptFailureException);
-      }else{
-        throw new TransactionTimeoutException(e.getMessage(),e);
-      }
-    } catch (ClosedByInterruptException e) {
-      if (timeoutProb.getInterruptFailureException() != null) {
-        throw new OmegaException(timeoutProb.interruptFailureException);
-      }else{
-        throw new TransactionTimeoutException(e.getMessage(),e);
-      }
-    } catch (Throwable e) {
-      throw e;
-    } finally {
-      this.timeoutProbs.remove(timeoutProb);
-    }
-    return output;
-  }
-
-  /**
-   * Define timeout probe
-   */
-  private static final class TimeoutProb implements
-      Comparable<TimeoutProb> {
-
-    private final transient Thread thread = Thread.currentThread();
-    private final transient long startTime = System.currentTimeMillis();
-    private final transient long expireTime;
-    private Exception interruptFailureException = null;
-    private final transient ProceedingJoinPoint joinPoint;
-
-    public TimeoutProb(final ProceedingJoinPoint pnt, Compensable compensable) {
-      this.joinPoint = pnt;
-      this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(compensable.timeout());
-    }
-
-    @Override
-    public int compareTo(final TimeoutProb obj) {
-      int compare;
-      if (this.expireTime > obj.expireTime) {
-        compare = 1;
-      } else if (this.expireTime < obj.expireTime) {
-        compare = -1;
-      } else {
-        compare = 0;
-      }
-      return compare;
-    }
-
-    public Exception getInterruptFailureException() {
-      return interruptFailureException;
-    }
-
-    /**
-     *
-     * @return Returns TRUE if expired
-     */
-    public boolean expired() {
-      return this.expireTime < System.currentTimeMillis();
-    }
-
-    /**
-     * Interrupt thread
-     *
-     * @return Returns TRUE if the thread has been interrupted
-     */
-    public boolean interrupted() {
-      boolean interrupted;
-      if (this.thread.isAlive()) {
-        // 如果当前线程是活动状态,则发送线程中断信号
-        try {
-          this.thread.interrupt();
-        } catch (Exception e) {
-          this.interruptFailureException = e;
-          LOG.info("Failed to interrupt the thread " + this.thread.getName(), e);
-          throw e;
-        }
-        final Method method = MethodSignature.class.cast(this.joinPoint.getSignature()).getMethod();
-        LOG.warn("{}: interrupted on {}ms timeout (over {}ms)",
-            new Object[]{method, System.currentTimeMillis() - this.startTime,
-                this.expireTime - this.startTime}
-        );
-        interrupted = false;
-      } else {
-        interrupted = true;
-      }
-      return interrupted;
-    }
-  }
-
-  public class TimeoutProbeThreadFactory implements ThreadFactory {
-
-    public Thread newThread(Runnable runnable) {
-      Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable,
-          "probe");
-      thread.setPriority(Thread.MAX_PRIORITY);
-      thread.setDaemon(true);
-      return thread;
-    }
-  }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
new file mode 100644
index 0000000..fcc77e6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import org.apache.servicecomb.pack.common.EventType;
+
+/** Tx event of type SagaAbortedEvent carrying the stack trace of the throwable passed in. */
+public class SagaAbortedEvent extends TxEvent {
+
+  /** Upper bound, in characters, on the rendered stack trace. */
+  private static final int PAYLOADS_MAX_LENGTH = 10240;
+
+  public SagaAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+    super(EventType.SagaAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
+        stackTrace(throwable));
+  }
+
+  /** Renders the stack trace of {@code e}, keeping at most PAYLOADS_MAX_LENGTH characters. */
+  private static String stackTrace(Throwable e) {
+    StringWriter buffer = new StringWriter();
+    e.printStackTrace(new PrintWriter(buffer));
+    String rendered = buffer.toString();
+    return rendered.length() > PAYLOADS_MAX_LENGTH ? rendered.substring(0, PAYLOADS_MAX_LENGTH) : rendered;
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
index 87b7808..767171b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
@@ -20,17 +20,17 @@ package org.apache.servicecomb.pack.omega.transaction;
 import javax.transaction.TransactionalException;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 
-class SagaStartAnnotationProcessor {
+public class SagaStartAnnotationProcessor {
 
   private final OmegaContext omegaContext;
   private final SagaMessageSender sender;
 
-  SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) {
+  public SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) {
     this.omegaContext = omegaContext;
     this.sender = sender;
   }
 
-  AlphaResponse preIntercept(int timeout) {
+  public AlphaResponse preIntercept(int timeout) {
     try {
       return sender
           .send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
@@ -39,7 +39,7 @@ class SagaStartAnnotationProcessor {
     }
   }
 
-  void postIntercept(String parentTxId) {
+  public void postIntercept(String parentTxId) {
     AlphaResponse response = sender
         .send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     if (response.aborted()) {
@@ -47,10 +47,16 @@ class SagaStartAnnotationProcessor {
     }
   }
 
-  void onError(String compensationMethod, Throwable throwable) {
+  public void onError(String compensationMethod, Throwable throwable) {
     String globalTxId = omegaContext.globalTxId();
-    sender.send(
-        new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
-            throwable));
+    if(omegaContext.isAlphaFeatureAkkaEnabled()){
+      sender.send(
+          new SagaAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
+              throwable));
+    }else{
+      sender.send(
+          new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
+              throwable));
+    }
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
index ec4441e..51eba27 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
@@ -17,20 +17,16 @@
 
 package org.apache.servicecomb.pack.omega.transaction;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorTimeoutWrapper;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorWrapper;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Aspect
 public class SagaStartAspect {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
 
@@ -44,27 +40,12 @@ public class SagaStartAspect {
   @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
   Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
     initializeOmegaContext();
-    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
-    sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
-    LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
-
-    try {
-      Object result = joinPoint.proceed();
-
-      sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
-      LOG.debug("Transaction with context {} has finished.", context);
-
-      return result;
-    } catch (Throwable throwable) {
-      // We don't need to handle the OmegaException here
-      if (!(throwable instanceof OmegaException)) {
-        sagaStartAnnotationProcessor.onError(method.toString(), throwable);
-        LOG.error("Transaction {} failed.", context.globalTxId());
-      }
-      throw throwable;
-    } finally {
-      context.clear();
+    if(context.isAlphaFeatureAkkaEnabled() && sagaStart.timeout()>0){
+      SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
+      return wrapper.apply(joinPoint,sagaStart,context);
+    }else{
+      SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
+      return wrapper.apply(joinPoint,sagaStart,context);
     }
   }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java
new file mode 100644
index 0000000..0a59acb
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.channels.ClosedByInterruptException;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.transaction.AbstractRecoveryPolicy;
+import org.apache.servicecomb.pack.omega.transaction.CompensableInterceptor;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException;
+import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps an {@link AbstractRecoveryPolicy} so that a {@code @Compensable} call with
+ * {@code timeout > 0} runs under a timeout probe.
+ *
+ * <p>The probe is registered with {@link TimeoutProbManager} before the wrapped policy runs
+ * and is always removed afterwards.
+ *
+ * <p>Exceptions:
+ * <ol>
+ *   <li>If the interrupt succeeds, a {@link TransactionTimeoutException} is thrown and the
+ *       local transaction is rolled back.</li>
+ *   <li>If the interrupt fails, an {@link OmegaException} is thrown.</li>
+ * </ol>
+ *
+ * <p>Coding advice for code that runs in the Omega thread:
+ * <ol>
+ *   <li>Add a short sleep to {@code while (true)} loops; otherwise the thread may not be able
+ *       to terminate.</li>
+ *   <li>Replace {@code synchronized} with {@code ReentrantLock}; otherwise the thread may not
+ *       be able to terminate.</li>
+ * </ol>
+ */
+public class RecoveryPolicyTimeoutWrapper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** The policy that performs the actual work. */
+  private final AbstractRecoveryPolicy recoveryPolicy;
+
+  /**
+   * @param recoveryPolicy the policy to run under a timeout probe
+   */
+  public RecoveryPolicyTimeoutWrapper(AbstractRecoveryPolicy recoveryPolicy) {
+    this.recoveryPolicy = recoveryPolicy;
+  }
+
+  /**
+   * Runs the wrapped policy while a timeout probe created from {@code compensable.timeout()}
+   * is registered for the call.
+   *
+   * @return whatever the wrapped policy returns
+   * @throws OmegaException if the probe reports an interrupt-failure exception, whether the
+   *     wrapped policy returned normally or was cut short
+   * @throws TransactionTimeoutException if the wrapped policy ended with an
+   *     {@link InterruptedException}, {@link IllegalMonitorStateException} or
+   *     {@link ClosedByInterruptException} and the probe reports no interrupt failure
+   * @throws Throwable any other failure of the wrapped policy, rethrown unchanged
+   */
+  public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable,
+      CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
+      throws Throwable {
+    final TimeoutProb timeoutProb = TimeoutProbManager.getInstance().addTimeoutProb(compensable.timeout());
+    try {
+      Object output = this.recoveryPolicy
+          .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+      if (timeoutProb.getInterruptFailureException() != null) {
+        throw new OmegaException(timeoutProb.getInterruptFailureException());
+      }
+      return output;
+    } catch (InterruptedException | IllegalMonitorStateException | ClosedByInterruptException e) {
+      // The three exception types are handled identically: interrupt failure wins, else timeout.
+      if (timeoutProb.getInterruptFailureException() != null) {
+        throw new OmegaException(timeoutProb.getInterruptFailureException());
+      }
+      throw new TransactionTimeoutException(e.getMessage(), e);
+    } finally {
+      TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb);
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java
new file mode 100644
index 0000000..5d7d7d6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import java.nio.channels.ClosedByInterruptException;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor;
+import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a {@link SagaStart} annotated method while a {@link TimeoutProb} registered with the
+ * {@link TimeoutProbManager} watches the calling thread.
+ *
+ * <p>The probe is created with {@code sagaStart.timeout()} and is always unregistered before
+ * {@link #apply} returns or throws.
+ */
+public class SagaStartAnnotationProcessorTimeoutWrapper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+
+  /**
+   * @param sagaStartAnnotationProcessor processor whose preIntercept / postIntercept / onError
+   *     callbacks are invoked around the intercepted method
+   */
+  public SagaStartAnnotationProcessorTimeoutWrapper(
+      SagaStartAnnotationProcessor sagaStartAnnotationProcessor) {
+    this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor;
+  }
+
+  /**
+   * Proceeds with the join point under timeout supervision.
+   *
+   * @param joinPoint the intercepted invocation
+   * @param sagaStart annotation instance; its {@code timeout()} is passed to the probe and to
+   *     {@code preIntercept}
+   * @param context Omega context; it is cleared before this method returns or throws
+   * @return the value returned by {@code joinPoint.proceed()}
+   * @throws TransactionTimeoutException if the failure (or its direct cause) is an
+   *     {@link InterruptedException}, {@link IllegalMonitorStateException} or
+   *     {@link ClosedByInterruptException} and the probe recorded no interrupt failure
+   * @throws Throwable the exception recorded by the probe when interrupting failed, otherwise
+   *     the original failure of the intercepted method
+   */
+  public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context)
+      throws Throwable {
+    final TimeoutProb timeoutProb = TimeoutProbManager.getInstance()
+        .addTimeoutProb(sagaStart.timeout());
+    try {
+      Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+      sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initialized context {} before execution of method {}", context,
+            method.toString());
+      }
+      try {
+        Object output = joinPoint.proceed();
+        if (timeoutProb.getInterruptFailureException() != null) {
+          // Caught by the handler below, which rethrows the recorded exception itself.
+          throw new OmegaException(timeoutProb.getInterruptFailureException());
+        }
+        sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Transaction with context {} has finished.", context);
+        }
+        return output;
+      } catch (Throwable throwable) {
+        // TODO We still need to intercept some exceptions that we can't judge the state of the child transaction.
+        //  At this point, we don't need to send SagaAbortEvent, just need to throw a TransactionTimeoutException
+        //  For example, java.net.SocketTimeoutException, etc.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TimeoutWrapper exception {}", throwable.getClass().getName());
+        }
+        if (timeoutProb.getInterruptFailureException() != null) {
+          LOG.info("TimeoutProb interrupt fail");
+          throw timeoutProb.getInterruptFailureException();
+        }
+        if (isThreadInterruptException(throwable)) {
+          // We don't have to send a SagaAbortEvent
+          // because the SagaActor state automatically changes to suspended on timeout.
+          throw new TransactionTimeoutException("Timeout interrupt", throwable);
+        }
+        // We don't need to handle the OmegaException here
+        if (!(throwable instanceof OmegaException)) {
+          LOG.info("TimeoutWrapper Exception {}", throwable.getClass().getName());
+          sagaStartAnnotationProcessor.onError(method.toString(), throwable);
+          LOG.error("Transaction {} failed.", context.globalTxId());
+        }
+        throw throwable;
+      }
+    } finally {
+      try {
+        context.clear();
+      } finally {
+        // Unregister the probe even if clearing the context fails; a probe left behind would
+        // keep interrupting this thread after it has moved on to other work.
+        TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb);
+      }
+    }
+  }
+
+  /**
+   * Tells whether the failure, or its direct cause, is one of the exception types this wrapper
+   * treats as the result of a timeout interrupt.
+   */
+  private boolean isThreadInterruptException(Throwable throwable) {
+    return isInterruptSignal(throwable) || isInterruptSignal(throwable.getCause());
+  }
+
+  /**
+   * Type check for a single throwable; {@code null} yields {@code false}.
+   */
+  private static boolean isInterruptSignal(Throwable candidate) {
+    return candidate instanceof InterruptedException
+        || candidate instanceof IllegalMonitorStateException
+        || candidate instanceof ClosedByInterruptException;
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
similarity index 74%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
index ec4441e..2837c7f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
@@ -15,46 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.pack.omega.transaction;
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 import org.apache.servicecomb.pack.omega.context.OmegaContext;
 import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor;
 import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Aspect
-public class SagaStartAspect {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public class SagaStartAnnotationProcessorWrapper {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
 
-  private final OmegaContext context;
-
-  public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
-    this.context = context;
-    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+  public SagaStartAnnotationProcessorWrapper(
+      SagaStartAnnotationProcessor sagaStartAnnotationProcessor) {
+    this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor;
   }
 
-  @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
-  Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
-    initializeOmegaContext();
+  public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context)
+      throws Throwable {
     Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
     sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
-
     try {
       Object result = joinPoint.proceed();
-
       sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
       LOG.debug("Transaction with context {} has finished.", context);
-
       return result;
     } catch (Throwable throwable) {
       // We don't need to handle the OmegaException here
@@ -67,8 +59,4 @@ public class SagaStartAspect {
       context.clear();
     }
   }
-
-  private void initializeOmegaContext() {
-    context.setLocalTxId(context.newGlobalTxId());
-  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java
new file mode 100644
index 0000000..dc70aef
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Timeout probe bound to the thread that creates it.
+ *
+ * <p>A probe records its creation time and an expiry time, and can send an interrupt to its
+ * thread once expired. Probes are ordered by expiry time; probes sharing the same expiry time
+ * are ordered by creation sequence, so two distinct probes never compare as equal. This matters
+ * because a sorted set treats elements whose {@code compareTo} returns 0 as duplicates.
+ */
+public class TimeoutProb implements Comparable<TimeoutProb> {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Source of the per-probe sequence numbers used to break ordering ties.
+  private static final java.util.concurrent.atomic.AtomicLong SEQUENCE =
+      new java.util.concurrent.atomic.AtomicLong();
+  private final transient Thread thread = Thread.currentThread();
+  private final transient long startTime = System.currentTimeMillis();
+  private final transient long expireTime;
+  private final transient long sequence = SEQUENCE.incrementAndGet();
+  // Written by the thread calling interrupted(), read by the probed thread: keep them visible.
+  private volatile Exception interruptFailureException = null;
+  private volatile boolean interruptSent = false;
+
+  /**
+   * Creates a probe for the current thread.
+   *
+   * @param timeout time to live in seconds, counted from now
+   */
+  public TimeoutProb(int timeout) {
+    this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(timeout);
+  }
+
+  /**
+   * Orders probes by expiry time, earliest first, then by creation sequence.
+   *
+   * @return 0 only when {@code obj} is this very probe
+   */
+  @Override
+  public int compareTo(final TimeoutProb obj) {
+    if (this.expireTime != obj.expireTime) {
+      return this.expireTime < obj.expireTime ? -1 : 1;
+    }
+    if (this.sequence != obj.sequence) {
+      return this.sequence < obj.sequence ? -1 : 1;
+    }
+    return 0;
+  }
+
+  /**
+   * @return the exception raised by a failed interrupt attempt, or {@code null} if none occurred
+   */
+  public Exception getInterruptFailureException() {
+    return interruptFailureException;
+  }
+
+  /**
+   * @return Returns TRUE if expired
+   */
+  public boolean expired() {
+    return this.expireTime < System.currentTimeMillis();
+  }
+
+  /**
+   * Sends an interrupt to the probed thread if it is still alive.
+   *
+   * <p>The warning is logged only for the first interrupt sent; later calls interrupt again
+   * silently. If interrupting fails, the exception is recorded (see
+   * {@link #getInterruptFailureException()}), logged and rethrown.
+   *
+   * @return TRUE if the thread is no longer alive (nothing was sent), FALSE if the thread is
+   *     alive and an interrupt was sent
+   */
+  public boolean interrupted() {
+    if (!this.thread.isAlive()) {
+      return true;
+    }
+    // The thread is still alive: send it an interrupt signal.
+    try {
+      this.thread.interrupt();
+      if (!interruptSent) {
+        LOG.warn("Thread interrupted on {}ms timeout (over {}ms)",
+            new Object[]{System.currentTimeMillis() - this.startTime,
+                this.expireTime - this.startTime}
+        );
+      }
+      interruptSent = true;
+    } catch (Exception e) {
+      this.interruptFailureException = e;
+      LOG.info("Failed to interrupt the thread " + this.thread.getName(), e);
+      throw e;
+    }
+    return false;
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java
new file mode 100644
index 0000000..3a3a2ac
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry of {@link TimeoutProb}s together with the background task that scans them.
+ *
+ * <p>Every instance owns a single-threaded scheduler (daemon thread, maximum priority) that
+ * periodically walks the registered probes and interrupts the threads of expired ones.
+ * {@link #getInstance()} returns the shared instance, which scans every 100 milliseconds.
+ */
+public class TimeoutProbManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final TimeoutProbManager instance = new TimeoutProbManager(100);
+  private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>();
+  private final transient ScheduledExecutorService interrupter =
+      Executors.newSingleThreadScheduledExecutor(
+          new TimeoutProbeThreadFactory()
+      );
+
+  /**
+   * @return the shared manager instance
+   */
+  public static TimeoutProbManager getInstance() {
+    return instance;
+  }
+
+  /**
+   * Creates a manager and immediately starts its periodic scan.
+   *
+   * @param delay pause between the end of one scan and the start of the next, in milliseconds
+   */
+  public TimeoutProbManager(int delay) {
+    this.interrupter.scheduleWithFixedDelay(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              TimeoutProbManager.this.interrupt();
+            } catch (Exception e) {
+              // Swallowed on purpose: an exception escaping run() would cancel all further scans.
+              LOG.error("The overtime thread interrupt fail", e);
+            }
+          }
+        },
+        // Probe timeouts are expressed in seconds, so a millisecond-scale scan period is precise
+        // enough; scanning every few microseconds only burns CPU on a max-priority thread.
+        0, delay, TimeUnit.MILLISECONDS
+    );
+  }
+
+  /**
+   * Creates a probe for the calling thread and registers it.
+   *
+   * @param timeout time to live of the probe, in seconds
+   * @return the registered probe; hand it back to {@link #removeTimeoutProb(TimeoutProb)} when done
+   */
+  public TimeoutProb addTimeoutProb(int timeout) {
+    final TimeoutProb timeoutProb = new TimeoutProb(timeout);
+    this.timeoutProbs.add(timeoutProb);
+    return timeoutProb;
+  }
+
+  /**
+   * Unregisters a probe so that its thread is no longer checked.
+   *
+   * @param timeoutProb probe previously returned by {@link #addTimeoutProb(int)}
+   */
+  public void removeTimeoutProb(TimeoutProb timeoutProb) {
+    this.timeoutProbs.remove(timeoutProb);
+  }
+
+  /**
+   * Loop detection of all thread timeout probes, remove probe if the thread has terminated
+   */
+  private void interrupt() {
+    synchronized (this.interrupter) {
+      // Removing during iteration is fine here: the set's iterator is weakly consistent.
+      for (TimeoutProb timeoutProb : this.timeoutProbs) {
+        // Probes whose interrupt attempt already failed are left alone.
+        boolean due = timeoutProb.getInterruptFailureException() == null
+            && timeoutProb.expired();
+        // interrupted() reports TRUE once the probed thread is no longer alive.
+        if (due && timeoutProb.interrupted()) {
+          this.timeoutProbs.remove(timeoutProb);
+        }
+      }
+    }
+  }
+
+  /**
+   * Configuration timeout probe thread
+   */
+  public class TimeoutProbeThreadFactory implements ThreadFactory {
+
+    /**
+     * Builds the scanning thread: named "probe", placed in its own thread group, daemon, and
+     * running at maximum priority.
+     */
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable,
+          "probe");
+      thread.setPriority(Thread.MAX_PRIORITY);
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
index c11a790..656eac1 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
@@ -72,20 +72,20 @@ public class SagaStartAspectTest {
   private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
   private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
 
-  private final OmegaContext omegaContext = new OmegaContext(idGenerator);
-  private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
+  private OmegaContext omegaContext;
+  private SagaStartAspect aspect;
 
   @Before
   public void setUp() throws Exception {
     when(idGenerator.nextId()).thenReturn(globalTxId);
     when(joinPoint.getSignature()).thenReturn(methodSignature);
-
     when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
-    omegaContext.clear();
   }
 
   @Test
   public void newGlobalTxIdInSagaStart() throws Throwable {
+    omegaContext = new OmegaContext(idGenerator);
+    aspect = new SagaStartAspect(sender, omegaContext);
     aspect.advise(joinPoint, sagaStart);
 
     TxEvent startedEvent = messages.get(0);
@@ -108,6 +108,8 @@ public class SagaStartAspectTest {
 
   @Test
   public void clearContextOnSagaStartError() throws Throwable {
+    omegaContext = new OmegaContext(idGenerator);
+    aspect = new SagaStartAspect(sender, omegaContext);
     RuntimeException oops = new RuntimeException("oops");
 
     when(joinPoint.proceed()).thenThrow(oops);
@@ -138,6 +140,40 @@ public class SagaStartAspectTest {
     assertThat(omegaContext.localTxId(), is(nullValue()));
   }
 
+  /**
+   * Verifies the failure path of the aspect when the {@link OmegaContext} is created through its
+   * two-argument constructor with {@code true} as second argument.
+   * NOTE(review): judging by the test name, that flag presumably enables the Akka mode - confirm
+   * against OmegaContext.
+   *
+   * <p>The intercepted method throws; the test expects the same exception to propagate, a
+   * SagaStartedEvent followed by a SagaAbortedEvent to be recorded, and the context to be empty
+   * afterwards.
+   */
+  @Test
+  public void clearContextOnSagaStartErrorWithAkka() throws Throwable {
+    omegaContext = new OmegaContext(idGenerator,true);
+    aspect = new SagaStartAspect(sender, omegaContext);
+    RuntimeException oops = new RuntimeException("oops");
+
+    // Make the intercepted method fail.
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint, sagaStart);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      // The very same exception instance must reach the caller.
+      assertThat(e, is(oops));
+    }
+
+    // Exactly two events: the start event first, then the abort event.
+    assertThat(messages.size(), is(2));
+    TxEvent event = messages.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is(EventType.SagaStartedEvent));
+
+    event = messages.get(1);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is(EventType.SagaAbortedEvent));
+
+    // The context must not keep any transaction id after the failure.
+    assertThat(omegaContext.globalTxId(), is(nullValue()));
+    assertThat(omegaContext.localTxId(), is(nullValue()));
+  }
+
   private String doNothing() {
     return "doNothing";
   }