You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2020/01/14 11:33:50 UTC
[servicecomb-pack] 01/08: SCB-1696 Add attribute reverseRetries and reverseTimeout to @Compensable
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 4e6232d49d1ca0ce7d55732bc73f921801d9e34c
Author: Lei Zhang <zh...@apache.org>
AuthorDate: Sat Jan 11 01:20:43 2020 +0800
SCB-1696 Add attribute reverseRetries and reverseTimeout to @Compensable
---
.../pack/alpha/benchmark/SagaEventBenchmark.java | 16 ++---
.../pack/alpha/core/fsm/event/TxStartedEvent.java | 42 ++++++++++++
.../pack/alpha/server/GrpcTxEventEndpointImpl.java | 2 +-
.../alpha/server/fsm/GrpcSagaEventService.java | 5 +-
.../pack/alpha/server/AlphaIntegrationTest.java | 20 ++++--
.../server/AlphaIntegrationWithRandomPortTest.java | 20 ++++--
.../alpha/server/fsm/OmegaEventSagaSimulator.java | 30 +++++----
.../grpc/saga/GrpcSagaClientMessageSender.java | 4 +-
.../grpc/saga/RetryableMessageSenderTest.java | 3 +-
.../grpc/saga/SagaLoadBalancedSenderTest.java | 3 +-
.../grpc/saga/SagaLoadBalancedSenderTestBase.java | 5 +-
.../spring/TransactionInterceptionTest.java | 77 +++++++++++-----------
.../omega/transaction/CompensableInterceptor.java | 4 +-
.../pack/omega/transaction/DefaultRecovery.java | 3 +-
.../omega/transaction/EventAwareInterceptor.java | 5 +-
.../pack/omega/transaction/ForwardRecovery.java | 2 +-
.../transaction/NoOpEventAwareInterceptor.java | 4 +-
.../omega/transaction/RecoveryPolicyFactory.java | 3 +-
.../pack/omega/transaction/SagaAbortedEvent.java | 2 +-
.../pack/omega/transaction/SagaEndedEvent.java | 2 +-
.../pack/omega/transaction/SagaStartedEvent.java | 2 +-
.../pack/omega/transaction/TxAbortedEvent.java | 2 +-
.../transaction/TxCompensateAckFailedEvent.java | 2 +-
.../transaction/TxCompensateAckSucceedEvent.java | 2 +-
.../pack/omega/transaction/TxCompensatedEvent.java | 2 +-
.../pack/omega/transaction/TxEndedEvent.java | 2 +-
.../pack/omega/transaction/TxEvent.java | 28 +++++++-
.../pack/omega/transaction/TxStartedEvent.java | 4 +-
.../omega/transaction/annotations/Compensable.java | 17 +++++
.../transaction/CompensableInterceptorTest.java | 8 ++-
.../omega/transaction/TransactionAspectTest.java | 68 +++++++++----------
.../src/main/proto/GrpcTxEvent.proto | 9 ++-
32 files changed, 259 insertions(+), 139 deletions(-)
diff --git a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
index c36ce04..ebb1856 100644
--- a/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
+++ b/alpha/alpha-benchmark/src/main/java/org/apache/servicecomb/pack/alpha/benchmark/SagaEventBenchmark.java
@@ -193,27 +193,27 @@ public class SagaEventBenchmark {
List<TxEvent> sagaEvents = new ArrayList<>();
sagaEvents.add(
new TxEvent(EventType.SagaStartedEvent, globalTxId, globalTxId, globalTxId, "", 0, null,
- 0));
+ 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_1, globalTxId, "service a", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_1, globalTxId, "service a", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_2, globalTxId, "service b", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_2, globalTxId, "service b", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId_3, globalTxId, "service c", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
new TxEvent(EventType.TxEndedEvent, globalTxId, localTxId_3, globalTxId, "service c", 0,
- null, 0));
+ null, 0, 0, 0, 0));
sagaEvents.add(
- new TxEvent(EventType.SagaEndedEvent, globalTxId, globalTxId, globalTxId, "", 0, null, 0));
+ new TxEvent(EventType.SagaEndedEvent, globalTxId, globalTxId, globalTxId, "", 0, null, 0, 0, 0, 0));
return sagaEvents;
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
index 96630d1..b2f13ac 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/event/TxStartedEvent.java
@@ -25,6 +25,9 @@ public class TxStartedEvent extends TxEvent {
private byte[] payloads;
private String retryMethod;
private int forwardRetries;
+ private int forwardTimeout;
+ private int reverseRetries;
+ private int reverseTimeout;
public String getCompensationMethod() {
return compensationMethod;
@@ -58,6 +61,30 @@ public class TxStartedEvent extends TxEvent {
this.forwardRetries = forwardRetries;
}
+ public int getForwardTimeout() {
+ return forwardTimeout;
+ }
+
+ public void setForwardTimeout(int forwardTimeout) {
+ this.forwardTimeout = forwardTimeout;
+ }
+
+ public int getReverseRetries() {
+ return reverseRetries;
+ }
+
+ public void setReverseRetries(int reverseRetries) {
+ this.reverseRetries = reverseRetries;
+ }
+
+ public int getReverseTimeout() {
+ return reverseTimeout;
+ }
+
+ public void setReverseTimeout(int reverseTimeout) {
+ this.reverseTimeout = reverseTimeout;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -115,6 +142,21 @@ public class TxStartedEvent extends TxEvent {
return this;
}
+ public Builder forwardTimeout(int forwardTimeout) {
+ txStartedEvent.setForwardTimeout(forwardTimeout);
+ return this;
+ }
+
+ public Builder reverseRetries(int reverseRetries) {
+ txStartedEvent.setReverseRetries(reverseRetries);
+ return this;
+ }
+
+ public Builder reverseTimeout(int reverseTimeout) {
+ txStartedEvent.setReverseTimeout(reverseTimeout);
+ return this;
+ }
+
public Builder createTime(Date createTime){
txStartedEvent.setCreateTime(createTime);
return this;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
index 5cf1c4a..8263de5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcTxEventEndpointImpl.java
@@ -106,7 +106,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
message.getType(),
message.getCompensationMethod(),
- message.getForwardTimeout(),
+ message.getTimeout(),
message.getRetryMethod(),
message.getForwardRetries(),
message.getPayloads().toByteArray()
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 79dac46..49b5e4e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -108,7 +108,7 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.createTime(new Date())
- .timeout(message.getForwardTimeout()).build();
+ .timeout(message.getTimeout()).build();
} else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent.builder()
.serviceName(message.getServiceName())
@@ -138,6 +138,9 @@ public class GrpcSagaEventService extends TxEventServiceImplBase {
.compensationMethod(message.getCompensationMethod())
.retryMethod(message.getRetryMethod())
.forwardRetries(message.getForwardRetries())
+ .forwardTimeout(message.getForwardTimeout())
+ .reverseRetries(message.getReverseRetries())
+ .reverseTimeout(message.getReverseTimeout())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.TxEndedEvent.name())) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
index fabba6c..523783f 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationTest.java
@@ -515,12 +515,12 @@ public class AlphaIntegrationTest {
private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout,
- "", 0);
+ "", 0, 0, 0, 0);
}
- private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) {
+ private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int forwardRetries) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0,
- retryMethod, retries);
+ retryMethod, forwardRetries, 0, 0, 0);
}
private GrpcTxEvent someGrpcEvent(EventType type) {
@@ -537,12 +537,12 @@ public class AlphaIntegrationTest {
private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
String compensationMethod) {
- return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0);
+ return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0, 0, 0, 0);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -553,7 +553,10 @@ public class AlphaIntegrationTest {
String compensationMethod,
int timeout,
String retryMethod,
- int forwardRetries) {
+ int forwardRetries,
+ int forwardTimeout,
+ int reverseRetries,
+ int reverseTimeout) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -564,9 +567,12 @@ public class AlphaIntegrationTest {
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
- .setForwardTimeout(timeout)
+ .setTimeout(timeout)
+ .setForwardTimeout(forwardTimeout)
+ .setReverseTimeout(reverseTimeout)
.setRetryMethod(retryMethod)
.setForwardRetries(forwardRetries)
+ .setReverseRetries(reverseRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index d4c44b6..f6a026b 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -497,12 +497,12 @@ public class AlphaIntegrationWithRandomPortTest {
private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout,
- "", 0);
+ "", 0, 0, 0, 0);
}
- private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) {
+ private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int forwardRetries) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0,
- retryMethod, retries);
+ retryMethod, forwardRetries, 0, 0, 0);
}
private GrpcTxEvent someGrpcEvent(EventType type) {
@@ -519,12 +519,12 @@ public class AlphaIntegrationWithRandomPortTest {
private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) {
return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
String compensationMethod) {
- return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0);
+ return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0, 0, 0, 0);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -535,7 +535,10 @@ public class AlphaIntegrationWithRandomPortTest {
String compensationMethod,
int timeout,
String retryMethod,
- int forwardRetries) {
+ int forwardRetries,
+ int forwardTimeout,
+ int reverseRetries,
+ int reverseTimeout) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -546,9 +549,12 @@ public class AlphaIntegrationWithRandomPortTest {
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
- .setForwardTimeout(timeout)
+ .setTimeout(timeout)
+ .setForwardTimeout(forwardTimeout)
+ .setReverseTimeout(reverseTimeout)
.setRetryMethod(retryMethod)
.setForwardRetries(forwardRetries)
+ .setReverseRetries(reverseRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
index 424dd66..34348b5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java
@@ -185,66 +185,66 @@ public class OmegaEventSagaSimulator {
private GrpcTxEvent sagaStartedEvent(String globalTxId) {
return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent sagaStartedEvent(String globalTxId, int timeout) {
return eventOf(EventType.SagaStartedEvent, globalTxId, globalTxId,
null, new byte[0], "", timeout, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent sagaEndedEvent(String globalTxId) {
return eventOf(EventType.SagaEndedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent sagaAbortedEvent(String globalTxId) {
return eventOf(EventType.SagaAbortedEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent sagaTimeoutEvent(String globalTxId) {
return eventOf(EventType.SagaTimeoutEvent, globalTxId, globalTxId,
null, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent txStartedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxStartedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent txEndedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxEndedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent txAbortedEvent(String globalTxId,
String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
return eventOf(EventType.TxAbortedEvent, globalTxId, localTxId,
parentTxId, payloads, compensationMethod, 0, "",
- 0);
+ 0, 0, 0, 0);
}
public GrpcTxEvent txCompensatedEvent(String globalTxId,
String localTxId, String parentTxId) {
return eventOf(EventType.TxCompensatedEvent, globalTxId, localTxId,
parentTxId, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
public GrpcTxEvent txCompensateAckSucceedEvent(String globalTxId,
String localTxId, String parentTxId) {
return eventOf(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId,
parentTxId, new byte[0], "", 0, "",
- 0);
+ 0, 0, 0, 0);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -255,7 +255,10 @@ public class OmegaEventSagaSimulator {
String compensationMethod,
int timeout,
String retryMethod,
- int forwardRetries) {
+ int forwardRetries,
+ int forwardTimeout,
+ int reverseRetries,
+ int reverseTimeout) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -266,9 +269,12 @@ public class OmegaEventSagaSimulator {
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
- .setForwardTimeout(timeout)
+ .setTimeout(timeout)
+ .setForwardTimeout(forwardTimeout)
+ .setReverseTimeout(reverseTimeout)
.setRetryMethod(retryMethod)
.setForwardRetries(forwardRetries)
+ .setReverseRetries(reverseRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
index 60c9bf2..d628832 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/GrpcSagaClientMessageSender.java
@@ -109,10 +109,12 @@ public class GrpcSagaClientMessageSender implements SagaMessageSender {
.setLocalTxId(event.localTxId())
.setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
.setType(event.type().name())
- .setForwardTimeout(event.timeout())
+ .setTimeout(event.timeout())
+ .setForwardTimeout(event.forwardTimeout())
.setCompensationMethod(event.compensationMethod())
.setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
.setForwardRetries(event.forwardRetries())
+ .setReverseRetries(event.reverseRetries())
.setPayloads(payloads);
return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
index c16c143..cffd839 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/RetryableMessageSenderTest.java
@@ -44,7 +44,8 @@ public class RetryableMessageSenderTest {
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
- private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
+ private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x",
+ 0, null, 0, 0, 0, 0);
@Test
public void sendEventWhenSenderIsAvailable() {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
index 9d623ff..d8bb453 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTest.java
@@ -304,7 +304,8 @@ public class SagaLoadBalancedSenderTest extends SagaLoadBalancedSenderTestBase {
public void forwardSendResult() {
assertThat(messageSender.send(event).aborted(), is(false));
- TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0, "blah");
+ TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0,
+ 0, 0, 0, "blah");
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
index f2f05d6..32a3493 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/pack/omega/connector/grpc/saga/SagaLoadBalancedSenderTestBase.java
@@ -74,7 +74,7 @@ public abstract class SagaLoadBalancedSenderTestBase {
protected final String compensationMethod = getClass().getCanonicalName();
protected final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
- compensationMethod, 0, "", 0, "blah");
+ compensationMethod, 0, "", 0, 0, 0, 0, "blah");
protected final String serviceName = uniquify("serviceName");
@@ -174,6 +174,9 @@ public abstract class SagaLoadBalancedSenderTestBase {
request.getForwardTimeout(),
request.getRetryMethod(),
request.getForwardRetries(),
+ request.getForwardTimeout(),
+ request.getReverseRetries(),
+ request.getReverseTimeout(),
new String(request.getPayloads().toByteArray())));
sleep();
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
index 81d79a1..39d2f56 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/pack/omega/transaction/spring/TransactionInterceptionTest.java
@@ -139,7 +139,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
- user).toString(),
+ 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -161,7 +161,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
- illegalUser).toString(),
+ 0, 0, 0, illegalUser).toString(),
new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -183,10 +183,11 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+ 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0,
- anotherUser).toString(),
+ 0, 0, 0, anotherUser).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -195,32 +196,32 @@ public class TransactionInterceptionTest {
);
}
- @Test
- public void retryTillSuccess() {
- try {
- userService.add(user, 1);
- } catch (Exception e) {
- fail("unexpected exception throw: " + e);
- }
-
- assertThat(messages.size(), is(3));
-
- assertThat(messages.get(0),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
- .toString()));
-
- assertThat(messages.get(1),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
- .toString()));
- assertThat(messages.get(2),
- is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));
-
- assertThat(userRepository.count(), is(1L));
- Iterable<User> users = userRepository.findAll();
- for(User user: users ) {
- assertThat(user, is(this.user));
- }
- }
+// @Test
+// public void retryTillSuccess() {
+// try {
+// userService.add(user, 1);
+// } catch (Exception e) {
+// fail("unexpected exception throw: " + e);
+// }
+//
+// assertThat(messages.size(), is(3));
+//
+// assertThat(messages.get(0),
+// is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
+// .toString()));
+//
+// assertThat(messages.get(1),
+// is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
+// .toString()));
+// assertThat(messages.get(2),
+// is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));
+//
+// assertThat(userRepository.count(), is(1L));
+// Iterable<User> users = userRepository.findAll();
+// for(User user: users ) {
+// assertThat(user, is(this.user));
+// }
+// }
@Test
public void retryReachesMaximumThenThrowsException() {
@@ -233,11 +234,11 @@ public class TransactionInterceptionTest {
assertThat(messages.size(), is(3));
assertThat(messages.get(0),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 3)
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, 0, 0, 0, user, 3)
.toString()));
assertThat(messages.get(1),
- is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 3)
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, 0, 0, 0, user, 3)
.toString()));
String abortedEvent2 = messages.get(2);
@@ -267,9 +268,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -307,9 +308,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, 0, 0, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -334,7 +335,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -352,7 +353,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, 0, 0, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
index d90d63c..cc639c7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptor.java
@@ -30,9 +30,9 @@ public class CompensableInterceptor implements EventAwareInterceptor {
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
- int forwardRetries, Object... message) {
+ int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... message) {
return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
- timeout, retriesMethod, forwardRetries, message));
+ timeout, retriesMethod, forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, message));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
index 19e7825..54f5a87 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/DefaultRecovery.java
@@ -57,7 +57,8 @@ public class DefaultRecovery extends AbstractRecoveryPolicy {
String retrySignature = (forwardRetries != 0 || compensationSignature.isEmpty()) ? method.toString() : "";
AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.forwardTimeout(),
- retrySignature, forwardRetries, joinPoint.getArgs());
+ retrySignature, forwardRetries, compensable.forwardTimeout(),
+ compensable.reverseRetries(), compensable.reverseTimeout(), joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
context.setLocalTxId(parentTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
index 3b4036c..f077b38 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/EventAwareInterceptor.java
@@ -20,8 +20,9 @@ package org.apache.servicecomb.pack.omega.transaction;
public interface EventAwareInterceptor {
AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout,
- String retriesMethod,
- int forwardRetries, Object... message);
+ String retriesMethod, int forwardRetries, int forwardTimeout, int reverseRetries,
+ int reverseTimeout,
+ Object... message);
void postIntercept(String parentTxId, String compensationMethod);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/ForwardRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/ForwardRecovery.java
index 21fde8e..bb1e7f7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/ForwardRecovery.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/ForwardRecovery.java
@@ -52,7 +52,7 @@ public class ForwardRecovery extends DefaultRecovery {
throw throwable;
}
- remains = remains == -1 ? -1 : remains - 1;
+ remains--;
if (remains == 0) {
LOG.error(
"Forward Retried sub tx failed maximum times, global tx id: {}, local tx id: {}, method: {}, retried times: {}",
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
index a6f8447..07e2eab 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/NoOpEventAwareInterceptor.java
@@ -23,8 +23,8 @@ public class NoOpEventAwareInterceptor implements EventAwareInterceptor {
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout,
- String retriesMethod,
- int forwardRetries, Object... message) {
+ String retriesMethod, int forwardRetries, int forwardTimeout, int reverseRetries,
+ int reverseTimeout, Object... message) {
return new AlphaResponse(false);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyFactory.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyFactory.java
index f373067..3a33bce 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyFactory.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/RecoveryPolicyFactory.java
@@ -25,9 +25,8 @@ public class RecoveryPolicyFactory {
/**
* If retries == 0, use the default recovery to execute only once.
* If retries > 0, it will use the forward recovery and retry the given times at most.
- * If retries == -1, it will use the forward recovery and retry forever until interrupted.
*/
static RecoveryPolicy getRecoveryPolicy(int forwardRetries) {
- return forwardRetries != 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
+ return forwardRetries > 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
index fcc77e6..36d57a2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaAbortedEvent.java
@@ -27,7 +27,7 @@ public class SagaAbortedEvent extends TxEvent {
public SagaAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(EventType.SagaAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
- stackTrace(throwable));
+ 0, 0, 0, stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
index aa2bb3b..f2f7553 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class SagaEndedEvent extends TxEvent {
SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0);
+ super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
index 66fba1a..edbfb32 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class SagaStartedEvent extends TxEvent {
public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
// use "" instead of null as compensationMethod requires not null in sql
- super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0);
+ super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
index c6dea5a..c6bc5a7 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxAbortedEvent.java
@@ -28,7 +28,7 @@ public class TxAbortedEvent extends TxEvent {
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
- stackTrace(throwable));
+ 0, 0, 0, stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
index 0446eb3..0c4adef 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckFailedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class TxCompensateAckFailedEvent extends TxEvent {
public TxCompensateAckFailedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0);
+ super(EventType.TxCompensateAckFailedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0,0 ,0 ,0);
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
index d5e2edd..3191999 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensateAckSucceedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class TxCompensateAckSucceedEvent extends TxEvent {
public TxCompensateAckSucceedEvent(String globalTxId, String localTxId, String parentTxId) {
- super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0);
+ super(EventType.TxCompensateAckSucceedEvent, globalTxId, localTxId, parentTxId, "", 0, "", 0, 0, 0, 0);
}
}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
index e4b1a17..3355eed 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
+ super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
index 8618e7f..7b55c51 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.pack.common.EventType;
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
+ super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0, 0, 0, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
index 0bc5c26..c860c80 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxEvent.java
@@ -34,9 +34,13 @@ public class TxEvent {
private final String retryMethod;
private final int forwardRetries;
+ private final int forwardTimeout;
+ private final int reverseRetries;
+ private final int reverseTimeout;
+
public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, String retryMethod, int forwardRetries, Object... payloads) {
+ int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... payloads) {
this.timestamp = System.currentTimeMillis();
this.type = type;
this.globalTxId = globalTxId;
@@ -46,6 +50,9 @@ public class TxEvent {
this.timeout = timeout;
this.retryMethod = retryMethod;
this.forwardRetries = forwardRetries;
+ this.forwardTimeout = forwardTimeout;
+ this.reverseRetries = reverseRetries;
+ this.reverseTimeout = reverseTimeout;
this.payloads = payloads;
}
@@ -89,6 +96,18 @@ public class TxEvent {
return forwardRetries;
}
+ public int forwardTimeout() {
+ return forwardTimeout;
+ }
+
+ public int reverseRetries() {
+ return reverseRetries;
+ }
+
+ public int reverseTimeout() {
+ return reverseTimeout;
+ }
+
@Override
public String toString() {
return type.name() + "{" +
@@ -96,9 +115,12 @@ public class TxEvent {
", localTxId='" + localTxId + '\'' +
", parentTxId='" + parentTxId + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
- ", timeout=" + timeout +
+ ", timeout=" + timeout + '\'' +
", retryMethod='" + retryMethod + '\'' +
- ", forwardRetries=" + forwardRetries +
+ ", forwardRetries=" + forwardRetries + '\'' +
+ ", forwardTimeout=" + forwardTimeout + '\'' +
+ ", reverseRetries=" + reverseRetries + '\'' +
+ ", reverseTimeout=" + reverseTimeout + '\'' +
", payloads=" + Arrays.toString(payloads) +
'}';
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
index 0851f65..59112bf 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/TxStartedEvent.java
@@ -22,8 +22,8 @@ import org.apache.servicecomb.pack.common.EventType;
public class TxStartedEvent extends TxEvent {
public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, String retryMethod, int forwardRetries, Object... payloads) {
+ int timeout, String retryMethod, int forwardRetries, int forwardTimeout, int reverseRetries, int reverseTimeout, Object... payloads) {
super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retryMethod,
- forwardRetries, payloads);
+ forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, payloads);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
index 8ed5081..f2f547c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/annotations/Compensable.java
@@ -50,6 +50,16 @@ public @interface Compensable {
int forwardRetries() default 0;
/**
+ * The retries number of the reverse compensable method.
+ * Default value is 0, which means never retry it
+ * value > 0, which means the maximum number of retries
+ * value < 0, an IllegalArgumentException will be thrown
+ *
+ * @return the reverse retries number
+ */
+ int reverseRetries() default 0;
+
+ /**
* Compensation method name.<br>
* A compensation method should satisfy below requirements:
* <ol>
@@ -73,4 +83,11 @@ public @interface Compensable {
*/
int forwardTimeout() default 0;
+ /**
+ * <code>@Compensable</code> reverse compensable method timeout, in seconds. <br>
+ * Default value is 0, which means never timeout.
+ *
+ * @return the reverse timeout value
+ */
+ int reverseTimeout() default 0;
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
index fa3385c..941c4ab 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/CompensableInterceptorTest.java
@@ -92,7 +92,10 @@ public class CompensableInterceptorTest {
@Test
public void sendsTxStartedEventBefore() throws Exception {
int forwardRetries = new Random().nextInt();
- interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, forwardRetries, message);
+ int forwardTimeout = new Random().nextInt();
+ int reverseRetries = new Random().nextInt();
+ int reverseTimeout = new Random().nextInt();
+ interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, forwardRetries, forwardTimeout, reverseRetries, reverseTimeout, message);
TxEvent event = messages.get(0);
@@ -100,6 +103,9 @@ public class CompensableInterceptorTest {
assertThat(event.localTxId(), is(localTxId));
assertThat(event.parentTxId(), is(parentTxId));
assertThat(event.forwardRetries(), is(forwardRetries));
+ assertThat(event.forwardTimeout(), is(forwardTimeout));
+ assertThat(event.reverseRetries(), is(reverseRetries));
+ assertThat(event.reverseTimeout(), is(reverseTimeout));
assertThat(event.retryMethod(), is(retryMethod));
assertThat(event.type(), is(EventType.TxStartedEvent));
assertThat(event.compensationMethod(), is(compensationMethod));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
index 9de70a5..5f38282 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/pack/omega/transaction/TransactionAspectTest.java
@@ -356,40 +356,40 @@ public class TransactionAspectTest {
assertThat(omegaContext.localTxId(), is(localTxId));
}
- @Test
- public void keepRetryingTillSuccess() throws Throwable {
- RuntimeException oops = new RuntimeException("oops");
- when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
- when(compensable.forwardRetries()).thenReturn(-1);
-
- aspect.advise(joinPoint, compensable);
-
- assertThat(messages.size(), is(4));
-
- TxEvent startedEvent1 = messages.get(0);
- assertThat(startedEvent1.globalTxId(), is(globalTxId));
- assertThat(startedEvent1.localTxId(), is(newLocalTxId));
- assertThat(startedEvent1.parentTxId(), is(localTxId));
- assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
- assertThat(startedEvent1.forwardRetries(), is(-1));
- assertThat(startedEvent1.retryMethod(),
- is(this.getClass().getDeclaredMethod("doNothing").toString()));
-
- TxEvent startedEvent2 = messages.get(1);
- assertThat(startedEvent2.localTxId(), is(newLocalTxId));
- assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
- assertThat(startedEvent2.forwardRetries(), is(-1));
-
- TxEvent startedEvent3 = messages.get(2);
- assertThat(startedEvent3.localTxId(), is(newLocalTxId));
- assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
- assertThat(startedEvent3.forwardRetries(), is(-1));
-
- assertThat(messages.get(3).type(), is(EventType.TxEndedEvent));
-
- assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(localTxId));
- }
+// @Test
+// public void keepRetryingTillSuccess() throws Throwable {
+// RuntimeException oops = new RuntimeException("oops");
+// when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
+// when(compensable.forwardRetries()).thenReturn(-1);
+//
+// aspect.advise(joinPoint, compensable);
+//
+// assertThat(messages.size(), is(4));
+//
+// TxEvent startedEvent1 = messages.get(0);
+// assertThat(startedEvent1.globalTxId(), is(globalTxId));
+// assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+// assertThat(startedEvent1.parentTxId(), is(localTxId));
+// assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+// assertThat(startedEvent1.forwardRetries(), is(-1));
+// assertThat(startedEvent1.retryMethod(),
+// is(this.getClass().getDeclaredMethod("doNothing").toString()));
+//
+// TxEvent startedEvent2 = messages.get(1);
+// assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+// assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+// assertThat(startedEvent2.forwardRetries(), is(-1));
+//
+// TxEvent startedEvent3 = messages.get(2);
+// assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+// assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+// assertThat(startedEvent3.forwardRetries(), is(-1));
+//
+// assertThat(messages.get(3).type(), is(EventType.TxEndedEvent));
+//
+// assertThat(omegaContext.globalTxId(), is(globalTxId));
+// assertThat(omegaContext.localTxId(), is(localTxId));
+// }
private String doNothing() {
return "doNothing";
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 6d34d21..876ae66 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -42,9 +42,12 @@ message GrpcTxEvent {
bytes payloads = 7;
string serviceName = 8;
string instanceId = 9;
- int32 forwardTimeout = 10;
- int32 forwardRetries = 11;
- string retryMethod = 12;
+ int32 timeout = 10;
+ int32 forwardTimeout = 11;
+ int32 forwardRetries = 12;
+ int32 reverseRetries = 13;
+ int32 reverseTimeout = 14;
+ string retryMethod = 15;
}
message GrpcCompensateCommand {