You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:46 UTC
[incubator-servicecomb-saga] 02/07: SCB-224: omega support retries
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 0651b91725700b5c09d253d2374845acabf42b90
Author: zhang2014 <co...@gmail.com>
AuthorDate: Wed Jan 17 23:06:04 2018 +0800
SCB-224: omega support retries
---
.../connector/grpc/GrpcClientMessageSender.java | 5 ++++-
.../grpc/LoadBalancedClusterMessageSenderTest.java | 8 +++++---
.../connector/grpc/RetryableMessageSenderTest.java | 2 +-
.../spring/CompensableMethodCheckingCallback.java | 1 +
.../spring/TransactionInterceptionTest.java | 23 ++++++++++++----------
.../omega/transaction/CompensableInterceptor.java | 4 ++--
.../omega/transaction/EventAwareInterceptor.java | 4 ++--
.../transaction/SagaStartAnnotationProcessor.java | 4 ++--
.../saga/omega/transaction/SagaStartAspect.java | 2 +-
.../omega/transaction/TimeAwareInterceptor.java | 0
.../saga/omega/transaction/TransactionAspect.java | 11 +++++++----
.../saga/omega/transaction/TxEvent.java | 14 ++++++++++++-
.../saga/omega/transaction/TxStartedEvent.java | 4 ++--
.../omega/transaction/annotations/Compensable.java | 4 +++-
.../transaction/CompensableInterceptorTest.java | 8 ++++++--
.../SagaStartAnnotationProcessorTest.java | 4 ++--
.../transaction/TimeAwareInterceptorTest.java | 0
17 files changed, 64 insertions(+), 34 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 5712f57..cf53a0c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -65,7 +65,8 @@ public class GrpcClientMessageSender implements MessageSender {
this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
this.serializer = serializer;
- this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
+ this.compensateStreamObserver =
+ new GrpcCompensateStreamObserver(handler, errorHandlerFactory.apply(this), deserializer);
this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
}
@@ -103,6 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
.setType(event.type().name())
.setTimeout(event.timeout())
.setCompensationMethod(event.compensationMethod())
+ .setRetries(event.retries())
+ .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
.setPayloads(payloads);
return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 788cf96..d66b737 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -98,7 +98,7 @@ public class LoadBalancedClusterMessageSenderTest {
private final String parentTxId = uniquify("parentTxId");
private final String compensationMethod = getClass().getCanonicalName();
private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
- compensationMethod, 0, "blah");
+ compensationMethod, 0, null, 0, "blah");
private final String serviceName = uniquify("serviceName");
private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -189,7 +189,7 @@ public class LoadBalancedClusterMessageSenderTest {
await().atMost(3, SECONDS).until(() -> compensated.contains(globalTxId));
}
- @Test (timeout = 1000)
+ @Test(timeout = 1000)
public void stopSendingOnInterruption() throws Exception {
MessageSender underlying = Mockito.mock(MessageSender.class);
doThrow(RuntimeException.class).when(underlying).send(event);
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
public void forwardSendResult() {
assertThat(messageSender.send(event).aborted(), is(false));
- TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah");
+ TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
@@ -358,6 +358,8 @@ public class LoadBalancedClusterMessageSenderTest {
request.getParentTxId(),
request.getCompensationMethod(),
0,
+ null,
+ 0,
new String(request.getPayloads().toByteArray())));
sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 95bda85..9d0ebc4 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@ public class RetryableMessageSenderTest {
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
- private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0);
+ private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
@Test
public void sendEventWhenSenderIsAvailable() {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 6c0c333..268fad9 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -48,6 +48,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
try {
Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+ compensationContext.addCompensationContext(method, bean);
compensationContext.addCompensationContext(signature, bean);
LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
} catch (NoSuchMethodException e) {
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 2ec42a5..19d0942 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -106,11 +106,14 @@ public class TransactionInterceptionTest {
private String compensationMethod;
+ private String retriesMethod;
+
@Before
public void setUp() throws Exception {
when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(globalTxId);
+ retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
}
@@ -131,7 +134,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -152,7 +155,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -174,9 +177,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -196,9 +199,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -215,9 +218,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -237,7 +240,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -255,7 +258,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 53e5158..fcce034 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
return sender.send(new TxStartedEvent(
- context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message));
+ context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index bb2cca4..e20bea2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
public interface EventAwareInterceptor {
EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
return new AlphaResponse(false);
}
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
}
};
- AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message);
+ AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
void postIntercept(String parentTxId, String compensationMethod);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index b7afcf5..49dd8e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
try {
- return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 388f237..a2ee58c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
initializeOmegaContext();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout());
+ sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
new file mode 100644
index 0000000..e69de29
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 932b990..86cc840 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -49,12 +49,15 @@ public class TransactionAspect {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
- String signature = compensationMethodSignature(joinPoint, compensable, method);
+ Object[] args = joinPoint.getArgs();
+ int retries = compensable.retries();
+ String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
+ String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
String localTxId = context.localTxId();
context.newLocalTxId();
- AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
+ AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
context.setLocalTxId(localTxId);
@@ -65,11 +68,11 @@ public class TransactionAspect {
try {
Object result = joinPoint.proceed();
- interceptor.postIntercept(localTxId, signature);
+ interceptor.postIntercept(localTxId, compensationSignature);
return result;
} catch (Throwable throwable) {
- interceptor.onError(localTxId, signature, throwable);
+ interceptor.onError(localTxId, compensationSignature, throwable);
throw throwable;
} finally {
context.setLocalTxId(localTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 34be420..a5b5514 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,9 +31,11 @@ public class TxEvent {
private final String compensationMethod;
private final int timeout;
private final Object[] payloads;
+ private final String retriesMethod;
+ private final int retries;
public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, Object... payloads) {
+ int timeout, String retriesMethod, int retries, Object... payloads) {
this.timestamp = System.currentTimeMillis();
this.type = type;
this.localTxId = localTxId;
@@ -42,6 +44,8 @@ public class TxEvent {
this.payloads = payloads;
this.globalTxId = globalTxId;
this.timeout = timeout;
+ this.retriesMethod = retriesMethod;
+ this.retries = retries;
}
public long timestamp() {
@@ -76,6 +80,14 @@ public class TxEvent {
return timeout;
}
+ public String retriesMethod() {
+ return retriesMethod;
+ }
+
+ public int retries() {
+ return retries;
+ }
+
@Override
public String toString() {
return type.name() + "{" +
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 4732d95..cb2580e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -22,7 +22,7 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxStartedEvent extends TxEvent {
public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
- String compensationMethod, int timeout, Object... payloads) {
- super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads);
+ String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
+ super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index 11ba7c7..e9bf6a7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -36,8 +36,10 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
public @interface Compensable {
+ int retries() default 0;
+
/**
- * Compensation method name, should not be null.<br>
+ * Compensation method name.<br>
* A compensation method should satisfy below requirements:
* <ol>
* <li>has same parameter list as @Compensable method's</li>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 0ef9d4d..76ffc03 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,13 +18,13 @@
package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import org.apache.servicecomb.saga.common.EventType;
@@ -47,6 +47,7 @@ public class CompensableInterceptorTest {
};
private final String message = uniquify("message");
+ private final String retriesMethod = uniquify("retries");
private final String compensationMethod = getClass().getCanonicalName();
@SuppressWarnings("unchecked")
@@ -62,13 +63,16 @@ public class CompensableInterceptorTest {
@Test
public void sendsTxStartedEventBefore() throws Exception {
- interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
+ int retries = new Random().nextInt();
+ interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
TxEvent event = messages.get(0);
assertThat(event.globalTxId(), is(globalTxId));
assertThat(event.localTxId(), is(localTxId));
assertThat(event.parentTxId(), is(parentTxId));
+ assertThat(event.retries(), is(retries));
+ assertThat(event.retriesMethod(), is(retriesMethod));
assertThat(event.type(), is(EventType.TxStartedEvent));
assertThat(event.compensationMethod(), is(compensationMethod));
assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index cc84fc5..8c496f6 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest {
@Test
public void sendsSagaStartedEvent() {
- sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+ sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
TxEvent event = messages.get(0);
@@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest {
doThrow(exception).when(sender).send(any());
try {
- sagaStartAnnotationProcessor.preIntercept(null, null, 0);
+ sagaStartAnnotationProcessor.preIntercept(null, null, 0, null, 0);
expectFailing(TransactionalException.class);
} catch (TransactionalException e) {
assertThat(e.getMessage(), is("exception"));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
new file mode 100644
index 0000000..e69de29
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.