You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/02 00:09:41 UTC
[servicecomb-pack] 04/06: SCB-1386 Introduced SagaEnd annotation,
revert the changes of Compensable
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit b41760ceac6870996d68f1c88e73574e33e4e9d6
Author: Willem Jiang <wi...@gmail.com>
AuthorDate: Wed Jul 31 10:04:40 2019 +0800
SCB-1386 Introduced SagaEnd annotation, revert the changes of Compensable
---
.../pack/omega/context/annotations/SagaEnd.java | 24 ++++
omega/omega-transaction/pom.xml | 4 +
.../omega/transaction/AbstractRecoveryPolicy.java | 7 +-
.../omega/transaction/CompensableInterceptor.java | 5 +-
.../pack/omega/transaction/SagaEndAspect.java | 75 +++++++++++
.../pack/omega/transaction/TransactionAspect.java | 2 +
.../omega/transaction/annotations/Compensable.java | 6 -
.../pack/omega/transaction/SagaEndAspectTest.java | 137 +++++++++++++++++++++
.../omega/transaction/TransactionAspectTest.java | 29 -----
9 files changed, 244 insertions(+), 45 deletions(-)
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java
new file mode 100644
index 0000000..b7309b9
--- /dev/null
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/pack/omega/context/annotations/SagaEnd.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.omega.context.annotations;
+
+/**
+ * Indicates that once the annotated method has finished, the saga it belongs to ends.
+ *
+ * <p>This has to be an annotation type that is retained at run time: SagaEndAspect selects
+ * methods with an {@code @annotation(sagaEnd)} pointcut and binds the annotation instance to its
+ * advice, which neither a plain class nor an annotation discarded before run time can satisfy.
+ * Fully-qualified meta-annotation names are used so that no extra import lines are required.
+ */
+@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+@java.lang.annotation.Target(java.lang.annotation.ElementType.METHOD)
+public @interface SagaEnd {
+
+}
diff --git a/omega/omega-transaction/pom.xml b/omega/omega-transaction/pom.xml
index c96a7cc..ba1a0cc 100644
--- a/omega/omega-transaction/pom.xml
+++ b/omega/omega-transaction/pom.xml
@@ -50,6 +50,10 @@
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
index a97f118..6e86d72 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/AbstractRecoveryPolicy.java
@@ -36,14 +36,9 @@ public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {
if(compensable.timeout()>0){
RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
- }else{
+ } else {
result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
}
- if (compensable.sendingSagaEnd()) {
- // Just send out the SagaEnd event
- // TODO we may also invoke the callback here to release some resources
- interceptor.sendSagaEndEvent();
- }
return result;
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index 1f4a119..108ebbc 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -45,8 +45,5 @@ public class CompensableInterceptor implements EventAwareInterceptor {
sender.send(
new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
}
-
- public void sendSagaEndEvent() {
- sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
- }
+
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java
new file mode 100644
index 0000000..14e1041
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspect.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaEnd;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+
+/**
+ * Aspect that closes a saga around methods annotated with {@code @SagaEnd}.
+ *
+ * <p>When the intercepted method returns normally a {@code SagaEndedEvent} is sent. When it throws
+ * anything other than an {@code OmegaException} a {@code SagaAbortedEvent} is sent instead. The
+ * {@link OmegaContext} is cleared in either case.
+ */
+@Aspect
+@Order(value = 100)
+public class SagaEndAspect {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OmegaContext context;
+  private final SagaMessageSender sender;
+
+  /**
+   * Creates the aspect.
+   *
+   * @param sender used to send the saga ended/aborted events
+   * @param context supplies the global and local transaction ids; cleared after every advised call
+   */
+  public SagaEndAspect(SagaMessageSender sender, OmegaContext context) {
+    this.sender = sender;
+    this.context = context;
+  }
+
+  /**
+   * Runs the annotated method and reports the outcome of the saga.
+   *
+   * <p>The {@code execution(...)} pattern must name the {@code SagaEnd} annotation itself, i.e. the
+   * same type that {@code @annotation(sagaEnd)} binds; if it names any other type the two halves
+   * of the pointcut can never both match and the advice is never applied.
+   *
+   * @param joinPoint the intercepted method invocation
+   * @param sagaEnd the annotation instance bound by the pointcut; it is not read here
+   * @return whatever the intercepted method returns
+   * @throws Throwable anything thrown by the intercepted method, or while sending the ended event,
+   *     rethrown unchanged
+   */
+  @Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaEnd * *(..)) && @annotation(sagaEnd)")
+  Object advise(ProceedingJoinPoint joinPoint, SagaEnd sagaEnd) throws Throwable {
+    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+    try {
+      Object result = joinPoint.proceed();
+      sendSagaEndedEvent();
+      return result;
+    } catch (Throwable throwable) {
+      // An OmegaException is rethrown as-is, without logging or sending an aborted event.
+      if (!(throwable instanceof OmegaException)) {
+        LOG.error("Transaction {} failed.", context.globalTxId());
+        sendSagaAbortedEvent(method.toString(), throwable);
+      }
+      throw throwable;
+    } finally {
+      // Runs on both the success and the failure path.
+      context.clear();
+    }
+  }
+
+  /** Sends a {@code SagaEndedEvent} carrying the current global and local transaction ids. */
+  private void sendSagaEndedEvent() {
+    // TODO need to check the parentID setting
+    sender.send(new SagaEndedEvent(context.globalTxId(), context.localTxId()));
+  }
+
+  /**
+   * Sends a {@code SagaAbortedEvent} carrying the current global and local transaction ids.
+   *
+   * @param methodName string form of the intercepted method
+   * @param throwable the failure raised while running the intercepted method
+   */
+  private void sendSagaAbortedEvent(String methodName, Throwable throwable) {
+    // TODO need to check the parentID setting
+    // NOTE(review): the null third argument is presumably the parent transaction id - confirm.
+    sender.send(new SagaAbortedEvent(context.globalTxId(), context.localTxId(), null, methodName, throwable));
+  }
+
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
index 5729973..b79e565 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspect.java
@@ -29,8 +29,10 @@ import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
@Aspect
+@Order(value=10)
public class TransactionAspect extends TransactionContextHelper {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index 0272d29..9186639 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -73,10 +73,4 @@ public @interface Compensable {
*/
int timeout() default 0;
- /**
- * Sending out SagaEnd event to Alpha once the SagaStart annotated method is finished without any error.
- * Default value is false, which means Omega never send out the SagaEnd event to Alpha once the annotated method is finished.
- * value is true, means this method is last compensable method need to be called, Omega will send SagaEnd event to Alpha once the method is finished.
- */
- boolean sendingSagaEnd() default false;
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java
new file mode 100644
index 0000000..6a37f72
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/SagaEndAspectTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.omega.transaction;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.servicecomb.pack.common.EventType;
+import org.apache.servicecomb.pack.omega.context.IdGenerator;
+import org.apache.servicecomb.pack.omega.context.OmegaContext;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaEnd;
+import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for SagaEndAspect. The aspect is driven through a mocked join point and a sender
+ * that records every event handed to it.
+ */
+public class SagaEndAspectTest {
+  private final String globalTxId = UUID.randomUUID().toString();
+
+  // Every event passed to the sender below ends up in this list, in sending order.
+  private final List<TxEvent> sentEvents = new ArrayList<>();
+
+  private final SagaMessageSender sender = new SagaMessageSender() {
+    @Override
+    public void onConnected() {
+    }
+
+    @Override
+    public void onDisconnected() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public String target() {
+      return "UNKNOWN";
+    }
+
+    @Override
+    public AlphaResponse send(TxEvent event) {
+      sentEvents.add(event);
+      return new AlphaResponse(false);
+    }
+  };
+
+  private final OmegaContext omegaContext = Mockito.mock(OmegaContext.class);
+  private final SagaEnd sagaEnd = Mockito.mock(SagaEnd.class);
+  private final ProceedingJoinPoint joinPoint = Mockito.mock(ProceedingJoinPoint.class);
+  private final MethodSignature methodSignature = Mockito.mock(MethodSignature.class);
+
+  @SuppressWarnings("unchecked")
+  private final IdGenerator<String> idGenerator = Mockito.mock(IdGenerator.class);
+
+  private SagaEndAspect aspect;
+
+  @Before
+  public void setUp() throws Exception {
+    // The context reports the same value for the global and the local transaction id.
+    when(omegaContext.globalTxId()).thenReturn(globalTxId);
+    when(omegaContext.localTxId()).thenReturn(globalTxId);
+
+    // The join point resolves to the private doNothing() method of this test class.
+    when(joinPoint.getSignature()).thenReturn(methodSignature);
+    when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
+
+    aspect = new SagaEndAspect(sender, omegaContext);
+  }
+
+  @Test
+  public void sagaEndWithoutError() throws Throwable {
+    aspect.advise(joinPoint, sagaEnd);
+
+    TxEvent endedEvent = singleSentEvent();
+    assertThat(endedEvent.type(), is(EventType.SagaEndedEvent));
+
+    verify(omegaContext).clear();
+  }
+
+  @Test
+  public void sagaEndWithErrors() throws Throwable {
+    RuntimeException failure = new RuntimeException("oops");
+    when(joinPoint.proceed()).thenThrow(failure);
+
+    try {
+      aspect.advise(joinPoint, sagaEnd);
+      expectFailing(RuntimeException.class);
+    } catch (RuntimeException caught) {
+      assertThat(caught, is(failure));
+    }
+
+    TxEvent abortedEvent = singleSentEvent();
+    assertThat(abortedEvent.type(), is(EventType.SagaAbortedEvent));
+
+    verify(omegaContext).clear();
+  }
+
+  /**
+   * Asserts that exactly one event was sent, that it carries the expected transaction ids and no
+   * parent id, and returns it so the caller can check its type.
+   */
+  private TxEvent singleSentEvent() {
+    assertThat(sentEvents.size(), is(1));
+    TxEvent event = sentEvents.get(0);
+
+    assertThat(event.globalTxId(), is(globalTxId));
+    assertThat(event.localTxId(), is(globalTxId));
+    assertThat(event.parentTxId(), is(nullValue()));
+    return event;
+  }
+
+  // Looked up reflectively by name in setUp(); the name and empty parameter list must not change.
+  private String doNothing() {
+    return "doNothing";
+  }
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index 3de4013..3c60789 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -111,35 +111,6 @@ public class TransactionAspectTest {
}
@Test
- public void sendingSageEndEvent() throws Throwable {
- when(compensable.sendingSagaEnd()).thenReturn(true);
- aspect.advise(joinPoint, compensable);
- assertThat(messages.size(), is(3));
-
- TxEvent startedEvent = messages.get(0);
-
- assertThat(startedEvent.globalTxId(), is(globalTxId));
- assertThat(startedEvent.localTxId(), is(newLocalTxId));
- assertThat(startedEvent.parentTxId(), is(localTxId));
- assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
- assertThat(startedEvent.retries(), is(0));
- assertThat(startedEvent.retryMethod().isEmpty(), is(true));
-
- TxEvent endedEvent = messages.get(1);
-
- assertThat(endedEvent.globalTxId(), is(globalTxId));
- assertThat(endedEvent.localTxId(), is(newLocalTxId));
- assertThat(endedEvent.parentTxId(), is(localTxId));
- assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
-
- TxEvent sagaEndEvent = messages.get(2);
- assertThat(sagaEndEvent.globalTxId(), is(globalTxId));
- assertNull(sagaEndEvent.parentTxId());
- assertThat(sagaEndEvent.localTxId(), is(newLocalTxId));
- assertThat(sagaEndEvent.type(), is(EventType.SagaEndedEvent));
- }
-
- @Test
public void setNewLocalTxIdCompensableWithTransactionContext() throws Throwable {
// setup the argument class
when(joinPoint.getArgs()).thenReturn(new Object[]{transactionContextProperties});