You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/02 00:09:41 UTC

[servicecomb-pack] 04/06: SCB-1386 Introduced SagaEnd annotation, revert the changes of Compensable

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit b41760ceac6870996d68f1c88e73574e33e4e9d6
Author: Willem Jiang <wi...@gmail.com>
AuthorDate: Wed Jul 31 10:04:40 2019 +0800

    SCB-1386 Introduced SagaEnd annotation, revert the changes of Compensable
---
 .../pack/omega/context/annotations/SagaEnd.java    |  24 ++++
 omega/omega-transaction/pom.xml                    |   4 +
 .../omega/transaction/AbstractRecoveryPolicy.java  |   7 +-
 .../omega/transaction/CompensableInterceptor.java  |   5 +-
 .../pack/omega/transaction/SagaEndAspect.java      |  75 +++++++++++
 .../pack/omega/transaction/TransactionAspect.java  |   2 +
 .../omega/transaction/annotations/Compensable.java |   6 -
 .../pack/omega/transaction/SagaEndAspectTest.java  | 137 +++++++++++++++++++++
 .../omega/transaction/TransactionAspectTest.java   |  29 -----
 9 files changed, 244 insertions(+), 45 deletions(-)

diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java
new file mode 100644
index 0000000..b7309b9
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.omega.context.annotations;
+
+/**
+ * Indicates that once the annotated method has finished, the saga ends; retained at run time so aspects can bind it.
+ */
+@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+@java.lang.annotation.Target(java.lang.annotation.ElementType.METHOD)
+public @interface SagaEnd {}
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index c96a7cc..ba1a0cc 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -50,6 +50,10 @@
       <groupId>javax.transaction</groupId>
       <artifactId>javax.transaction-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-core</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index a97f118..6e86d72 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -36,14 +36,9 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
     if(compensable.timeout()>0){
       RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
       result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
-    }else{
+    } else {
       result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
     }
