You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/07/28 09:05:51 UTC

[servicecomb-pack] 01/02: SCB-1386 Introduce sendingSagaEnd attribute to let user choose when to send SagaEnd event

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-1386
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f40ef0ae24e0aef927a3c028788dad23b03a0798
Author: Willem Jiang <wi...@gmail.com>
AuthorDate: Sun Jul 28 16:48:28 2019 +0800

    SCB-1386 Introduce sendingSagaEnd attribute to let user choose when to send SagaEnd event
---
 .../pack/omega/context/annotations/SagaStart.java  |  7 +++++
 .../omega/transaction/AbstractRecoveryPolicy.java  | 13 +++++++--
 .../omega/transaction/CompensableInterceptor.java  |  4 +++
 .../transaction/SagaStartAnnotationProcessor.java  |  1 +
 .../omega/transaction/annotations/Compensable.java |  7 +++++
 .../SagaStartAnnotationProcessorWrapper.java       |  8 ++++--
 .../omega/transaction/TransactionAspectTest.java   | 33 +++++++++++++++++++++-
 7 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
index 2a66a84..7517bd4 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaStart.java
@@ -37,4 +37,11 @@ public @interface SagaStart {
    * @return
    */
   int timeout() default 0;
+
+  /**
+   * Whether to send the SagaEnd event to Alpha once the SagaStart annotated method finishes without any error.
+   * The default value is true, which means Omega sends the SagaEnd event to Alpha once the annotated method finishes.
+   * If the value is false, Omega does not send the SagaEnd event to Alpha when the annotated method finishes.
+   */
+  boolean sendingSagaEnd() default true;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index 8526581..a97f118 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -32,11 +32,20 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
   public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable,
       CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
       throws Throwable {
+    Object result;
     if(compensable.timeout()>0){
       RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
-      return wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+      result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
     }else{
-      return this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
+      result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
     }
+    if (compensable.sendingSagaEnd()) {
+      // Just send out the SagaEnd event
+      // TODO we may also invoke the callback here to release some resources
+      interceptor.sendSagaEndEvent();
+    }
+    return result;
   }
+
+
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 08ad7f7..1f4a119 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -45,4 +45,8 @@ public class CompensableInterceptor implements EventAwareInterceptor {
     sender.send(
         new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
+  
+  public void sendSagaEndEvent() {
+    sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
index 767171b..de9aa3f 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartAnnotationProcessor.java
@@ -42,6 +42,7 @@ public class SagaStartAnnotationProcessor {
   public void postIntercept(String parentTxId) {
     AlphaResponse response = sender
         .send(new SagaEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+    //TODO we may know if the transaction is aborted from fsm alpha backend
     if (response.aborted()) {
       throw new OmegaException("transaction " + parentTxId + " is aborted");
     }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index 0e24029..0272d29 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -72,4 +72,11 @@ public @interface Compensable {
    * @return the timeout value
    */
   int timeout() default 0;
+
+  /**
+   * Whether to send the SagaEnd event to Alpha once the Compensable annotated method finishes without any error.
+   * The default value is false, which means Omega does not send the SagaEnd event to Alpha when the annotated method finishes.
+   * If the value is true, this is the last compensable method to be called, and Omega sends the SagaEnd event to Alpha once the method finishes.
+   */
+  boolean sendingSagaEnd() default false;
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
index 2837c7f..f71e507 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/wrapper/SagaStartAnnotationProcessorWrapper.java
@@ -45,8 +45,12 @@ public class SagaStartAnnotationProcessorWrapper {
     LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
     try {
       Object result = joinPoint.proceed();
-      sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
-      LOG.debug("Transaction with context {} has finished.", context);
+      if (sagaStart.sendingSagaEnd()) {
+        sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
+        LOG.debug("Transaction with context {} has finished.", context);
+      } else {
+        LOG.debug("Transaction with context {} is not finished in the SagaStarted annotated method.", context);
+      }
       return result;
     } catch (Throwable throwable) {
       // We don't need to handle the OmegaException here
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index 4c5ad4c..3de4013 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.pack.omega.transaction;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -110,10 +111,40 @@ public class TransactionAspectTest {
   }
 
   @Test
+  public void sendingSageEndEvent() throws Throwable {
+    when(compensable.sendingSagaEnd()).thenReturn(true);
+    aspect.advise(joinPoint, compensable);
+    assertThat(messages.size(), is(3));
+
+    TxEvent startedEvent = messages.get(0);
+
+    assertThat(startedEvent.globalTxId(), is(globalTxId));
+    assertThat(startedEvent.localTxId(), is(newLocalTxId));
+    assertThat(startedEvent.parentTxId(), is(localTxId));
+    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+    assertThat(startedEvent.retries(), is(0));
+    assertThat(startedEvent.retryMethod().isEmpty(), is(true));
+
+    TxEvent endedEvent = messages.get(1);
+
+    assertThat(endedEvent.globalTxId(), is(globalTxId));
+    assertThat(endedEvent.localTxId(), is(newLocalTxId));
+    assertThat(endedEvent.parentTxId(), is(localTxId));
+    assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+
+    TxEvent sagaEndEvent = messages.get(2);
+    assertThat(sagaEndEvent.globalTxId(), is(globalTxId));
+    assertNull(sagaEndEvent.parentTxId());
+    assertThat(sagaEndEvent.localTxId(), is(newLocalTxId));
+    assertThat(sagaEndEvent.type(), is(EventType.SagaEndedEvent));
+  }
+
+  @Test
   public void setNewLocalTxIdCompensableWithTransactionContext() throws Throwable {
     // setup the argument class
     when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});
     aspect.advise(joinPoint, compensable);
+    assertThat(messages.size(), is(2));
     TxEvent startedEvent = messages.get(0);
 
     assertThat(startedEvent.globalTxId(), is(transactionGlobalTxId));
@@ -137,7 +168,7 @@ public class TransactionAspectTest {
   @Test
   public void newLocalTxIdInCompensable() throws Throwable {
     aspect.advise(joinPoint, compensable);
-
+    assertThat(messages.size(), is(2));
     TxEvent startedEvent = messages.get(0);
 
     assertThat(startedEvent.globalTxId(), is(globalTxId));