You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by zh...@apache.org on 2019/07/08 12:19:03 UTC
[servicecomb-pack] 07/10: SCB-1321 Refactoring interrupt Omega timeout thread
This is an automated email from the ASF dual-hosted git repository.
zhanglei pushed a commit to branch SCB-1321
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit ea7c70aaf254f6ba6b2198f2bc48b1e8b393af71
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Mon Jul 8 20:14:00 2019 +0800
SCB-1321 Refactoring interrupt Omega timeout thread
---
.../omega/transaction/AbstractRecoveryPolicy.java | 5 +-
.../omega/transaction/CompensableInterceptor.java | 2 +-
.../transaction/RecoveryPolicyTimeoutWrapper.java | 226 ---------------------
.../pack/omega/transaction/SagaAbortedEvent.java | 42 ++++
.../transaction/SagaStartAnnotationProcessor.java | 22 +-
.../pack/omega/transaction/SagaStartAspect.java | 35 +---
.../wrapper/RecoveryPolicyTimeoutWrapper.java | 222 ++++++++++++++++++++
...SagaStartAnnotationProcessorTimeoutWrapper.java | 107 ++++++++++
.../SagaStartAnnotationProcessorWrapper.java} | 32 +--
.../omega/transaction/wrapper/TimeoutProb.java | 92 +++++++++
.../transaction/wrapper/TimeoutProbManager.java | 100 +++++++++
.../omega/transaction/SagaStartAspectTest.java | 44 +++-
12 files changed, 639 insertions(+), 290 deletions(-)
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index 7de6470..8526581 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.RecoveryPolicyTimeoutWrapper;
import org.aspectj.lang.ProceedingJoinPoint;
public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
@@ -32,8 +33,8 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
throws Throwable {
if(compensable.timeout()>0){
- return RecoveryPolicyTimeoutWrapper
- .getInstance().wrapper(this).applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+ RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
+ return wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
}else{
return this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 7186147..08ad7f7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -19,7 +19,7 @@ package org.apache.servicecomb.pack.omega.transaction;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
-class CompensableInterceptor implements EventAwareInterceptor {
+public class CompensableInterceptor implements EventAwareInterceptor {
private final OmegaContext context;
private final SagaMessageSender sender;
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java
deleted file mode 100644
index 3fe60ae..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyTimeoutWrapper.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.omega.transaction;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import org.apache.servicecomb.pack.omega.context.OmegaContext;
-import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * RecoveryPolicy Wrapper
- * 1.Use this wrapper to send a request if the @Compensable timeout>0
- * 2.Terminate thread execution if execution time is greater than the timeout of @Compensable
- *
- * Exception
- * 1.If the interrupt succeeds, a TransactionTimeoutException is thrown and the local transaction is rollback
- * 2.If the interrupt fails, throw an OmegaException
- *
- * Note: Omega end thread coding advice
- * 1.add short sleep to while true loop. Otherwise, the thread may not be able to terminate.
- * 2.Replace the synchronized with ReentrantLock, Otherwise, the thread may not be able to terminate.
- * */
-
-public class RecoveryPolicyTimeoutWrapper {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static RecoveryPolicyTimeoutWrapper instance = new RecoveryPolicyTimeoutWrapper(100);
- private AbstractRecoveryPolicy recoveryPolicy;
- private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>();
-
- public static RecoveryPolicyTimeoutWrapper getInstance() {
- return instance;
- }
-
- public RecoveryPolicyTimeoutWrapper(int delay) {
- this.interrupter.scheduleWithFixedDelay(
- new Runnable() {
- @Override
- public void run() {
- try {
- RecoveryPolicyTimeoutWrapper.this.interrupt();
- } catch (Exception e) {
- LOG.error("The overtime thread interrupt fail",e);
- }
- }
- },
- 0, delay, TimeUnit.MICROSECONDS
- );
- }
-
- /**
- * Configuration timeout probe thread
- */
- private final transient ScheduledExecutorService interrupter =
- Executors.newSingleThreadScheduledExecutor(
- new TimeoutProbeThreadFactory()
- );
-
- /**
- * Loop detection of all thread timeout probes, remove probe if the thread has terminated
- */
- private void interrupt() {
- synchronized (this.interrupter) {
- for (TimeoutProb timeoutProb : this.timeoutProbs) {
- if (timeoutProb.interruptFailureException == null) {
- if (timeoutProb.expired()) {
- if (timeoutProb.interrupted()) {
- this.timeoutProbs.remove(timeoutProb);
- }
- }
- }
- }
- }
- }
-
- public RecoveryPolicyTimeoutWrapper wrapper(AbstractRecoveryPolicy recoveryPolicy) {
- this.recoveryPolicy = recoveryPolicy;
- return this;
- }
-
- public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable,
- CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
- throws Throwable {
- final TimeoutProb timeoutProb = new TimeoutProb(joinPoint, compensable);
- this.timeoutProbs.add(timeoutProb);
- Object output;
- try {
- output = this.recoveryPolicy
- .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
- if (timeoutProb.getInterruptFailureException() != null) {
- throw new OmegaException(timeoutProb.interruptFailureException);
- }
- } catch (InterruptedException e) {
- if (timeoutProb.getInterruptFailureException() != null) {
- throw new OmegaException(timeoutProb.interruptFailureException);
- }else{
- throw new TransactionTimeoutException(e.getMessage(),e);
- }
- } catch (IllegalMonitorStateException e) {
- if (timeoutProb.getInterruptFailureException() != null) {
- throw new OmegaException(timeoutProb.interruptFailureException);
- }else{
- throw new TransactionTimeoutException(e.getMessage(),e);
- }
- } catch (ClosedByInterruptException e) {
- if (timeoutProb.getInterruptFailureException() != null) {
- throw new OmegaException(timeoutProb.interruptFailureException);
- }else{
- throw new TransactionTimeoutException(e.getMessage(),e);
- }
- } catch (Throwable e) {
- throw e;
- } finally {
- this.timeoutProbs.remove(timeoutProb);
- }
- return output;
- }
-
- /**
- * Define timeout probe
- */
- private static final class TimeoutProb implements
- Comparable<TimeoutProb> {
-
- private final transient Thread thread = Thread.currentThread();
- private final transient long startTime = System.currentTimeMillis();
- private final transient long expireTime;
- private Exception interruptFailureException = null;
- private final transient ProceedingJoinPoint joinPoint;
-
- public TimeoutProb(final ProceedingJoinPoint pnt, Compensable compensable) {
- this.joinPoint = pnt;
- this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(compensable.timeout());
- }
-
- @Override
- public int compareTo(final TimeoutProb obj) {
- int compare;
- if (this.expireTime > obj.expireTime) {
- compare = 1;
- } else if (this.expireTime < obj.expireTime) {
- compare = -1;
- } else {
- compare = 0;
- }
- return compare;
- }
-
- public Exception getInterruptFailureException() {
- return interruptFailureException;
- }
-
- /**
- *
- * @return Returns TRUE if expired
- */
- public boolean expired() {
- return this.expireTime < System.currentTimeMillis();
- }
-
- /**
- * Interrupt thread
- *
- * @return Returns TRUE if the thread has been interrupted
- */
- public boolean interrupted() {
- boolean interrupted;
- if (this.thread.isAlive()) {
- // 如果当前线程是活动状态,则发送线程中断信号
- try {
- this.thread.interrupt();
- } catch (Exception e) {
- this.interruptFailureException = e;
- LOG.info("Failed to interrupt the thread " + this.thread.getName(), e);
- throw e;
- }
- final Method method = MethodSignature.class.cast(this.joinPoint.getSignature()).getMethod();
- LOG.warn("{}: interrupted on {}ms timeout (over {}ms)",
- new Object[]{method, System.currentTimeMillis() - this.startTime,
- this.expireTime - this.startTime}
- );
- interrupted = false;
- } else {
- interrupted = true;
- }
- return interrupted;
- }
- }
-
- public class TimeoutProbeThreadFactory implements ThreadFactory {
-
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable,
- "probe");
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setDaemon(true);
- return thread;
- }
- }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
new file mode 100644
index 0000000..fcc77e6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import org.apache.servicecomb.pack.common.EventType;
+
+public class SagaAbortedEvent extends TxEvent {
+
+ private static final int PAYLOADS_MAX_LENGTH = 10240;
+
+ public SagaAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+ super(EventType.SagaAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
+ stackTrace(throwable));
+ }
+
+ private static String stackTrace(Throwable e) {
+ StringWriter writer = new StringWriter();
+ e.printStackTrace(new PrintWriter(writer));
+ String stackTrace = writer.toString();
+ if (stackTrace.length() > PAYLOADS_MAX_LENGTH) {
+ stackTrace = stackTrace.substring(0, PAYLOADS_MAX_LENGTH);
+ }
+ return stackTrace;
+ }
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
index 87b7808..767171b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
@@ -20,17 +20,17 @@ package org.apache.servicecomb.pack.omega.transaction;
import javax.transaction.TransactionalException;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
-class SagaStartAnnotationProcessor {
+public class SagaStartAnnotationProcessor {
private final OmegaContext omegaContext;
private final SagaMessageSender sender;
- SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) {
+ public SagaStartAnnotationProcessor(OmegaContext omegaContext, SagaMessageSender sender) {
this.omegaContext = omegaContext;
this.sender = sender;
}
- AlphaResponse preIntercept(int timeout) {
+ public AlphaResponse preIntercept(int timeout) {
try {
return sender
.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
@@ -39,7 +39,7 @@ class SagaStartAnnotationProcessor {
}
}
- void postIntercept(String parentTxId) {
+ public void postIntercept(String parentTxId) {
AlphaResponse response = sender
.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
if (response.aborted()) {
@@ -47,10 +47,16 @@ class SagaStartAnnotationProcessor {
}
}
- void onError(String compensationMethod, Throwable throwable) {
+ public void onError(String compensationMethod, Throwable throwable) {
String globalTxId = omegaContext.globalTxId();
- sender.send(
- new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
- throwable));
+ if(omegaContext.isAlphaFeatureAkkaEnabled()){
+ sender.send(
+ new SagaAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
+ throwable));
+ }else{
+ sender.send(
+ new TxAbortedEvent(globalTxId, omegaContext.localTxId(), null, compensationMethod,
+ throwable));
+ }
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
index ec4441e..51eba27 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
@@ -17,20 +17,16 @@
package org.apache.servicecomb.pack.omega.transaction;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorTimeoutWrapper;
+import org.apache.servicecomb.pack.omega.transaction.wrapper.SagaStartAnnotationProcessorWrapper;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.reflect.MethodSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Aspect
public class SagaStartAspect {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
@@ -44,27 +40,12 @@ public class SagaStartAspect {
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
initializeOmegaContext();
- Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
- sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
- LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
-
- try {
- Object result = joinPoint.proceed();
-
- sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
- LOG.debug("Transaction with context {} has finished.", context);
-
- return result;
- } catch (Throwable throwable) {
- // We don't need to handle the OmegaException here
- if (!(throwable instanceof OmegaException)) {
- sagaStartAnnotationProcessor.onError(method.toString(), throwable);
- LOG.error("Transaction {} failed.", context.globalTxId());
- }
- throw throwable;
- } finally {
- context.clear();
+ if(context.isAlphaFeatureAkkaEnabled() && sagaStart.timeout()>0){
+ SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
+ return wrapper.apply(joinPoint,sagaStart,context);
+ }else{
+ SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
+ return wrapper.apply(joinPoint,sagaStart,context);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java
new file mode 100644
index 0000000..0a59acb
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/RecoveryPolicyTimeoutWrapper.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.channels.ClosedByInterruptException;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.transaction.AbstractRecoveryPolicy;
+import org.apache.servicecomb.pack.omega.transaction.CompensableInterceptor;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException;
+import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RecoveryPolicy Wrapper
+ * 1.Use this wrapper to send a request if the @Compensable timeout>0
+ * 2.Terminate thread execution if execution time is greater than the timeout of @Compensable
+ *
+ * Exception
+ * 1.If the interrupt succeeds, a TransactionTimeoutException is thrown and the local transaction is rolled back
+ * 2.If the interrupt fails, an OmegaException is thrown
+ *
+ * Note: Omega end thread coding advice
+ * 1.Add a short sleep to any while(true) loop; otherwise, the thread may not be able to terminate.
+ * 2.Replace synchronized with ReentrantLock; otherwise, the thread may not be able to terminate.
+ * */
+
+public class RecoveryPolicyTimeoutWrapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ //private static RecoveryPolicyTimeoutWrapper instance = new RecoveryPolicyTimeoutWrapper();
+ private AbstractRecoveryPolicy recoveryPolicy;
+ //private final transient Set<TimeoutProb> timeoutProbs = new ConcurrentSkipListSet<TimeoutProb>();
+
+// public static RecoveryPolicyTimeoutWrapper getInstance() {
+// return instance;
+// }
+
+ public RecoveryPolicyTimeoutWrapper(AbstractRecoveryPolicy recoveryPolicy) {
+ this.recoveryPolicy = recoveryPolicy;
+// this.interrupter.scheduleWithFixedDelay(
+// new Runnable() {
+// @Override
+// public void run() {
+// try {
+// RecoveryPolicyTimeoutWrapper.this.interrupt();
+// } catch (Exception e) {
+// LOG.error("The overtime thread interrupt fail",e);
+// }
+// }
+// },
+// 0, delay, TimeUnit.MICROSECONDS
+// );
+ }
+
+ /**
+ * Configuration timeout probe thread
+ */
+// private final transient ScheduledExecutorService interrupter =
+// Executors.newSingleThreadScheduledExecutor(
+// new TimeoutProbeThreadFactory()
+// );
+
+ /**
+ * Loop detection of all thread timeout probes, remove probe if the thread has terminated
+ */
+// private void interrupt() {
+// synchronized (this.interrupter) {
+// for (TimeoutProb timeoutProb : this.timeoutProbs) {
+// if (timeoutProb.interruptFailureException == null) {
+// if (timeoutProb.expired()) {
+// if (timeoutProb.interrupted()) {
+// this.timeoutProbs.remove(timeoutProb);
+// }
+// }
+// }
+// }
+// }
+// }
+
+// public RecoveryPolicyTimeoutWrapper wrapper(AbstractRecoveryPolicy recoveryPolicy) {
+// this.recoveryPolicy = recoveryPolicy;
+// return this;
+// }
+
+ public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable,
+ CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
+ throws Throwable {
+ final TimeoutProb timeoutProb = TimeoutProbManager.getInstance().addTimeoutProb(compensable.timeout());
+ Object output;
+ try {
+ output = this.recoveryPolicy
+ .applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+ if (timeoutProb.getInterruptFailureException() != null) {
+ throw new OmegaException(timeoutProb.getInterruptFailureException());
+ }
+ } catch (InterruptedException e) {
+ if (timeoutProb.getInterruptFailureException() != null) {
+ throw new OmegaException(timeoutProb.getInterruptFailureException());
+ }else{
+ throw new TransactionTimeoutException(e.getMessage(),e);
+ }
+ } catch (IllegalMonitorStateException e) {
+ if (timeoutProb.getInterruptFailureException() != null) {
+ throw new OmegaException(timeoutProb.getInterruptFailureException());
+ }else{
+ throw new TransactionTimeoutException(e.getMessage(),e);
+ }
+ } catch (ClosedByInterruptException e) {
+ if (timeoutProb.getInterruptFailureException() != null) {
+ throw new OmegaException(timeoutProb.getInterruptFailureException());
+ }else{
+ throw new TransactionTimeoutException(e.getMessage(),e);
+ }
+ } catch (Throwable e) {
+ throw e;
+ } finally {
+ TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb);
+ }
+ return output;
+ }
+
+ /**
+ * Define timeout probe
+ */
+// private static final class TimeoutProb implements
+// Comparable<TimeoutProb> {
+//
+// private final transient Thread thread = Thread.currentThread();
+// private final transient long startTime = System.currentTimeMillis();
+// private final transient long expireTime;
+// private Exception interruptFailureException = null;
+// private final transient ProceedingJoinPoint joinPoint;
+//
+// public TimeoutProb(final ProceedingJoinPoint pnt, Compensable compensable) {
+// this.joinPoint = pnt;
+// this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(compensable.timeout());
+// }
+//
+// @Override
+// public int compareTo(final TimeoutProb obj) {
+// int compare;
+// if (this.expireTime > obj.expireTime) {
+// compare = 1;
+// } else if (this.expireTime < obj.expireTime) {
+// compare = -1;
+// } else {
+// compare = 0;
+// }
+// return compare;
+// }
+//
+// public Exception getInterruptFailureException() {
+// return interruptFailureException;
+// }
+//
+// /**
+// *
+// * @return Returns TRUE if expired
+// */
+// public boolean expired() {
+// return this.expireTime < System.currentTimeMillis();
+// }
+//
+// /**
+// * Interrupt thread
+// *
+// * @return Returns TRUE if the thread has been interrupted
+// */
+// public boolean interrupted() {
+// boolean interrupted;
+// if (this.thread.isAlive()) {
+// // 如果当前线程是活动状态,则发送线程中断信号
+// try {
+// this.thread.interrupt();
+// } catch (Exception e) {
+// this.interruptFailureException = e;
+// LOG.info("Failed to interrupt the thread " + this.thread.getName(), e);
+// throw e;
+// }
+// final Method method = MethodSignature.class.cast(this.joinPoint.getSignature()).getMethod();
+// LOG.warn("{}: interrupted on {}ms timeout (over {}ms)",
+// new Object[]{method, System.currentTimeMillis() - this.startTime,
+// this.expireTime - this.startTime}
+// );
+// interrupted = false;
+// } else {
+// interrupted = true;
+// }
+// return interrupted;
+// }
+// }
+//
+// public class TimeoutProbeThreadFactory implements ThreadFactory {
+//
+// public Thread newThread(Runnable runnable) {
+// Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable,
+// "probe");
+// thread.setPriority(Thread.MAX_PRIORITY);
+// thread.setDaemon(true);
+// return thread;
+// }
+// }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java
new file mode 100644
index 0000000..5d7d7d6
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorTimeoutWrapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import java.nio.channels.ClosedByInterruptException;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor;
+import org.apache.servicecomb.pack.omega.transaction.TransactionTimeoutException;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SagaStartAnnotationProcessorTimeoutWrapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
+
+ public SagaStartAnnotationProcessorTimeoutWrapper(
+ SagaStartAnnotationProcessor sagaStartAnnotationProcessor) {
+ this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor;
+ }
+
+ public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context)
+ throws Throwable {
+ final TimeoutProb timeoutProb = TimeoutProbManager.getInstance()
+ .addTimeoutProb(sagaStart.timeout());
+ Object output;
+ try {
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+ sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized context {} before execution of method {}", context,
+ method.toString());
+ }
+ try {
+ output = joinPoint.proceed();
+ if (timeoutProb.getInterruptFailureException() != null) {
+ throw new OmegaException(timeoutProb.getInterruptFailureException());
+ }
+ sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction with context {} has finished.", context);
+ }
+ return output;
+ } catch (Throwable throwable) {
+ // TODO We still need to intercept some exceptions for which we cannot determine the state of the child transaction.
+ // At this point, we don't need to send a SagaAbortedEvent; we just need to throw a TransactionTimeoutException.
+ // For example, java.net.SocketTimeoutException, etc.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TimeoutWrapper exception {}", throwable.getClass().getName());
+ }
+ if (timeoutProb.getInterruptFailureException() != null) {
+ LOG.info("TimeoutProb interrupt fail");
+ throw timeoutProb.getInterruptFailureException();
+ } else if (isThreadInterruptException(throwable)) {
+ // We don't have to send a SagaAbortedEvent,
+ // because the SagaActor state automatically changes to suspended on timeout.
+ throw new TransactionTimeoutException("Timeout interrupt", throwable);
+ } else {
+ // We don't need to handle the OmegaException here
+ if (!(throwable instanceof OmegaException)) {
+ LOG.info("TimeoutWrapper Exception {}", throwable.getClass().getName());
+ sagaStartAnnotationProcessor.onError(method.toString(), throwable);
+ LOG.error("Transaction {} failed.", context.globalTxId());
+ }
+ }
+ throw throwable;
+ }
+ } finally {
+ context.clear();
+ TimeoutProbManager.getInstance().removeTimeoutProb(timeoutProb);
+ }
+ }
+
+ private boolean isThreadInterruptException(Throwable throwable) {
+ if (throwable instanceof InterruptedException ||
+ throwable instanceof IllegalMonitorStateException ||
+ throwable instanceof ClosedByInterruptException ||
+ throwable.getCause() instanceof InterruptedException ||
+ throwable.getCause() instanceof IllegalMonitorStateException ||
+ throwable.getCause() instanceof ClosedByInterruptException) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
similarity index 74%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
index ec4441e..2837c7f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
@@ -15,46 +15,38 @@
* limitations under the License.
*/
-package org.apache.servicecomb.pack.omega.transaction;
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import org.apache.servicecomb.pack.omega.context.OmegaContext;
import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.pack.omega.transaction.OmegaException;
+import org.apache.servicecomb.pack.omega.transaction.SagaStartAnnotationProcessor;
import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Aspect
-public class SagaStartAspect {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public class SagaStartAnnotationProcessorWrapper {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
- private final OmegaContext context;
-
- public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
- this.context = context;
- this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
+ public SagaStartAnnotationProcessorWrapper(
+ SagaStartAnnotationProcessor sagaStartAnnotationProcessor) {
+ this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor;
}
- @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
- Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
- initializeOmegaContext();
+ public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context)
+ throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
-
sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
-
try {
Object result = joinPoint.proceed();
-
sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
LOG.debug("Transaction with context {} has finished.", context);
-
return result;
} catch (Throwable throwable) {
// We don't need to handle the OmegaException here
@@ -67,8 +59,4 @@ public class SagaStartAspect {
context.clear();
}
}
-
- private void initializeOmegaContext() {
- context.setLocalTxId(context.newGlobalTxId());
- }
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java
new file mode 100644
index 0000000..dc70aef
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProb.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Timeout probe for a single thread.
+ *
+ * <p>A probe remembers the thread that created it, its creation time and the instant at which
+ * the configured timeout elapses. Once the probe has expired, {@link #interrupted()} can be used
+ * to send an interrupt to that thread.
+ *
+ * <p>Probes are ordered by expiry time; probes with the same expiry time are ordered by creation
+ * sequence, so two distinct probes never compare as equal.
+ */
+public class TimeoutProb implements Comparable<TimeoutProb> {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Hands out a unique creation number per probe; used only to break ordering ties.
+  private static final java.util.concurrent.atomic.AtomicLong SEQUENCE =
+      new java.util.concurrent.atomic.AtomicLong();
+
+  private final transient Thread thread = Thread.currentThread();
+  private final transient long startTime = System.currentTimeMillis();
+  private final transient long expireTime;
+  private final transient long sequence = SEQUENCE.incrementAndGet();
+  // volatile: assigned inside interrupted(), which TimeoutProbManager invokes from its scheduler
+  // thread, while the getter may be called from a different thread.
+  private volatile Exception interruptFailureException = null;
+  // Remembers that the warning below was already logged, so repeated interrupts stay quiet.
+  private volatile boolean interruptSent = false;
+
+  /**
+   * Creates a probe for the calling thread.
+   *
+   * @param timeout timeout in seconds, counted from the moment of construction
+   */
+  public TimeoutProb(int timeout) {
+    this.expireTime = this.startTime + TimeUnit.SECONDS.toMillis(timeout);
+  }
+
+  /**
+   * Orders probes by expiry time, earliest first.
+   *
+   * <p>Ties are broken by creation sequence. Without the tie-breaker, two probes that expire in
+   * the same millisecond would compare as equal, and a sorted set (such as the
+   * {@code ConcurrentSkipListSet}) would keep only one of them.
+   *
+   * @param obj the probe to compare against
+   * @return a negative value, zero or a positive value; zero only for the very same probe
+   */
+  @Override
+  public int compareTo(final TimeoutProb obj) {
+    if (this.expireTime != obj.expireTime) {
+      return this.expireTime > obj.expireTime ? 1 : -1;
+    }
+    if (this.sequence != obj.sequence) {
+      return this.sequence > obj.sequence ? 1 : -1;
+    }
+    return 0;
+  }
+
+  /**
+   * @return the exception raised by a failed interrupt attempt, or {@code null} if none failed
+   */
+  public Exception getInterruptFailureException() {
+    return interruptFailureException;
+  }
+
+  /**
+   * @return Returns TRUE if expired
+   */
+  public boolean expired() {
+    return this.expireTime < System.currentTimeMillis();
+  }
+
+  /**
+   * Interrupt thread
+   *
+   * <p>If the probed thread is still alive, an interrupt is sent to it and {@code false} is
+   * returned, so the caller can check again later. If the thread is no longer alive, nothing is
+   * sent and {@code true} is returned.
+   *
+   * <p>An exception raised while interrupting is stored (see
+   * {@link #getInterruptFailureException()}), logged and rethrown.
+   *
+   * @return Returns TRUE if the thread is no longer alive
+   */
+  public boolean interrupted() {
+    if (!this.thread.isAlive()) {
+      return true;
+    }
+    // The thread is still alive, so send it the interrupt signal.
+    try {
+      this.thread.interrupt();
+      if (!interruptSent) {
+        LOG.warn("Thread interrupted on {}ms timeout (over {}ms)",
+            System.currentTimeMillis() - this.startTime,
+            this.expireTime - this.startTime);
+      }
+      interruptSent = true;
+    } catch (Exception e) {
+      this.interruptFailureException = e;
+      LOG.info("Failed to interrupt the thread " + this.thread.getName(), e);
+      throw e;
+    }
+    return false;
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java
new file mode 100644
index 0000000..3a3a2ac
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/TimeoutProbManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction.wrapper;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of active {@link TimeoutProb} instances and periodically interrupts the threads
+ * whose probes have expired.
+ *
+ * <p>Polling runs on a single daemon thread created by {@link TimeoutProbeThreadFactory}.
+ */
+public class TimeoutProbManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Shared instance; polls every 100 milliseconds.
+  private static final TimeoutProbManager instance = new TimeoutProbManager(100);
+  // Membership is identity based (TimeoutProb does not override equals/hashCode), so probes that
+  // happen to expire in the same millisecond are still tracked individually. A set ordered by
+  // TimeoutProb.compareTo would treat such probes as duplicates and drop one of them.
+  private final transient Set<TimeoutProb> timeoutProbs = java.util.Collections.newSetFromMap(
+      new java.util.concurrent.ConcurrentHashMap<TimeoutProb, Boolean>());
+  private final transient ScheduledExecutorService interrupter =
+      Executors.newSingleThreadScheduledExecutor(
+          new TimeoutProbeThreadFactory()
+      );
+
+  /**
+   * @return the shared manager instance
+   */
+  public static TimeoutProbManager getInstance() {
+    return instance;
+  }
+
+  /**
+   * Creates a manager and starts polling immediately.
+   *
+   * @param delay pause between the end of one polling round and the start of the next, in
+   *     milliseconds
+   */
+  public TimeoutProbManager(int delay) {
+    this.interrupter.scheduleWithFixedDelay(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              TimeoutProbManager.this.interrupt();
+            } catch (Exception e) {
+              // Log and carry on: an exception escaping from a scheduled task would suppress
+              // all subsequent executions.
+              LOG.error("The overtime thread interrupt fail", e);
+            }
+          }
+        },
+        // Probes expire at millisecond resolution, so the poll period is in milliseconds.
+        0, delay, TimeUnit.MILLISECONDS
+    );
+  }
+
+  /**
+   * Creates a probe for the calling thread and registers it for polling.
+   *
+   * @param timeout timeout in seconds
+   * @return the registered probe; pass it to {@link #removeTimeoutProb(TimeoutProb)} when done
+   */
+  public TimeoutProb addTimeoutProb(int timeout) {
+    final TimeoutProb timeoutProb = new TimeoutProb(timeout);
+    this.timeoutProbs.add(timeoutProb);
+    return timeoutProb;
+  }
+
+  /**
+   * Stops polling the given probe.
+   *
+   * @param timeoutProb the probe to unregister
+   */
+  public void removeTimeoutProb(TimeoutProb timeoutProb) {
+    this.timeoutProbs.remove(timeoutProb);
+  }
+
+  /**
+   * Loop detection of all thread timeout probes, remove probe if the thread has terminated
+   */
+  private void interrupt() {
+    synchronized (this.interrupter) {
+      for (TimeoutProb timeoutProb : this.timeoutProbs) {
+        // Skip probes whose interrupt attempt already failed and probes that are not due yet.
+        if (timeoutProb.getInterruptFailureException() == null && timeoutProb.expired()) {
+          // interrupted() returns true once the probed thread is no longer alive.
+          if (timeoutProb.interrupted()) {
+            this.timeoutProbs.remove(timeoutProb);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Configuration timeout probe thread
+   *
+   * <p>Threads are daemon threads named "probe", run at maximum priority and are placed in a
+   * thread group named "recovery-policy-timeout-wrapper".
+   */
+  public class TimeoutProbeThreadFactory implements ThreadFactory {
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(new ThreadGroup("recovery-policy-timeout-wrapper"), runnable,
+          "probe");
+      thread.setPriority(Thread.MAX_PRIORITY);
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
index c11a790..656eac1 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAspectTest.java
@@ -72,20 +72,20 @@ public class SagaStartAspectTest {
private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
private final SagaStart sagaStart = Mockito.mock(SagaStart.class);
- private final OmegaContext omegaContext = new OmegaContext(idGenerator);
- private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext);
+ private OmegaContext omegaContext;
+ private SagaStartAspect aspect;
@Before
public void setUp() throws Exception {
when(idGenerator.nextId()).thenReturn(globalTxId);
when(joinPoint.getSignature()).thenReturn(methodSignature);
-
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
- omegaContext.clear();
}
@Test
public void newGlobalTxIdInSagaStart() throws Throwable {
+ omegaContext = new OmegaContext(idGenerator);
+ aspect = new SagaStartAspect(sender, omegaContext);
aspect.advise(joinPoint, sagaStart);
TxEvent startedEvent = messages.get(0);
@@ -108,6 +108,8 @@ public class SagaStartAspectTest {
@Test
public void clearContextOnSagaStartError() throws Throwable {
+ omegaContext = new OmegaContext(idGenerator);
+ aspect = new SagaStartAspect(sender, omegaContext);
RuntimeException oops = new RuntimeException("oops");
when(joinPoint.proceed()).thenThrow(oops);
@@ -138,6 +140,40 @@ public class SagaStartAspectTest {
assertThat(omegaContext.localTxId(), is(nullValue()));
}
+ /**
+  * When the intercepted method fails, the aspect must rethrow the original exception, emit a
+  * SagaStartedEvent followed by a SagaAbortedEvent, and leave the context cleared.
+  */
+ @Test
+ public void clearContextOnSagaStartErrorWithAkka() throws Throwable {
+   final RuntimeException failure = new RuntimeException("oops");
+   // NOTE(review): the second constructor argument presumably enables the Akka mode named in
+   // the test title - confirm against OmegaContext.
+   omegaContext = new OmegaContext(idGenerator,true);
+   aspect = new SagaStartAspect(sender, omegaContext);
+
+   when(joinPoint.proceed()).thenThrow(failure);
+
+   try {
+     aspect.advise(joinPoint, sagaStart);
+     expectFailing(RuntimeException.class);
+   } catch (RuntimeException e) {
+     // The very same exception instance has to reach the caller.
+     assertThat(e, is(failure));
+   }
+
+   assertThat(messages.size(), is(2));
+
+   // First message: the saga was started.
+   final TxEvent startedEvent = messages.get(0);
+   assertThat(startedEvent.globalTxId(), is(globalTxId));
+   assertThat(startedEvent.localTxId(), is(globalTxId));
+   assertThat(startedEvent.parentTxId(), is(nullValue()));
+   assertThat(startedEvent.type(), is(EventType.SagaStartedEvent));
+
+   // Second message: the saga was aborted because of the failure.
+   final TxEvent abortedEvent = messages.get(1);
+   assertThat(abortedEvent.globalTxId(), is(globalTxId));
+   assertThat(abortedEvent.localTxId(), is(globalTxId));
+   assertThat(abortedEvent.parentTxId(), is(nullValue()));
+   assertThat(abortedEvent.type(), is(EventType.SagaAbortedEvent));
+
+   // The context must not keep any transaction ids afterwards.
+   assertThat(omegaContext.globalTxId(), is(nullValue()));
+   assertThat(omegaContext.localTxId(), is(nullValue()));
+ }
+
+
private String doNothing() {
return "doNothing";
}