-    if (compensable.sendingSagaEnd()) {
-      // Just send out the SagaEnd event
-      // TODO we may also invoke the callback here to release some resources
-      interceptor.sendSagaEndEvent();
-    }
     return result;
   }
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 1f4a119..108ebbc 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -45,8 +45,5 @@ public class CompensableInterceptor implements EventAwareInterceptor {
     sender.send(
         new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
   }
-  
-  public void sendSagaEndEvent() {
-    sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
-  }
+
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java
new file mode 100644
index 0000000..14e1041
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaEnd;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+
+/** Aspect around {@code @SagaEnd} methods: sends the saga's closing event, then clears the OmegaContext. */
+@Aspect
+@Order(value = 100)
+public class SagaEndAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OmegaContext context;
+  private final SagaMessageSender sender;
+
+  public SagaEndAspect(SagaMessageSender sender, OmegaContext context) {
+    this.sender = sender;
+    this.context = context;
+  }
+
+  /** Runs the method, then sends SagaEndedEvent (SagaAbortedEvent on a non-Omega error); always clears context. */
+  @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaEnd * *(..)) && @annotation(sagaEnd)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaEnd sagaEnd) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    try {
+      Object result = joinPoint.proceed();
+      sendSagaEndedEvent();
+      return result;
+    } catch (Throwable throwable) {
+      // An OmegaException is rethrown without an abort event; any other failure aborts the saga.
+      if (!(throwable instanceof OmegaException)) {
+        LOG.error("Transaction {} failed.", context.globalTxId());
+        sendSagaAbortedEvent(method.toString(), throwable);
+      }
+      throw throwable;
+    } finally {
+      context.clear();
+    }
+  }
+
+  private void sendSagaEndedEvent() {
+    // TODO need to check the parentID setting
+    sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
+  }
+
+  private void sendSagaAbortedEvent(String methodName, Throwable throwable) {
+    // TODO need to check the parentID setting
+    sender.send(new SagaAbortedEvent(context.globalTxId(), context.localTxId(), null, methodName, throwable));
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
index 5729973..b79e565 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
@@ -29,8 +29,10 @@ import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
 
 @Aspect
+@Order(value=10)
 public class TransactionAspect extends TransactionContextHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index 0272d29..9186639 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -73,10 +73,4 @@ public @interface Compensable {
    */
   int timeout() default 0;
 
-  /**
-   * Sending out SagaEnd event to Alpha once the SagaStart annotated method is finished without any error.
-   * Default value is false, which means Omega never send out the SagaEnd event to Alpha once the annotated method is finished.
-   * value is true, means this method is last compensable method need to be called, Omega will send SagaEnd event to Alpha once the method is finished.
-   */
-  boolean sendingSagaEnd() default false;
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java
new file mode 100644
index 0000000..6a37f72
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.omega.context.IdGenerator;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaEnd;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link SagaEndAspect}, driven through a mocked join point and context with a
+ * sender stub that records the events it is given.
+ */
+public class SagaEndAspectTest {
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final List<TxEvent> messages = new ArrayList<>();
+
+  // Mocked collaborators handed to the aspect under test.
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+  private final OmegaContext omegaContext = Mockito.mock(OmegaContext.class);
+  private final SagaEnd sagaEnd = Mockito.mock(SagaEnd.class);
+
+  // Not referenced by any test in this class.
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+
+  // Stub sender: callbacks are no-ops; send() records the event and returns an AlphaResponse built with false.
+  private final SagaMessageSender sender = new SagaMessageSender() {
+    @Override
+    public void onConnected() {
+    }
+
+    @Override
+    public void onDisconnected() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public String target() {
+      return "UNKNOWN";
+    }
+
+    @Override
+    public AlphaResponse send(TxEvent event) {
+      messages.add(event);
+      return new AlphaResponse(false);
+    }
+  };
+
+  private SagaEndAspect aspect;
+
+  @Before
+  public void setUp() throws Exception {
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+    when(omegaContext.globalTxId()).thenReturn(globalTxId);
+    when(omegaContext.localTxId()).thenReturn(globalTxId);
+    aspect = new SagaEndAspect(sender, omegaContext);
+  }
+
+  @Test
+  public void sagaEndWithoutError() throws Throwable {
+    aspect.advise(joinPoint, sagaEnd);
+
+    assertOnlyEventIs(EventType.SagaEndedEvent);
+    verify(omegaContext).clear();
+  }
+
+  @Test
+  public void sagaEndWithErrors() throws Throwable {
+    RuntimeException oops = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(oops);
+
+    try {
+      aspect.advise(joinPoint, sagaEnd);
+      // Reached only when advise() did not rethrow.
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException e) {
+      assertThat(e, is(oops));
+    }
+
+    assertOnlyEventIs(EventType.SagaAbortedEvent);
+    verify(omegaContext).clear();
+  }
+
+  // Checks that exactly one event was sent, of the given type, carrying the saga's ids and no parent id.
+  private void assertOnlyEventIs(EventType expectedType) {
+    assertThat(messages.size(), is(1));
+
+    TxEvent event = messages.get(0);
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    assertThat(event.type(), is(expectedType));
+  }
+
+  // Reflection target only: setUp() hands this method to the mocked MethodSignature.
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index 3de4013..3c60789 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -111,35 +111,6 @@ public class TransactionAspectTest {
   }
 
   @Test
-  public void sendingSageEndEvent() throws Throwable {
-    when(compensable.sendingSagaEnd()).thenReturn(true);
-    aspect.advise(joinPoint, compensable);
-    assertThat(messages.size(), is(3));
-
-    TxEvent startedEvent = messages.get(0);
-
-    assertThat(startedEvent.globalTxId(), is(globalTxId));
-    assertThat(startedEvent.localTxId(), is(newLocalTxId));
-    assertThat(startedEvent.parentTxId(), is(localTxId));
-    assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
-    assertThat(startedEvent.retries(), is(0));
-    assertThat(startedEvent.retryMethod().isEmpty(), is(true));
-
-    TxEvent endedEvent = messages.get(1);
-
-    assertThat(endedEvent.globalTxId(), is(globalTxId));
-    assertThat(endedEvent.localTxId(), is(newLocalTxId));
-    assertThat(endedEvent.parentTxId(), is(localTxId));
-    assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
-
-    TxEvent sagaEndEvent = messages.get(2);
-    assertThat(sagaEndEvent.globalTxId(), is(globalTxId));
-    assertNull(sagaEndEvent.parentTxId());
-    assertThat(sagaEndEvent.localTxId(), is(newLocalTxId));
-    assertThat(sagaEndEvent.type(), is(EventType.SagaEndedEvent));
-  }
-
-  @Test
   public void setNewLocalTxIdCompensableWithTransactionContext() throws Throwable {
     // setup the argument class
     when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});