You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/28 09:05:51 UTC
[servicecomb-pack] 01/02: SCB-1386 Introduce sendingSagaEnd
attribute to let user choose when to send SagaEnd event
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch SCB-1386
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit f40ef0ae24e0aef927a3c028788dad23b03a0798
Author: Willem Jiang <wi...@gmail.com>
AuthorDate: Sun Jul 28 16:48:28 2019 +0800
SCB-1386 Introduce sendingSagaEnd attribute to let user choose when to send SagaEnd event
---
.../pack/omega/context/annotations/SagaStart.java | 7 +++++
.../omega/transaction/AbstractRecoveryPolicy.java | 13 +++++++--
.../omega/transaction/CompensableInterceptor.java | 4 +++
.../transaction/SagaStartAnnotationProcessor.java | 1 +
.../omega/transaction/annotations/Compensable.java | 7 +++++
.../SagaStartAnnotationProcessorWrapper.java | 8 ++++--
.../omega/transaction/TransactionAspectTest.java | 33 +++++++++++++++++++++-
7 files changed, 68 insertions(+), 5 deletions(-)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
index 2a66a84..7517bd4 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
@@ -37,4 +37,11 @@ public @interface SagaStart {
* @return
*/
int timeout() default 0;
+
+ /**
+ * Whether to send the SagaEnd event to Alpha once the SagaStart annotated method finishes without any error.
+ * The default value is true, which means Omega sends the SagaEnd event to Alpha once the annotated method is finished.
+ * If the value is false, Omega does not send the SagaEnd event to Alpha when the annotated method is finished.
+ */
+ boolean sendingSagaEnd() default true;
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index 8526581..a97f118 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -32,11 +32,20 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable,
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
throws Throwable {
+ Object result;
if(compensable.timeout()>0){
RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
- return wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+ result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
}else{
- return this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+ result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
}
+ if (compensable.sendingSagaEnd()) {
+ // Just send out the SagaEnd event
+ // TODO we may also invoke the callback here to release some resources
+ interceptor.sendSagaEndEvent();
+ }
+ return result;
}
+
+
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 08ad7f7..1f4a119 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -45,4 +45,8 @@ public class CompensableInterceptor implements EventAwareInterceptor {
sender.send(
new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
}
+
+ public void sendSagaEndEvent() {
+ sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
+ }
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
index 767171b..de9aa3f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
@@ -42,6 +42,7 @@ public class SagaStartAnnotationProcessor {
public void postIntercept(String parentTxId) {
AlphaResponse response = sender
.send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ //TODO we may know if the transaction is aborted from fsm alpha backend
if (response.aborted()) {
throw new OmegaException("transaction " + parentTxId + " is aborted");
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index 0e24029..0272d29 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -72,4 +72,11 @@ public @interface Compensable {
* @return the timeout value
*/
int timeout() default 0;
+
+ /**
+ * Whether to send the SagaEnd event to Alpha once the Compensable annotated method finishes without any error.
+ * The default value is false, which means Omega does not send the SagaEnd event to Alpha once the annotated method is finished.
+ * If the value is true, this method is the last compensable method to be called, and Omega sends the SagaEnd event to Alpha once the method is finished.
+ */
+ boolean sendingSagaEnd() default false;
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
index 2837c7f..f71e507 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
@@ -45,8 +45,12 @@ public class SagaStartAnnotationProcessorWrapper {
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
try {
Object result = joinPoint.proceed();
- sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
- LOG.debug("Transaction with context {} has finished.", context);
+ if (sagaStart.sendingSagaEnd()) {
+ sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
+ LOG.debug("Transaction with context {} has finished.", context);
+ } else {
+ LOG.debug("Transaction with context {} is not finished in the SagaStarted annotated method.", context);
+ }
return result;
} catch (Throwable throwable) {
// We don't need to handle the OmegaException here
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index 4c5ad4c..3de4013 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -110,10 +111,40 @@ public class TransactionAspectTest {
}
@Test
+ public void sendingSageEndEvent() throws Throwable {
+ when(compensable.sendingSagaEnd()).thenReturn(true);
+ aspect.advise(joinPoint, compensable);
+ assertThat(messages.size(), is(3));
+
+ TxEvent startedEvent = messages.get(0);
+
+ assertThat(startedEvent.globalTxId(), is(globalTxId));
+ assertThat(startedEvent.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent.parentTxId(), is(localTxId));
+ assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent.retries(), is(0));
+ assertThat(startedEvent.retryMethod().isEmpty(), is(true));
+
+ TxEvent endedEvent = messages.get(1);
+
+ assertThat(endedEvent.globalTxId(), is(globalTxId));
+ assertThat(endedEvent.localTxId(), is(newLocalTxId));
+ assertThat(endedEvent.parentTxId(), is(localTxId));
+ assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+
+ TxEvent sagaEndEvent = messages.get(2);
+ assertThat(sagaEndEvent.globalTxId(), is(globalTxId));
+ assertNull(sagaEndEvent.parentTxId());
+ assertThat(sagaEndEvent.localTxId(), is(newLocalTxId));
+ assertThat(sagaEndEvent.type(), is(EventType.SagaEndedEvent));
+ }
+
+ @Test
public void setNewLocalTxIdCompensableWithTransactionContext() throws Throwable {
// setup the argument class
when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});
aspect.advise(joinPoint, compensable);
+ assertThat(messages.size(), is(2));
TxEvent startedEvent = messages.get(0);
assertThat(startedEvent.globalTxId(), is(transactionGlobalTxId));
@@ -137,7 +168,7 @@ public class TransactionAspectTest {
@Test
public void newLocalTxIdInCompensable() throws Throwable {
aspect.advise(joinPoint, compensable);
-
+ assertThat(messages.size(), is(2));
TxEvent startedEvent = messages.get(0);
assertThat(startedEvent.globalTxId(), is(globalTxId));