You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/04/28 09:13:50 UTC
[incubator-servicecomb-saga] 06/07: SCB-224 retry synchronously
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 7e01a4f3ca696716ca7beaedc9032e7ca6383fff
Author: Eric Lee <da...@huawei.com>
AuthorDate: Fri Feb 23 15:44:26 2018 +0800
SCB-224 retry synchronously
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../org/apache/servicecomb/saga/PackStepdefs.java | 16 +--
.../servicecomb/saga/alpha/core/Command.java | 2 +-
.../saga/alpha/core/CommandRepository.java | 2 +-
.../servicecomb/saga/alpha/core/EventScanner.java | 22 ++--
.../saga/alpha/core/PushBackOmegaCallback.java | 9 +-
.../saga/alpha/core/TxConsistentService.java | 9 +-
.../servicecomb/saga/alpha/core/TxEvent.java | 53 +++++----
.../saga/alpha/core/TxEventRepository.java | 2 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 4 +-
.../saga/alpha/server/CommandEntityRepository.java | 15 ++-
.../saga/alpha/server/GrpcOmegaCallback.java | 2 +-
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 +-
.../saga/alpha/server/SpringCommandRepository.java | 23 +---
.../saga/alpha/server/SpringTxEventRepository.java | 15 +--
.../alpha/server/SpringTxTimeoutRepository.java | 5 +-
.../alpha/server/TxEventEnvelopeRepository.java | 49 +++++---
.../src/main/resources/schema-mysql.sql | 5 +-
.../src/main/resources/schema-postgresql.sql | 4 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 129 ++++++++++-----------
alpha/alpha-server/src/test/resources/schema.sql | 4 +-
.../pack/tests/CommandEnvelopeRepository.java | 13 +--
.../integration/pack/tests/GreetingController.java | 8 ++
.../integration/pack/tests/GreetingService.java | 22 ++++
.../saga/integration/pack/tests/PackIT.java | 92 +++++++++++++--
.../connector/grpc/GrpcClientMessageSender.java | 4 +-
.../grpc/GrpcCompensateStreamObserver.java | 6 +-
.../grpc/LoadBalancedClusterMessageSender.java | 8 +-
.../connector/grpc/PushBackReconnectRunnable.java | 8 +-
.../grpc/LoadBalancedClusterMessageSenderTest.java | 15 +--
.../connector/grpc/RetryableMessageSenderTest.java | 1 +
.../saga/omega/context/CompensationContext.java | 5 +-
.../spring/CompensableAnnotationProcessor.java | 1 +
.../spring/CompensableMethodCheckingCallback.java | 10 +-
.../spring/TransactionAspectConfig.java | 5 +-
.../spring/TransactionInterceptionTest.java | 103 ++++++++++++----
.../spring/TransactionalUserService.java | 24 +++-
.../saga/omega/transaction/spring/User.java | 2 +-
.../omega/transaction/CompensableInterceptor.java | 12 +-
.../transaction/CompensationMessageHandler.java | 15 +--
...TransactionAspect.java => DefaultRecovery.java} | 59 +++++-----
.../omega/transaction/EventAwareInterceptor.java | 6 +-
.../saga/omega/transaction/ForwardRecovery.java | 76 ++++++++++++
...TxCompensatedEvent.java => RecoveryPolicy.java} | 13 ++-
...aEndedEvent.java => RecoveryPolicyFactory.java} | 16 ++-
.../saga/omega/transaction/SagaEndedEvent.java | 5 +-
.../transaction/SagaStartAnnotationProcessor.java | 5 +-
.../saga/omega/transaction/SagaStartAspect.java | 2 +-
.../saga/omega/transaction/SagaStartedEvent.java | 2 +-
.../omega/transaction/TimeAwareInterceptor.java | 0
.../saga/omega/transaction/TransactionAspect.java | 36 +-----
.../saga/omega/transaction/TxAbortedEvent.java | 3 +-
.../saga/omega/transaction/TxCompensatedEvent.java | 2 +-
.../saga/omega/transaction/TxEndedEvent.java | 2 +-
.../saga/omega/transaction/TxEvent.java | 19 +--
.../saga/omega/transaction/TxStartedEvent.java | 7 +-
.../omega/transaction/annotations/Compensable.java | 4 +-
.../transaction/CompensableInterceptorTest.java | 8 +-
.../CompensationMessageHandlerTest.java | 16 ++-
...ionAspectTest.java => DefaultRecoveryTest.java} | 115 +++++++++++-------
...ionAspectTest.java => ForwardRecoveryTest.java} | 122 ++++++++++---------
.../transaction/TimeAwareInterceptorTest.java | 0
.../omega/transaction/TransactionAspectTest.java | 92 ++++++++++++---
.../src/main/proto/GrpcTxEvent.proto | 13 ++-
63 files changed, 868 insertions(+), 481 deletions(-)
diff --git a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
index 4f2f729..f9af438 100644
--- a/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
+++ b/acceptance-tests/acceptance-pack/src/test/java/org/apache/servicecomb/saga/PackStepdefs.java
@@ -41,7 +41,7 @@ import cucumber.api.java.After;
import cucumber.api.java8.En;
public class PackStepdefs implements En {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String ALPHA_REST_ADDRESS = "alpha.rest.address";
private static final String CAR_SERVICE_ADDRESS = "car.service.address";
@@ -73,7 +73,7 @@ public class PackStepdefs implements En {
});
Given("^Install the byteman script ([A-Za-z0-9_\\.]+) to ([A-Za-z]+) Service$", (String script, String service) -> {
- log.info("Install the byteman script {} to {} service", script, service);
+ LOG.info("Install the byteman script {} to {} service", script, service);
List<String> rules = new ArrayList<>();
rules.add("target/test-classes/" + script);
Submit bm = getBytemanSubmit(service);
@@ -81,7 +81,7 @@ public class PackStepdefs implements En {
});
When("^User ([A-Za-z]+) requests to book ([0-9]+) cars and ([0-9]+) rooms (success|fail)$", (username, cars, rooms, result) -> {
- log.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
+ LOG.info("Received request from user {} to book {} cars and {} rooms", username, cars, rooms);
Response resp = given()
.pathParam("name", username)
@@ -116,7 +116,7 @@ public class PackStepdefs implements En {
@After
public void cleanUp() {
- log.info("Cleaning up services");
+ LOG.info("Cleaning up services");
for (String address : addresses) {
given()
.when()
@@ -135,7 +135,7 @@ public class PackStepdefs implements En {
try {
bm.deleteAllRules();
} catch (Exception e) {
- log.warn("Fail to delete the byteman rules " + e);
+ LOG.warn("Fail to delete the byteman rules " + e);
}
}
}
@@ -156,7 +156,7 @@ public class PackStepdefs implements En {
return;
}
- log.info("Retrieved data {} from service", actualMaps);
+ LOG.info("Retrieved data {} from service", actualMaps);
dataTable.diff(DataTable.create(actualMaps));
}
@@ -180,12 +180,12 @@ public class PackStepdefs implements En {
if (isEmpty(infoURI)) {
infoURI = "/info";
}
- log.info("The info service uri is " + infoURI);
+ LOG.info("The info service uri is " + infoURI);
probe(address, infoURI);
}
private void probe(String address, String infoURI) {
- log.info("Connecting to service address {}", address);
+ LOG.info("Connecting to service address {}", address);
given()
.when()
.get(address + infoURI)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 4edb928..6c8f370 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -76,7 +76,7 @@ public class Command {
this.lastModified = new Date();
}
- private Command(long id,
+ public Command(long id,
String serviceName,
String instanceId,
String globalTxId,
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 25288ed..2bbea77 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -21,7 +21,7 @@ import java.util.List;
public interface CommandRepository {
- void saveCompensationCommands(TxEvent abortEvent);
+ void saveCompensationCommands(String globalTxId);
void markCommandAsDone(String globalTxId, String localTxId);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index d3fba31..3168870 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventScanner implements Runnable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final ScheduledExecutorService scheduler;
@@ -83,7 +83,7 @@ public class EventScanner implements Runnable {
private void findTimeoutEvents() {
eventRepository.findTimeoutEvents()
.forEach(event -> {
- log.info("Found timeout event {}", event);
+ LOG.info("Found timeout event {}", event);
timeoutRepository.save(txTimeoutOf(event));
});
}
@@ -93,9 +93,9 @@ public class EventScanner implements Runnable {
}
private void saveUncompensatedEventsToCommands() {
- eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId)
- .ifPresent(event -> {
- log.info("Found uncompensated event {}", event);
+ eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+ .forEach(event -> {
+ LOG.info("Found uncompensated event {}", event);
nextEndedEventId = event.id();
commandRepository.saveCompensationCommands(event.globalTxId());
});
@@ -104,7 +104,7 @@ public class EventScanner implements Runnable {
private void updateCompensatedCommands() {
eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
.ifPresent(event -> {
- log.info("Found compensated event {}", event);
+ LOG.info("Found compensated event {}", event);
nextCompensatedEventId = event.id();
updateCompensationStatus(event);
});
@@ -114,13 +114,13 @@ public class EventScanner implements Runnable {
try {
eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
} catch (Exception e) {
- log.warn("Failed to delete duplicate event", e);
+ LOG.warn("Failed to delete duplicate event", e);
}
}
private void updateCompensationStatus(TxEvent event) {
commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
- log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+ LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
event.globalTxId(),
event.localTxId());
@@ -129,7 +129,7 @@ public class EventScanner implements Runnable {
private void abortTimeoutEvents() {
timeoutRepository.findFirstTimeout().forEach(timeout -> {
- log.info("Found timeout event {} to abort", timeout);
+ LOG.info("Found timeout event {} to abort", timeout);
eventRepository.save(toTxAbortedEvent(timeout));
@@ -152,7 +152,7 @@ public class EventScanner implements Runnable {
private void markGlobalTxEnd(TxEvent event) {
eventRepository.save(toSagaEndedEvent(event));
- log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
+ LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
private TxEvent toTxAbortedEvent(TxTimeout timeout) {
@@ -182,7 +182,7 @@ public class EventScanner implements Runnable {
private void compensate() {
commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
- log.info("Compensating transaction with globalTxId {} and localTxId {}",
+ LOG.info("Compensating transaction with globalTxId {} and localTxId {}",
command.globalTxId(),
command.localTxId());
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
index 3b27c14..9556d7c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/PushBackOmegaCallback.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushBackOmegaCallback implements OmegaCallback {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final BlockingQueue<Runnable> pendingCompensations;
private final OmegaCallback underlying;
@@ -45,11 +45,12 @@ public class PushBackOmegaCallback implements OmegaCallback {
}
private void logError(TxEvent event, Exception e) {
- log.error(
- "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+ LOG.error(
+ "Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+ event.retries() == 0 ? "compensate" : "retry",
event.serviceName(),
event.instanceId(),
- event.compensationMethod(),
+ event.retries() == 0 ? event.compensationMethod() : event.retryMethod(),
event.globalTxId(),
event.localTxId(),
e);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 968e5b7..4382fd3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TxConsistentService {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final TxEventRepository eventRepository;
@@ -41,8 +41,9 @@ public class TxConsistentService {
}
public boolean handle(TxEvent event) {
- if (types.contains(event.type()) && isGlobalTxAborted(event)) {
- log.info("Transaction event {} rejected, because its parent with globalTxId {} was already aborted", event.type(), event.globalTxId());
+ if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+ LOG.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted",
+ event.globalTxId());
return false;
}
@@ -51,7 +52,7 @@ public class TxConsistentService {
if (eventRepository.findTimeoutEvents().stream()
.filter(txEvent -> txEvent.globalTxId().equals(event.globalTxId()))
.count() == 1) {
- log.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
+ LOG.warn("Transaction {} is timeout and will be handled by the event scanner", event.globalTxId());
return false;
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1abb7fe..17b059c 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,12 +17,16 @@
package org.apache.servicecomb.saga.alpha.core;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Transient;
@Entity
@Table(name = "TxEvent")
@@ -43,9 +47,9 @@ public class TxEvent {
private String type;
private String compensationMethod;
private Date expiryTime;
- private byte[] payloads;
+ private String retryMethod;
private int retries;
- private String retriesMethod;
+ private byte[] payloads;
private TxEvent() {
}
@@ -61,6 +65,8 @@ public class TxEvent {
event.type,
event.compensationMethod,
event.expiryTime,
+ event.retryMethod,
+ event.retries,
event.payloads);
}
@@ -73,7 +79,8 @@ public class TxEvent {
String type,
String compensationMethod,
byte[] payloads) {
- this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, payloads);
+ this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, 0, "", 0,
+ payloads);
}
public TxEvent(
@@ -85,9 +92,11 @@ public class TxEvent {
String type,
String compensationMethod,
int timeout,
+ String retryMethod,
+ int retries,
byte[] payloads) {
this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout,
- "", 0, payloads);
+ retryMethod, retries, payloads);
}
public TxEvent(
@@ -100,9 +109,11 @@ public class TxEvent {
String type,
String compensationMethod,
int timeout,
+ String retryMethod,
+ int retries,
byte[] payloads) {
this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
- timeout, "", 0, payloads);
+ timeout, retryMethod, retries, payloads);
}
TxEvent(Long surrogateId,
@@ -115,10 +126,14 @@ public class TxEvent {
String type,
String compensationMethod,
int timeout,
+ String retryMethod,
+ int retries,
byte[] payloads) {
this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
compensationMethod,
timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+ retryMethod,
+ retries,
payloads);
}
@@ -132,7 +147,7 @@ public class TxEvent {
String type,
String compensationMethod,
Date expiryTime,
- String retriesMethod,
+ String retryMethod,
int retries,
byte[] payloads) {
this.surrogateId = surrogateId;
@@ -145,7 +160,7 @@ public class TxEvent {
this.type = type;
this.compensationMethod = compensationMethod;
this.expiryTime = expiryTime;
- this.retriesMethod = retriesMethod;
+ this.retryMethod = retryMethod;
this.retries = retries;
this.payloads = payloads;
}
@@ -194,6 +209,14 @@ public class TxEvent {
return expiryTime;
}
+ public String retryMethod() {
+ return retryMethod;
+ }
+
+ public int retries() {
+ return retries;
+ }
+
@Override
public String toString() {
return "TxEvent{" +
@@ -206,19 +229,9 @@ public class TxEvent {
", parentTxId='" + parentTxId + '\'' +
", type='" + type + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
- ", expiryTime='" + expiryTime + '\'' +
+ ", expiryTime=" + expiryTime +
+ ", retryMethod='" + retryMethod + '\'' +
+ ", retries=" + retries +
'}';
}
-
- public int retries() {
- return retries;
- }
-
- public String retriesMethod() {
- return retriesMethod;
- }
-
- public boolean containChildren(TxEvent event) {
- return this.localTxId.equals(event.parentTxId);
- }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index 9eceadd..c481226 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -97,7 +97,7 @@ public interface TxEventRepository {
* @param id
* @return
*/
- Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id);
+ List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
/**
* Find a {@link TxEvent} which satisfies below requirements:
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index da36066..e80d5a8 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -73,8 +73,8 @@ public class TxConsistentServiceTest {
}
@Override
- public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
- return Optional.empty();
+ public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+ return emptyList();
}
@Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 737fd11..53110bf 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -34,9 +34,22 @@ public interface CommandEntityRepository extends CrudRepository<Command, Long> {
@Transactional
@Modifying(clearAutomatically = true)
@Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+ + "SET c.status = :toStatus "
+ + "WHERE c.globalTxId = :globalTxId "
+ + " AND c.localTxId = :localTxId "
+ + " AND c.status = :fromStatus")
+ void updateStatusByGlobalTxIdAndLocalTxId(
+ @Param("fromStatus") String fromStatus,
+ @Param("toStatus") String toStatus,
+ @Param("globalTxId") String globalTxId,
+ @Param("localTxId") String localTxId);
+
+ @Transactional
+ @Modifying(clearAutomatically = true)
+ @Query("UPDATE org.apache.servicecomb.saga.alpha.core.Command c "
+ "SET c.status = :status "
+ "WHERE c.globalTxId = :globalTxId "
- + "AND c.localTxId = :localTxId")
+ + " AND c.localTxId = :localTxId")
void updateStatusByGlobalTxIdAndLocalTxId(
@Param("status") String status,
@Param("globalTxId") String globalTxId,
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 5a95281..a54fa66 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -42,7 +42,7 @@ class GrpcOmegaCallback implements OmegaCallback {
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
.setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
- .setCompensateMethod(event.compensationMethod())
+ .setCompensationMethod(event.compensationMethod())
.setPayloads(ByteString.copyFrom(event.payloads()))
.build();
observer.onNext(command);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 99457a2..a3137b4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -85,7 +85,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
message.getType(),
message.getCompensationMethod(),
message.getTimeout(),
- message.getRetriesMethod(),
+ message.getRetryMethod(),
message.getRetries(),
message.getPayloads().toByteArray()
));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index ed7b4f1..4aa30d1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SpringCommandRepository implements CommandRepository {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final TxEventEnvelopeRepository eventRepository;
private final CommandEntityRepository commandRepository;
@@ -57,13 +57,13 @@ public class SpringCommandRepository implements CommandRepository {
}
for (Command command : commands.values()) {
- log.info("Saving compensation command {}", command);
+ LOG.info("Saving compensation command {}", command);
try {
commandRepository.save(command);
} catch (Exception e) {
- log.warn("Failed to save some command {}", command);
+ LOG.warn("Failed to save some command {}", command);
}
- log.info("Saved compensation command {}", command);
+ LOG.info("Saved compensation command {}", command);
}
}
@@ -85,24 +85,11 @@ public class SpringCommandRepository implements CommandRepository {
commands.forEach(command ->
commandRepository.updateStatusByGlobalTxIdAndLocalTxId(
+ NEW.name(),
PENDING.name(),
command.globalTxId(),
command.localTxId()));
return commands;
}
-
- private long retriedTimes(String globalTxId, String retriesMethod, String localTxId) {
- return commandRepository.findByGlobalTxIdAndStatus(globalTxId, DONE.name()).stream()
- .filter(c -> Objects.equals(c.compensationMethod(), retriesMethod)
- && Objects.equals(c.localTxId(), localTxId)).count();
- }
-
- private List<TxEvent> createRetriesTxEvent(long abortEventId, TxEvent txEvent) {
- return Collections.singletonList(new TxEvent(
- abortEventId, txEvent.serviceName(), txEvent.instanceId(), txEvent.creationTime(),
- txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(),
- txEvent.type(), txEvent.retriesMethod(), txEvent.payloads()
- ));
- }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index cae6456..e48a780 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,17 +17,14 @@
package org.apache.servicecomb.saga.alpha.server;
+import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
+
import java.util.List;
import java.util.Optional;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
import org.springframework.data.domain.PageRequest;
-import org.springframework.util.CollectionUtils;
-
-import javax.swing.text.html.Option;
-
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
class SpringTxEventRepository implements TxEventRepository {
private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1);
@@ -63,12 +60,8 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public Optional<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id) {
- List<TxEvent> result = eventRepo.findFirstUncompensatedEventByIdGreaterThan(id, SINGLE_TX_EVENT_REQUEST);
- if (CollectionUtils.isEmpty(result)) {
- return Optional.empty();
- }
- return Optional.of(result.get(0));
+ public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+ return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST);
}
@Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 7195139..53b4443 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.data.domain.PageRequest;
public class SpringTxTimeoutRepository implements TxTimeoutRepository {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private final TxTimeoutEntityRepository timeoutRepo;
SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
@@ -43,7 +44,7 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository {
try {
timeoutRepo.save(timeout);
} catch (Exception ignored) {
- log.warn("Failed to save some timeout {}", timeout);
+ LOG.warn("Failed to save some timeout {}", timeout);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3a7edb3..68e6233 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -35,7 +35,17 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
+ " SELECT t1.globalTxId FROM TxEvent t1"
+ " WHERE t1.globalTxId = t.globalTxId "
- + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+ + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) AND NOT EXISTS ( "
+ + " SELECT t3.globalTxId FROM TxEvent t3 "
+ + " WHERE t3.globalTxId = t.globalTxId "
+ + " AND t3.localTxId = t.localTxId "
+ + " AND t3.surrogateId != t.surrogateId "
+ + " AND t3.creationTime > t.creationTime) AND (("
+ + "SELECT MIN(t2.retries) FROM TxEvent t2 "
+ + "WHERE t2.globalTxId = t.globalTxId "
+ + " AND t2.localTxId = t.localTxId "
+ + " AND t2.type = 'TxStartedEvent') = 0 "
+ + "OR t.globalTxId = t.localTxId)")
Optional<TxEvent> findFirstAbortedGlobalTxByType();
@Query("SELECT t FROM TxEvent t "
@@ -56,9 +66,13 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
@Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+ "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
- + "t.type, t.compensationMethod, t.payloads"
+ + "t.type, t.compensationMethod, t.payloads "
+ ") FROM TxEvent t "
- + "WHERE t.globalTxId = ?1 AND t.type = ?2")
+ + "WHERE t.globalTxId = ?1 AND t.type = ?2 "
+ + " AND ( SELECT MIN(t1.retries) FROM TxEvent t1 "
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.localTxId = t.localTxId "
+ + " AND t1.type = 'TxStartedEvent' ) = 0 ")
List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
@Query("SELECT t FROM TxEvent t "
@@ -73,25 +87,30 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " FROM TxEvent t2 "
+ " WHERE t2.globalTxId = ?1 "
+ " AND t2.localTxId = t.localTxId "
- + " AND t2.compensationMethod != t.retriesMethod "
+ " AND t2.type = 'TxCompensatedEvent') "
+ "ORDER BY t.surrogateId ASC")
List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
@Query("SELECT t FROM TxEvent t "
- + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?1 AND EXISTS ( "
- + " SELECT t1.globalTxId"
- + " FROM TxEvent t1 "
+ + "WHERE t.type = ?1 AND t.surrogateId > ?2 AND EXISTS ( "
+ + " SELECT t1.globalTxId FROM TxEvent t1 "
+ " WHERE t1.globalTxId = t.globalTxId "
- + " AND t1.type = 'TxAbortedEvent'"
- + ") AND NOT EXISTS ( "
- + " SELECT t2.globalTxId"
- + " FROM TxEvent t2 "
- + " WHERE t2.globalTxId = t.globalTxId "
- + " AND t2.localTxId = t.localTxId "
- + " AND t2.type = 'TxCompensatedEvent') "
+ + " AND t1.type = 'TxAbortedEvent' AND NOT EXISTS ( "
+ + " SELECT t2.globalTxId FROM TxEvent t2 "
+ + " WHERE t2.globalTxId = t1.globalTxId "
+ + " AND t2.localTxId = t1.localTxId "
+ + " AND t2.type = 'TxStartedEvent' "
+ + " AND t2.creationTime > t1.creationTime)) AND NOT EXISTS ( "
+ + " SELECT t3.globalTxId FROM TxEvent t3 "
+ + " WHERE t3.globalTxId = t.globalTxId "
+ + " AND t3.localTxId = t.localTxId "
+ + " AND t3.type = 'TxCompensatedEvent') AND ( "
+ + " SELECT MIN(t4.retries) FROM TxEvent t4 "
+ + " WHERE t4.globalTxId = t.globalTxId "
+ + " AND t4.localTxId = t.localTxId "
+ + " AND t4.type = 'TxStartedEvent' ) = 0 "
+ "ORDER BY t.surrogateId ASC")
- List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long surrogateId, Pageable pageable);
+ List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql
index c21e518..3806f6d 100644
--- a/alpha/alpha-server/src/main/resources/schema-mysql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql
@@ -26,8 +26,6 @@ CREATE TABLE IF NOT EXISTS TxEvent (
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
- retries_method varchar(256) NOT NULL,
- retries int NOT NULL,
payloads varbinary(10240),
PRIMARY KEY (surrogateId),
INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime)
@@ -43,6 +41,8 @@ CREATE TABLE IF NOT EXISTS Command (
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
compensationMethod varchar(256) NOT NULL,
+ retryMethod varchar(256) NOT NULL,
+ retries int NOT NULL DEFAULT 0,
payloads varbinary(10240),
status varchar(12),
lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS Command (
INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, status)
) DEFAULT CHARSET=utf8;
+
CREATE TABLE IF NOT EXISTS TxTimeout (
surrogateId bigint NOT NULL AUTO_INCREMENT,
eventId bigint NOT NULL UNIQUE,
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 674e051..4ecb1b4 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
expiryTime timestamp(6) NOT NULL,
- retriesMethod varchar(256) NOT NULL,
- retries int NOT NULL,
+ retryMethod varchar(256) NOT NULL,
+ retries int NOT NULL DEFAULT 0,
payloads bytea
);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 26aa17a..10bb2dd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -92,6 +92,8 @@ public class AlphaIntegrationTest {
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
private final String compensationMethod = getClass().getCanonicalName();
+
+ private final String retryMethod = uniquify("retryMethod");
private final String serviceName = uniquify("serviceName");
private final String instanceId = uniquify("instanceId");
@@ -128,7 +130,9 @@ public class AlphaIntegrationTest {
private TxConsistentService consistentService;
private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
- private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
+
+ private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver(
+ this::onCompensation);
@AfterClass
public static void tearDown() throws Exception {
@@ -205,7 +209,7 @@ public class AlphaIntegrationTest {
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
- CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
+ CompensationStreamObserver anotherResponseObserver = new CompensationStreamObserver();
TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
@@ -247,7 +251,7 @@ public class AlphaIntegrationTest {
assertThat(command.getGlobalTxId(), is(globalTxId));
assertThat(command.getLocalTxId(), is(localTxId));
assertThat(command.getParentTxId(), is(parentTxId));
- assertThat(command.getCompensateMethod(), is(compensationMethod));
+ assertThat(command.getCompensationMethod(), is(compensationMethod));
assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
}
@@ -269,9 +273,9 @@ public class AlphaIntegrationTest {
assertThat(receivedCommands, contains(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
- .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+ .setCompensationMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
- .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
+ .setCompensationMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
));
}
@@ -289,7 +293,7 @@ public class AlphaIntegrationTest {
assertThat(command.getGlobalTxId(), is(globalTxId));
assertThat(command.getLocalTxId(), is(localTxId));
assertThat(command.getParentTxId(), is(parentTxId));
- assertThat(command.getCompensateMethod(), is(compensationMethod));
+ assertThat(command.getCompensationMethod(), is(compensationMethod));
assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
}
@@ -301,7 +305,7 @@ public class AlphaIntegrationTest {
// simulates connection from another service with different globalTxId
GrpcServiceConfig anotherServiceConfig = someServiceConfig();
- TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensateStreamObserver());
+ TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensationStreamObserver());
TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
@@ -403,7 +407,7 @@ public class AlphaIntegrationTest {
@Test
public void abortTimeoutTxStartedEvent() {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
- blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId));
+ blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId, null));
blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
await().atMost(2, SECONDS).until(() -> {
@@ -429,6 +433,26 @@ public class AlphaIntegrationTest {
});
}
+ @Test
+ public void doNotCompensateRetryingEvents() throws InterruptedException {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 1));
+ blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
+ blockingStub.onTxEvent(someGrpcEventWithRetry(TxStartedEvent, retryMethod, 0));
+ blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent));
+
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+ List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+ assertThat(events.size(), is(4));
+ assertThat(events.get(0).type(), is(TxStartedEvent.name()));
+ assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+ assertThat(events.get(2).type(), is(TxStartedEvent.name()));
+ assertThat(events.get(3).type(), is(TxEndedEvent.name()));
+
+ assertThat(receivedCommands.isEmpty(), is(true));
+ }
+
private boolean waitTillTimeoutDone() {
for (TxTimeout txTimeout : timeoutEntityRepository.findAll()) {
if (txTimeout.status().equals(DONE.name())) {
@@ -438,62 +462,13 @@ public class AlphaIntegrationTest {
return false;
}
- @Test
- public void retiesAndCompensateOnFailure() throws Exception {
- asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-
- String localTxId1 = UUID.randomUUID().toString();
-
- blockingStub.onTxEvent(GrpcTxEvent.newBuilder()
- .setServiceName(serviceName)
- .setInstanceId(instanceId)
- .setTimestamp(System.currentTimeMillis())
- .setGlobalTxId(globalTxId)
- .setLocalTxId(localTxId1)
- .setParentTxId(parentTxId)
- .setType(TxStartedEvent.name())
- .setCompensationMethod("Compensation Method")
- .setPayloads(ByteString.copyFrom(payload.getBytes()))
- .setRetries(3).setRetriesMethod("Retries Method")
- .build());
- blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, localTxId1));
-
- await().atMost(3, SECONDS).until(() -> !eventRepo.findByGlobalTxId(globalTxId).isEmpty());
-
- for (int i = 0; i < 3; i++) {
- blockingStub.onTxEvent(
- eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
- await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
- GrpcCompensateCommand command = receivedCommands.poll();
- assertThat(command.getGlobalTxId(), is(globalTxId));
- assertThat(command.getLocalTxId(), is(localTxId1));
- assertThat(command.getParentTxId(), is(parentTxId));
- assertThat(command.getCompensateMethod(), is("Retries Method"));
- assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
- }
-
- blockingStub.onTxEvent(
- eventOf(TxAbortedEvent, localTxId, localTxId1, payload.getBytes(), getClass().getCanonicalName()));
-
- await().atMost(3, SECONDS).until(() -> receivedCommands.size() == 1);
-
- GrpcCompensateCommand command = receivedCommands.poll();
- assertThat(command.getGlobalTxId(), is(globalTxId));
- assertThat(command.getLocalTxId(), is(localTxId1));
- assertThat(command.getParentTxId(), is(parentTxId));
- assertThat(command.getCompensateMethod(), is("Compensation Method"));
- assertThat(command.getPayloads().toByteArray(), is(payload.getBytes()));
- }
-
private GrpcAck onCompensation(GrpcCompensateCommand command) {
return blockingStub.onTxEvent(
eventOf(TxCompensatedEvent,
command.getLocalTxId(),
command.getParentTxId(),
- new byte[0],
- command.getCompensateMethod()));
+ command.getPayloads().toByteArray(),
+ command.getCompensationMethod()));
}
private GrpcServiceConfig someServiceConfig() {
@@ -516,23 +491,35 @@ public class AlphaIntegrationTest {
}
private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
- return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout);
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout,
+ "", 0);
+ }
+
+ private GrpcTxEvent someGrpcEventWithRetry(EventType type, String retryMethod, int retries) {
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), compensationMethod, 0,
+ retryMethod, retries);
}
private GrpcTxEvent someGrpcEvent(EventType type) {
- return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+ return someGrpcEvent(type, localTxId);
}
- private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) {
+ private GrpcTxEvent someGrpcEvent(EventType type, String localTxId) {
return someGrpcEvent(type, globalTxId, localTxId);
}
private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
- return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0);
+ return someGrpcEvent(type, globalTxId, localTxId, parentTxId);
+ }
+
+ private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId, String parentTxId) {
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0, "",
+ 0);
}
- private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
- return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0);
+ private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads,
+ String compensationMethod) {
+ return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0, "", 0);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -541,7 +528,9 @@ public class AlphaIntegrationTest {
String parentTxId,
byte[] payloads,
String compensationMethod,
- int timeout) {
+ int timeout,
+ String retryMethod,
+ int retries) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -553,19 +542,21 @@ public class AlphaIntegrationTest {
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
.setTimeout(timeout)
+ .setRetryMethod(retryMethod)
+ .setRetries(retries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
- private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+ private static class CompensationStreamObserver implements StreamObserver<GrpcCompensateCommand> {
private final Consumer<GrpcCompensateCommand> consumer;
private boolean completed = false;
- private CompensateStreamObserver() {
+ private CompensationStreamObserver() {
this(command -> {});
}
- private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+ private CompensationStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
this.consumer = consumer;
}
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index ca46625..8d70899 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
expiryTime TIMESTAMP NOT NULL,
- retriesMethod varchar(256) NOT NULL,
- retries int NOT NULL,
+ retryMethod varchar(256) NOT NULL,
+ retries int DEFAULT 0 NOT NULL,
payloads varbinary(10240)
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
similarity index 68%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
index 8c70e3a..ad8ae3a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/CommandEnvelopeRepository.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,10 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.omega.transaction;
+package org.apache.servicecomb.saga.integration.pack.tests;
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.alpha.core.Command;
+import org.springframework.data.repository.CrudRepository;
-public class SagaEndedEvent extends TxEvent {
-
- public SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
- }
+interface CommandEnvelopeRepository extends CrudRepository<Command, Long> {
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 2bdd587..e497cec 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -77,4 +77,12 @@ public class GreetingController {
ResponseEntity<String> goodNight(@RequestParam String name) {
return ResponseEntity.ok("Good night, " + name);
}
+
+ @SagaStart
+ @GetMapping("/open")
+ ResponseEntity<String> open(@RequestParam String name, @RequestParam int retries) {
+ String greetings = greetingService.greet(name);
+ String status = greetingService.open(name, retries);
+ return ResponseEntity.ok(greetings + "; " + status);
+ }
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index 69a86f6..554dc15 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -27,6 +27,9 @@ import org.springframework.stereotype.Service;
class GreetingService {
private final Queue<String> compensated;
+ private final int MAX_COUNT = 3;
+ private int failedCount = 1;
+
@Autowired
GreetingService(Queue<String> compensated) {
this.compensated = compensated;
@@ -59,8 +62,27 @@ class GreetingService {
return appendMessage("My bad, please take the window instead, " + name);
}
+ @Compensable(retries = MAX_COUNT, compensationMethod = "close")
+ String open(String name, int retries) {
+ if (failedCount < retries) {
+ failedCount += 1;
+ throw new IllegalStateException("You know when the zoo opens, " + name);
+ }
+ resetCount();
+ return "Welcome to visit the zoo, " + name;
+ }
+
+ String close(String name, int retries) {
+ resetCount();
+ return appendMessage("Sorry, the zoo has already closed, " + name);
+ }
+
private String appendMessage(String message) {
compensated.add(message);
return message;
}
+
+ void resetCount() {
+ this.failedCount = 1;
+ }
}
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index b3045e3..9199ff6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -56,14 +56,23 @@ public class PackIT {
private OmegaContext omegaContext;
@Autowired
- private TxEventEnvelopeRepository repository;
+ private TxEventEnvelopeRepository eventRepo;
+
+ @Autowired
+ private CommandEnvelopeRepository commandRepo;
@Autowired
private Queue<String> compensatedMessages;
+ @Autowired
+ private GreetingService greetingService;
+
@After
public void tearDown() throws Exception {
- repository.deleteAll();
+ eventRepo.deleteAll();
+ commandRepo.deleteAll();
+ compensatedMessages.clear();
+ greetingService.resetCount();
}
@Test(timeout = 5000)
@@ -75,11 +84,11 @@ public class PackIT {
assertThat(entity.getStatusCode(), is(OK));
assertThat(entity.getBody(), is("Greetings, mike; Bonjour, mike"));
- List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+ List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), is(1));
String globalTxId = distinctGlobalTxIds.get(0);
- List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(6));
@@ -136,13 +145,13 @@ public class PackIT {
assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
- await().atMost(2, SECONDS).until(() -> repository.count() == 7);
+ await().atMost(2, SECONDS).until(() -> eventRepo.count() == 7);
- List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+ List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), is(1));
String globalTxId = distinctGlobalTxIds.get(0);
- List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(7));
TxEvent sagaStartedEvent = events.get(0);
@@ -184,11 +193,11 @@ public class PackIT {
assertThat(entity.getStatusCode(), is(OK));
assertThat(entity.getBody(), is("Good morning, Bonjour, mike"));
- List<String> distinctGlobalTxIds = repository.findDistinctGlobalTxId();
+ List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), is(1));
String globalTxId = distinctGlobalTxIds.get(0);
- List<TxEvent> events = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(6));
@@ -220,4 +229,69 @@ public class PackIT {
assertThat(compensatedMessages.isEmpty(), is(true));
}
+
+ @Test(timeout = 5000)
+ public void retrySubTransactionSuccess() {
+ ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+ String.class,
+ "eric",
+ 2);
+
+ assertThat(entity.getStatusCode(), is(OK));
+ assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the zoo, eric"));
+
+ await().atMost(3, SECONDS).until(() -> eventRepo.count() == 8);
+
+ List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+ assertThat(distinctGlobalTxIds.size(), is(1));
+
+ String globalTxId = distinctGlobalTxIds.get(0);
+ List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ assertThat(events.size(), is(8));
+
+ assertThat(events.get(0).type(), is("SagaStartedEvent"));
+ assertThat(events.get(1).type(), is("TxStartedEvent"));
+ assertThat(events.get(2).type(), is("TxEndedEvent"));
+ assertThat(events.get(3).type(), is("TxStartedEvent"));
+ assertThat(events.get(4).type(), is("TxAbortedEvent"));
+ assertThat(events.get(5).type(), is("TxStartedEvent"));
+ assertThat(events.get(6).type(), is("TxEndedEvent"));
+ assertThat(events.get(7).type(), is("SagaEndedEvent"));
+
+ assertThat(compensatedMessages.isEmpty(), is(true));
+ }
+
+ @Test(timeout = 5000)
+ public void compensateWhenRetryReachesMaximum() throws InterruptedException {
+ // retries 3 times and then compensate
+ ResponseEntity<String> entity = restTemplate.getForEntity("/open?name={name}&retries={retries}",
+ String.class,
+ TRESPASSER,
+ 5);
+
+ assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+ await().atMost(3, SECONDS).until(() -> eventRepo.count() == 11);
+
+ List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
+ assertThat(distinctGlobalTxIds.size(), is(1));
+
+ String globalTxId = distinctGlobalTxIds.get(0);
+ List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
+ assertThat(events.size(), is(11));
+
+ assertThat(events.get(0).type(), is("SagaStartedEvent"));
+ assertThat(events.get(1).type(), is("TxStartedEvent"));
+ assertThat(events.get(2).type(), is("TxEndedEvent"));
+ assertThat(events.get(3).type(), is("TxStartedEvent"));
+ assertThat(events.get(4).type(), is("TxAbortedEvent"));
+ assertThat(events.get(5).type(), is("TxStartedEvent"));
+ assertThat(events.get(6).type(), is("TxAbortedEvent"));
+ assertThat(events.get(7).type(), is("TxStartedEvent"));
+ assertThat(events.get(8).type(), is("TxAbortedEvent"));
+ assertThat(events.get(9).type(), is("TxCompensatedEvent"));
+ assertThat(events.get(10).type(), is("SagaEndedEvent"));
+
+ assertThat(compensatedMessages, contains("Goodbye, " + TRESPASSER));
+ }
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index cf53a0c..b33eeb3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,13 +42,13 @@ import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
public class GrpcClientMessageSender implements MessageSender {
-
private final String target;
private final TxEventServiceStub asyncEventService;
private final MessageSerializer serializer;
private final TxEventServiceBlockingStub blockingEventService;
+
private final GrpcCompensateStreamObserver compensateStreamObserver;
private final GrpcServiceConfig serviceConfig;
@@ -104,8 +104,8 @@ public class GrpcClientMessageSender implements MessageSender {
.setType(event.type().name())
.setTimeout(event.timeout())
.setCompensationMethod(event.compensationMethod())
+ .setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
.setRetries(event.retries())
- .setRetriesMethod(event.retriesMethod() == null ? "" : event.retriesMethod())
.setPayloads(payloads);
return builder.build();
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 3cf46f8..9d9c312 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -46,14 +46,14 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
@Override
public void onNext(GrpcCompensateCommand command) {
- LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
- command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+ LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
+ command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
messageHandler.onReceive(
command.getGlobalTxId(),
command.getLocalTxId(),
command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
- command.getCompensateMethod(),
+ command.getCompensationMethod(),
deserializer.deserialize(command.getPayloads().toByteArray()));
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 9a78a62..afff8e7 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -48,7 +48,7 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
public class LoadBalancedClusterMessageSender implements MessageSender {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
private final Collection<ManagedChannel> channels;
@@ -104,7 +104,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
try {
sender.onConnected();
} catch (Exception e) {
- log.error("Failed connecting to alpha at {}", sender.target(), e);
+ LOG.error("Failed connecting to alpha at {}", sender.target(), e);
}
});
}
@@ -115,7 +115,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
try {
sender.onDisconnected();
} catch (Exception e) {
- log.error("Failed disconnecting from alpha at {}", sender.target(), e);
+ LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
}
});
}
@@ -140,7 +140,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
} catch (OmegaException e) {
throw e;
} catch (Exception e) {
- log.error("Retry sending event {} due to failure", event, e);
+ LOG.error("Retry sending event {} due to failure", event, e);
// very large latency on exception
senders.put(messageSender, Long.MAX_VALUE);
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
index f019d10..02571fd 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/PushBackReconnectRunnable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class PushBackReconnectRunnable implements Runnable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MessageSender messageSender;
private final Map<MessageSender, Long> senders;
private final BlockingQueue<Runnable> pendingTasks;
@@ -47,14 +47,14 @@ class PushBackReconnectRunnable implements Runnable {
@Override
public void run() {
try {
- log.info("Retry connecting to alpha at {}", messageSender.target());
+ LOG.info("Retry connecting to alpha at {}", messageSender.target());
messageSender.onDisconnected();
messageSender.onConnected();
senders.put(messageSender, 0L);
connectedSenders.offer(messageSender);
- log.info("Retry connecting to alpha at {} is successful", messageSender.target());
+ LOG.info("Retry connecting to alpha at {} is successful", messageSender.target());
} catch (Exception e) {
- log.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
+ LOG.error("Failed to reconnect to alpha at {}", messageSender.target(), e);
pendingTasks.offer(this);
}
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index d66b737..e8b2f34 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -90,15 +90,15 @@ public class LoadBalancedClusterMessageSenderTest {
private final List<String> compensated = new ArrayList<>();
- private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) ->
- compensated.add(globalTxId);
+ private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod,
+ payloads) -> compensated.add(globalTxId);
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
private final String parentTxId = uniquify("parentTxId");
private final String compensationMethod = getClass().getCanonicalName();
private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
- compensationMethod, 0, null, 0, "blah");
+ compensationMethod, 0, "", 0, "blah");
private final String serviceName = uniquify("serviceName");
private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -300,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
public void forwardSendResult() {
assertThat(messageSender.send(event).aborted(), is(false));
- TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, null, 0, "blah");
+ TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "", 0, "blah");
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
@@ -335,6 +335,7 @@ public class LoadBalancedClusterMessageSenderTest {
private final Queue<String> connected;
private final Queue<TxEvent> events;
private final int delay;
+
private StreamObserver<GrpcCompensateCommand> responseObserver;
private MyTxEventService(Queue<String> connected, Queue<TxEvent> events, int delay) {
@@ -357,9 +358,9 @@ public class LoadBalancedClusterMessageSenderTest {
request.getLocalTxId(),
request.getParentTxId(),
request.getCompensationMethod(),
- 0,
- null,
- 0,
+ request.getTimeout(),
+ request.getRetryMethod(),
+ request.getRetries(),
new String(request.getPayloads().toByteArray())));
sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 9d0ebc4..3856415 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,6 +42,7 @@ public class RetryableMessageSenderTest {
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
+
private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0, null, 0);
@Test
diff --git a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
index 48a67f7..cf16888 100644
--- a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
+++ b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/CompensationContext.java
@@ -36,7 +36,7 @@ public class CompensationContext {
contexts.put(compensationMethod.toString(), new CompensationContextInternal(target, compensationMethod));
}
- public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
+ public void apply(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
CompensationContextInternal contextInternal = contexts.get(compensationMethod);
try {
@@ -44,7 +44,7 @@ public class CompensationContext {
LOG.info("Compensated transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.error(
- "Pre-checking for compensate method " + contextInternal.compensationMethod.toString()
+ "Pre-checking for compensation method " + contextInternal.compensationMethod.toString()
+ " was somehow skipped, did you forget to configure compensable method checking on service startup?",
e);
}
@@ -52,6 +52,7 @@ public class CompensationContext {
private static final class CompensationContextInternal {
private final Object target;
+
private final Method compensationMethod;
private CompensationContextInternal(Object target, Method compensationMethod) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
index 338751c..7d7d45f 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableAnnotationProcessor.java
@@ -26,6 +26,7 @@ import org.springframework.util.ReflectionUtils;
class CompensableAnnotationProcessor implements BeanPostProcessor {
private final OmegaContext omegaContext;
+
private final CompensationContext compensationContext;
CompensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
index 268fad9..90d8b06 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/CompensableMethodCheckingCallback.java
@@ -31,6 +31,7 @@ class CompensableMethodCheckingCallback implements MethodCallback {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Object bean;
+
private final CompensationContext compensationContext;
CompensableMethodCheckingCallback(Object bean, CompensationContext compensationContext) {
@@ -47,10 +48,13 @@ class CompensableMethodCheckingCallback implements MethodCallback {
String compensationMethod = method.getAnnotation(Compensable.class).compensationMethod();
try {
- Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
compensationContext.addCompensationContext(method, bean);
- compensationContext.addCompensationContext(signature, bean);
- LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+
+ if (!compensationMethod.isEmpty()) {
+ Method signature = bean.getClass().getDeclaredMethod(compensationMethod, method.getParameterTypes());
+ compensationContext.addCompensationContext(signature, bean);
+ LOG.debug("Found compensation method [{}] in {}", compensationMethod, bean.getClass().getCanonicalName());
+ }
} catch (NoSuchMethodException e) {
throw new OmegaException(
"No such compensation method [" + compensationMethod + "] found in " + bean.getClass().getCanonicalName(),
diff --git a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 41be8f7..4fd4188 100644
--- a/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -35,7 +35,7 @@ public class TransactionAspectConfig {
@Bean
MessageHandler messageHandler(MessageSender sender, CompensationContext context, OmegaContext omegaContext) {
- return new CompensationMessageHandler(sender, omegaContext, context);
+ return new CompensationMessageHandler(sender, context);
}
@Order(0)
@@ -51,7 +51,8 @@ public class TransactionAspectConfig {
}
@Bean
- CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext, CompensationContext compensationContext) {
+ CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext,
+ CompensationContext compensationContext) {
return new CompensableAnnotationProcessor(omegaContext, compensationContext);
}
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 19d0942..6505bfc 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -24,10 +24,13 @@ import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -106,15 +109,18 @@ public class TransactionInterceptionTest {
private String compensationMethod;
- private String retriesMethod;
+ private String compensationMethod2;
+
+ private String retryMethod;
@Before
public void setUp() throws Exception {
when(idGenerator.nextId()).thenReturn(newLocalTxId, anotherLocalTxId);
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(globalTxId);
- retriesMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class).toString();
+ retryMethod = TransactionalUserService.class.getDeclaredMethod("add", User.class, int.class).toString();
compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
+ compensationMethod2 = TransactionalUserService.class.getDeclaredMethod("delete", User.class, int.class).toString();
}
@After
@@ -122,6 +128,7 @@ public class TransactionInterceptionTest {
messages.clear();
userRepository.deleteAll();
omegaContext.clear();
+ userService.resetCount();
}
@AfterClass
@@ -133,8 +140,9 @@ public class TransactionInterceptionTest {
User user = userService.add(this.user);
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+ user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -154,8 +162,9 @@ public class TransactionInterceptionTest {
}
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, illegalUser).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0,
+ illegalUser).toString(),
new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -176,10 +185,11 @@ public class TransactionInterceptionTest {
assertThat(userRepository.findByUsername(anotherUser.username()), is(nullValue()));
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, anotherUser).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0,
+ anotherUser).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -189,6 +199,60 @@ public class TransactionInterceptionTest {
}
@Test
+ public void retryTillSuccess() {
+ try {
+ userService.add(user, 1);
+ } catch (Exception e) {
+ fail("unexpected exception throw: " + e);
+ }
+
+ assertThat(messages.size(), is(4));
+
+ assertThat(messages.get(0),
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
+ .toString()));
+
+ String abortedEvent = messages.get(1);
+ assertThat(abortedEvent, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+ assertThat(messages.get(2),
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
+ .toString()));
+ assertThat(messages.get(3),
+ is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));
+
+ assertThat(userRepository.count(), is(1L));
+ userRepository.findAll().forEach(user -> assertThat(user, is(this.user)));
+ }
+
+ @Test
+ public void retryReachesMaximumThenThrowsException() {
+ try {
+ userService.add(user, 3);
+ expectFailing(IllegalStateException.class);
+ } catch (IllegalStateException e) {
+ assertThat(e.getMessage(), is("Retry harder"));
+ }
+
+ assertThat(messages.size(), is(4));
+ assertThat(messages.get(0),
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 3)
+ .toString()));
+
+ String abortedEvent1 = messages.get(1);
+ assertThat(abortedEvent1, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+ assertThat(messages.get(2),
+ is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 3)
+ .toString()));
+
+ String abortedEvent2 = messages.get(3);
+ assertThat(abortedEvent2, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));
+
+ assertThat(userRepository.count(), is(0L));
+ }
+
+ @Test
public void passesOmegaContextThroughDifferentThreads() throws Exception {
new Thread(() -> userService.add(user)).start();
waitTillSavedUser(username);
@@ -198,10 +262,10 @@ public class TransactionInterceptionTest {
waitTillSavedUser(usernameJack);
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -217,10 +281,10 @@ public class TransactionInterceptionTest {
waitTillSavedUser(usernameJack);
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, retriesMethod, 0, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, "", 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -239,8 +303,8 @@ public class TransactionInterceptionTest {
waitTillSavedUser(username);
assertArrayEquals(
- new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new String[] {
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -258,11 +322,10 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, retriesMethod, 0, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, "", 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
-
actorSystem.terminate();
}
@@ -298,7 +361,7 @@ public class TransactionInterceptionTest {
private final List<String> messages = new ArrayList<>();
@Bean
- CompensationContext compensationContext() {
+ CompensationContext recoveryContext() {
return new CompensationContext();
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
index c98c6ea..0618109 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionalUserService.java
@@ -17,21 +17,26 @@
package org.apache.servicecomb.saga.omega.transaction.spring;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
-
@Component
class TransactionalUserService {
static final String ILLEGAL_USER = "Illegal User";
private final UserRepository userRepository;
+ private int count = 0;
+
@Autowired
TransactionalUserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
+ void resetCount() {
+ this.count = 0;
+ }
+
@Compensable(compensationMethod = "delete")
User add(User user) {
if (ILLEGAL_USER.equals(user.username())) {
@@ -43,4 +48,19 @@ class TransactionalUserService {
void delete(User user) {
userRepository.delete(user);
}
+
+ @Compensable(retries = 2, compensationMethod = "delete")
+ User add(User user, int count) {
+ if (this.count < count) {
+ this.count += 1;
+ throw new IllegalStateException("Retry harder");
+ }
+ resetCount();
+ return userRepository.save(user);
+ }
+
+ void delete(User user, int count) {
+ resetCount();
+ userRepository.delete(user);
+ }
}
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
index c5c3d84..da9d4b2 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/User.java
@@ -62,7 +62,7 @@ public class User {
return false;
}
User user = (User) o;
- return id == user.id &&
+ return id.equals(user.id) &&
Objects.equals(username, user.username) &&
Objects.equals(email, user.email);
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 988d8d7..588d660 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
class CompensableInterceptor implements EventAwareInterceptor {
private final OmegaContext context;
@@ -30,9 +29,10 @@ class CompensableInterceptor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
- return sender.send(new TxStartedEvent(
- context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, retriesMethod, retries, message));
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message) {
+ return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
+ timeout, retriesMethod, retries, message));
}
@Override
@@ -42,7 +42,7 @@ class CompensableInterceptor implements EventAwareInterceptor {
@Override
public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
- sender.send(new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
- throwable));
+ sender.send(
+ new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 15cf91a..fe2eea5 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -18,26 +18,21 @@
package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
public class CompensationMessageHandler implements MessageHandler {
private final MessageSender sender;
- private final OmegaContext omegaContext;
+
private final CompensationContext context;
- public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext, CompensationContext context) {
+ public CompensationMessageHandler(MessageSender sender, CompensationContext context) {
this.sender = sender;
this.context = context;
- this.omegaContext = omegaContext;
}
@Override
- public void onReceive(String globalTxId, String localTxId, String parentTxId,
- String compensationMethod, Object... payloads) {
- String oldLocalTxId = omegaContext.localTxId();
- omegaContext.setLocalTxId(parentTxId);
- context.compensate(globalTxId, localTxId, compensationMethod, payloads);
+ public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+ Object... payloads) {
+ context.apply(globalTxId, localTxId, compensationMethod, payloads);
sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
- omegaContext.setLocalTxId(oldLocalTxId);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
index 86cc840..0844981 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecovery.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,64 +25,59 @@ import javax.transaction.InvalidTransactionException;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Aspect
-public class TransactionAspect {
+/**
+ * DefaultRecovery is used to execute business logic once.
+ * The corresponding events will report to alpha server before and after the execution of business logic.
+ * If there are errors while executing the business logic, a TxAbortedEvent will be reported to alpha.
+ *
+ * pre post
+ * request --------- 2.business logic --------- response
+ * \ |
+ * 1.TxStartedEvent \ | 3.TxEndedEvent
+ * \ |
+ * ----------------------
+ * alpha
+ */
+public class DefaultRecovery implements RecoveryPolicy {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final OmegaContext context;
-
- private final CompensableInterceptor interceptor;
-
- public TransactionAspect(MessageSender sender, OmegaContext context) {
- this.context = context;
- this.interceptor = new CompensableInterceptor(context, sender);
- }
-
- @Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
- Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
+ @Override
+ public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+ OmegaContext context, String parentTxId, int retries) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
- Object[] args = joinPoint.getArgs();
- int retries = compensable.retries();
- String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
- String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
+ String compensationSignature =
+ compensable.compensationMethod().isEmpty() ? "" : compensationMethodSignature(joinPoint, compensable, method);
- String localTxId = context.localTxId();
- context.newLocalTxId();
+ String retrySignature = (retries != 0 || compensationSignature.isEmpty()) ? method.toString() : "";
- AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
+ AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.timeout(),
+ retrySignature, retries, joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
- context.setLocalTxId(localTxId);
+ context.setLocalTxId(parentTxId);
throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
" because global transaction " + context.globalTxId() + " has already aborted.");
}
- LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
try {
Object result = joinPoint.proceed();
- interceptor.postIntercept(localTxId, compensationSignature);
+ interceptor.postIntercept(parentTxId, compensationSignature);
return result;
} catch (Throwable throwable) {
- interceptor.onError(localTxId, compensationSignature, throwable);
+ interceptor.onError(parentTxId, compensationSignature, throwable);
throw throwable;
- } finally {
- context.setLocalTxId(localTxId);
- LOG.debug("Restored context back to {}", context);
}
}
- private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
+ String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
throws NoSuchMethodException {
-
return joinPoint.getTarget()
.getClass()
.getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index e20bea2..285d549 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
public interface EventAwareInterceptor {
EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message) {
return new AlphaResponse(false);
}
@@ -33,7 +34,8 @@ public interface EventAwareInterceptor {
}
};
- AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message);
+ AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message);
void postIntercept(String parentTxId, String compensationMethod);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
new file mode 100644
index 0000000..d1a28c2
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecovery.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import javax.transaction.InvalidTransactionException;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ForwardRecovery is used to execute business logic with the given retries times.
+ * If retries is above 0, it will retry the given times at most.
+ * If retries == -1, it will retry forever until interrupted.
+ */
+public class ForwardRecovery extends DefaultRecovery {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // TODO: 2018/03/10 we do not support retry with timeout yet
+ @Override
+ public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+ OmegaContext context, String parentTxId, int retries) throws Throwable {
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+ int remains = retries;
+ try {
+ while (true) {
+ try {
+ return super.apply(joinPoint, compensable, interceptor, context, parentTxId, remains);
+ } catch (Throwable throwable) {
+ if (throwable instanceof InvalidTransactionException) {
+ throw throwable;
+ }
+
+ remains = remains == -1 ? -1 : remains - 1;
+ if (remains == 0) {
+ LOG.error(
+ "Retried sub tx failed maximum times, global tx id: {}, local tx id: {}, method: {}, retried times: {}",
+ context.globalTxId(), context.localTxId(), method.toString(), retries);
+ throw throwable;
+ }
+
+ LOG.warn("Retrying sub tx failed, global tx id: {}, local tx id: {}, method: {}, remains: {}",
+ context.globalTxId(), context.localTxId(), method.toString(), remains);
+ Thread.sleep(compensable.retryDelayInMilliseconds());
+ }
+ }
+ } catch (InterruptedException e) {
+ String errorMessage = "Failed to handle tx because it is interrupted, global tx id: " + context.globalTxId()
+ + ", local tx id: " + context.localTxId() + ", method: " + method.toString();
+ LOG.error(errorMessage);
+ interceptor.onError(parentTxId, compensationMethodSignature(joinPoint, compensable, method), e);
+ throw new OmegaException(errorMessage);
+ }
+ }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
similarity index 64%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
index 8e288df..bc1d4d8 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicy.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
package org.apache.servicecomb.saga.omega.transaction;
-import org.apache.servicecomb.saga.common.EventType;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.aspectj.lang.ProceedingJoinPoint;
-public class TxCompensatedEvent extends TxEvent {
- public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
- }
+public interface RecoveryPolicy {
+ Object apply(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
+ OmegaContext context, String parentTxId, int retries) throws Throwable;
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
similarity index 56%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
index 8c70e3a..f59ac2b 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/RecoveryPolicyFactory.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,17 @@
package org.apache.servicecomb.saga.omega.transaction;
-import org.apache.servicecomb.saga.common.EventType;
+public class RecoveryPolicyFactory {
+ private static final RecoveryPolicy DEFAULT_RECOVERY = new DefaultRecovery();
-public class SagaEndedEvent extends TxEvent {
+ private static final RecoveryPolicy FORWARD_RECOVERY = new ForwardRecovery();
- public SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+ /**
+ * If retries == 0, use the default recovery to execute only once.
+ * If retries > 0, it will use the forward recovery and retry the given times at most.
+ * If retries == -1, it will use the forward recovery and retry forever until interrupted.
+ */
+ static RecoveryPolicy getRecoveryPolicy(int retries) {
+ return retries != 0 ? FORWARD_RECOVERY : DEFAULT_RECOVERY;
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 8c70e3a..2e28b5e 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -20,8 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.common.EventType;
public class SagaEndedEvent extends TxEvent {
-
- public SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
+ SagaEndedEvent(String globalTxId, String localTxId) {
+ super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0, "", 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 49dd8e4..486f28c 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,10 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod, int retries, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message) {
try {
- return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout, retriesMethod, retries));
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index a2ee58c..8722deb 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -47,7 +47,7 @@ public class SagaStartAspect {
initializeOmegaContext();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), method.toString(), 0);
+ sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout(), "", 0);
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
try {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index cb76a26..0e87a97 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class SagaStartedEvent extends TxEvent {
public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
// use "" instead of null as compensationMethod requires not null in sql
- super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout);
+ super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout, "", 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 86cc840..f7a98ee 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -20,8 +20,6 @@ package org.apache.servicecomb.saga.omega.transaction;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
-import javax.transaction.InvalidTransactionException;
-
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.aspectj.lang.ProceedingJoinPoint;
@@ -47,45 +45,17 @@ public class TransactionAspect {
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
-
- Object[] args = joinPoint.getArgs();
- int retries = compensable.retries();
- String retriesSignature = ((MethodSignature) joinPoint.getSignature()).getMethod().toString();
- String compensationSignature = compensationMethodSignature(joinPoint, compensable, method);
-
String localTxId = context.localTxId();
context.newLocalTxId();
-
- AlphaResponse response = interceptor.preIntercept(localTxId, compensationSignature, compensable.timeout(), retriesSignature, retries, args);
- if (response.aborted()) {
- String abortedLocalTxId = context.localTxId();
- context.setLocalTxId(localTxId);
- throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +
- " because global transaction " + context.globalTxId() + " has already aborted.");
- }
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
+ int retries = compensable.retries();
+ RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
try {
- Object result = joinPoint.proceed();
- interceptor.postIntercept(localTxId, compensationSignature);
-
- return result;
- } catch (Throwable throwable) {
- interceptor.onError(localTxId, compensationSignature, throwable);
- throw throwable;
+ return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
} finally {
context.setLocalTxId(localTxId);
LOG.debug("Restored context back to {}", context);
}
}
-
- private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method)
- throws NoSuchMethodException {
-
- return joinPoint.getTarget()
- .getClass()
- .getDeclaredMethod(compensable.compensationMethod(), method.getParameterTypes())
- .toString();
- }
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index d6aa533..f0bac54 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -24,7 +24,8 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxAbortedEvent extends TxEvent {
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
- super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable));
+ super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0,
+ stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index 8e288df..cd709e4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+ super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 8d6666a..f702c43 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
+ super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, "", 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index a5b5514..a158af1 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -31,21 +31,22 @@ public class TxEvent {
private final String compensationMethod;
private final int timeout;
private final Object[] payloads;
- private final String retriesMethod;
+
+ private final String retryMethod;
private final int retries;
public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
- int timeout, String retriesMethod, int retries, Object... payloads) {
+ int timeout, String retryMethod, int retries, Object... payloads) {
this.timestamp = System.currentTimeMillis();
this.type = type;
+ this.globalTxId = globalTxId;
this.localTxId = localTxId;
this.parentTxId = parentTxId;
this.compensationMethod = compensationMethod;
- this.payloads = payloads;
- this.globalTxId = globalTxId;
this.timeout = timeout;
- this.retriesMethod = retriesMethod;
+ this.retryMethod = retryMethod;
this.retries = retries;
+ this.payloads = payloads;
}
public long timestamp() {
@@ -80,8 +81,8 @@ public class TxEvent {
return timeout;
}
- public String retriesMethod() {
- return retriesMethod;
+ public String retryMethod() {
+ return retryMethod;
}
public int retries() {
@@ -95,7 +96,9 @@ public class TxEvent {
", localTxId='" + localTxId + '\'' +
", parentTxId='" + parentTxId + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
- ", timeout='" + timeout + '\'' +
+ ", timeout=" + timeout +
+ ", retryMethod='" + retryMethod + '\'' +
+ ", retries=" + retries +
", payloads=" + Arrays.toString(payloads) +
'}';
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index cb2580e..5d2ae12 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,8 +21,9 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxStartedEvent extends TxEvent {
- public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
- String compensationMethod, int timeout, String retriesMethod, int retries, Object... payloads) {
- super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retriesMethod, retries, payloads);
+ public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+ int timeout, String retryMethod, int retries, Object... payloads) {
+ super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, retryMethod,
+ retries, payloads);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
index e9bf6a7..78c4b91 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Compensable.java
@@ -50,7 +50,9 @@ public @interface Compensable {
*
* @return
*/
- String compensationMethod();
+ String compensationMethod() default "";
+
+ int retryDelayInMilliseconds() default 0;
/**
* <code>@Compensable</code> method timeout, in seconds. <br>
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 76ffc03..1e01ea5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -47,7 +48,8 @@ public class CompensableInterceptorTest {
};
private final String message = uniquify("message");
- private final String retriesMethod = uniquify("retries");
+
+ private final String retryMethod = uniquify("retryMethod");
private final String compensationMethod = getClass().getCanonicalName();
@SuppressWarnings("unchecked")
@@ -64,7 +66,7 @@ public class CompensableInterceptorTest {
@Test
public void sendsTxStartedEventBefore() throws Exception {
int retries = new Random().nextInt();
- interceptor.preIntercept(parentTxId, compensationMethod, 0, retriesMethod, retries, message);
+ interceptor.preIntercept(parentTxId, compensationMethod, 0, retryMethod, retries, message);
TxEvent event = messages.get(0);
@@ -72,7 +74,7 @@ public class CompensableInterceptorTest {
assertThat(event.localTxId(), is(localTxId));
assertThat(event.parentTxId(), is(parentTxId));
assertThat(event.retries(), is(retries));
- assertThat(event.retriesMethod(), is(retriesMethod));
+ assertThat(event.retryMethod(), is(retryMethod));
assertThat(event.type(), is(EventType.TxStartedEvent));
assertThat(event.compensationMethod(), is(compensationMethod));
assertThat(asList(event.payloads()), contains(message));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index c9c7394..d5d5de5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.CompensationContext;
-import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.Before;
import org.junit.Test;
public class CompensationMessageHandlerTest {
@@ -42,15 +42,21 @@ public class CompensationMessageHandlerTest {
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
private final String parentTxId = uniquify("parentTxId");
+
private final String compensationMethod = getClass().getCanonicalName();
private final String payload = uniquify("blah");
- private final OmegaContext omegaContext = mock(OmegaContext.class);
private final CompensationContext context = mock(CompensationContext.class);
- private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext, context);
+
+ private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, context);
+
+ @Before
+ public void setUp() {
+ events.clear();
+ }
@Test
- public void sendsEventOnCompensationCompleted() throws Exception {
+ public void sendsCompensatedEventOnCompensationCompleted() {
handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
assertThat(events.size(), is(1));
@@ -63,6 +69,6 @@ public class CompensationMessageHandlerTest {
assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
assertThat(event.payloads().length, is(0));
- verify(context).compensate(globalTxId, localTxId, compensationMethod, payload);
+ verify(context).apply(globalTxId, localTxId, compensationMethod, payload);
}
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
similarity index 55%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
index 31d148f..75062bc 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/DefaultRecoveryTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import javax.transaction.InvalidTransactionException;
@@ -40,28 +42,39 @@ import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
-public class TransactionAspectTest {
+public class DefaultRecoveryTest {
private final List<TxEvent> messages = new ArrayList<>();
+
private final String globalTxId = UUID.randomUUID().toString();
+
private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+
private final String newLocalTxId = UUID.randomUUID().toString();
+ private final RuntimeException oops = new RuntimeException("oops");
+
+ @SuppressWarnings("unchecked")
+ private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+ private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+ private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+ private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+ private final Compensable compensable = mock(Compensable.class);
+
private final MessageSender sender = e -> {
messages.add(e);
return new AlphaResponse(false);
};
- private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
- private final MethodSignature methodSignature = mock(MethodSignature.class);
- @SuppressWarnings("unchecked")
- private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
- private final Compensable compensable = mock(Compensable.class);
+ private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
- private final OmegaContext omegaContext = new OmegaContext(idGenerator);
- private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+ private final RecoveryPolicy recoveryPolicy = new DefaultRecovery();
@Before
public void setUp() throws Exception {
@@ -71,75 +84,99 @@ public class TransactionAspectTest {
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
when(compensable.compensationMethod()).thenReturn("doNothing");
+ when(compensable.retries()).thenReturn(0);
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
}
@Test
- public void newLocalTxIdInCompensable() throws Throwable {
- aspect.advise(joinPoint, compensable);
+ public void recordEndedEventWhenSuccess() throws Throwable {
+ when(joinPoint.proceed()).thenReturn(null);
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
- TxEvent startedEvent = messages.get(0);
+ assertThat(messages.size(), is(2));
+ TxEvent startedEvent = messages.get(0);
assertThat(startedEvent.globalTxId(), is(globalTxId));
- assertThat(startedEvent.localTxId(), is(newLocalTxId));
- assertThat(startedEvent.parentTxId(), is(localTxId));
+ assertThat(startedEvent.localTxId(), is(localTxId));
+ assertThat(startedEvent.parentTxId(), is(parentTxId));
assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent.retries(), is(0));
+ assertThat(startedEvent.retryMethod(), is(""));
TxEvent endedEvent = messages.get(1);
-
assertThat(endedEvent.globalTxId(), is(globalTxId));
- assertThat(endedEvent.localTxId(), is(newLocalTxId));
- assertThat(endedEvent.parentTxId(), is(localTxId));
+ assertThat(endedEvent.localTxId(), is(localTxId));
+ assertThat(endedEvent.parentTxId(), is(parentTxId));
assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
-
- assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(localTxId));
}
@Test
- public void restoreContextOnCompensableError() throws Throwable {
- RuntimeException oops = new RuntimeException("oops");
-
+ public void recordAbortedEventWhenFailed() throws Throwable {
when(joinPoint.proceed()).thenThrow(oops);
try {
- aspect.advise(joinPoint, compensable);
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
expectFailing(RuntimeException.class);
} catch (RuntimeException e) {
- assertThat(e, is(oops));
+ assertThat(e.getMessage(), is("oops"));
}
- TxEvent event = messages.get(1);
-
- assertThat(event.globalTxId(), is(globalTxId));
- assertThat(event.localTxId(), is(newLocalTxId));
- assertThat(event.parentTxId(), is(localTxId));
- assertThat(event.type(), is(EventType.TxAbortedEvent));
+ assertThat(messages.size(), is(2));
- assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(localTxId));
+ TxEvent startedEvent = messages.get(0);
+ assertThat(startedEvent.globalTxId(), is(globalTxId));
+ assertThat(startedEvent.localTxId(), is(localTxId));
+ assertThat(startedEvent.parentTxId(), is(parentTxId));
+ assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent.retries(), is(0));
+ assertThat(startedEvent.retryMethod(), is(""));
+
+ TxEvent abortedEvent = messages.get(1);
+ assertThat(abortedEvent.globalTxId(), is(globalTxId));
+ assertThat(abortedEvent.localTxId(), is(localTxId));
+ assertThat(abortedEvent.parentTxId(), is(parentTxId));
+ assertThat(abortedEvent.type(), is(EventType.TxAbortedEvent));
}
@Test
- public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
+ public void returnImmediatelyWhenReceivedRejectResponse() {
MessageSender sender = mock(MessageSender.class);
when(sender.send(any())).thenReturn(new AlphaResponse(true));
- TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+ CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
+
try {
- aspect.advise(joinPoint, compensable);
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
expectFailing(InvalidTransactionException.class);
} catch (InvalidTransactionException e) {
- System.out.println(e.getMessage());
assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+ } catch (Throwable throwable) {
+ fail("unexpected exception throw: " + throwable);
}
verify(sender, times(1)).send(any());
}
+ @Test
+ public void recordRetryMethodWhenRetriesIsSet() throws Throwable {
+ int retries = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
+ when(compensable.retries()).thenReturn(retries);
+
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, retries);
+
+ TxEvent startedEvent = messages.get(0);
+
+ assertThat(startedEvent.globalTxId(), is(globalTxId));
+ assertThat(startedEvent.localTxId(), is(localTxId));
+ assertThat(startedEvent.parentTxId(), is(parentTxId));
+ assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent.retries(), is(retries));
+ assertThat(startedEvent.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+ }
+
private String doNothing() {
return "doNothing";
}
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
similarity index 61%
copy from omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
copy to omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
index 31d148f..76fe55a 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/ForwardRecoveryTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -40,28 +41,41 @@ import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
-public class TransactionAspectTest {
+public class ForwardRecoveryTest {
private final List<TxEvent> messages = new ArrayList<>();
+
private final String globalTxId = UUID.randomUUID().toString();
+
private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+
private final String newLocalTxId = UUID.randomUUID().toString();
+ private final RuntimeException oops = new RuntimeException("oops");
+
+ @SuppressWarnings("unchecked")
+ private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
+
+ private final OmegaContext omegaContext = new OmegaContext(idGenerator);
+
+ private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+
+ private final MethodSignature methodSignature = mock(MethodSignature.class);
+
+ private final Compensable compensable = mock(Compensable.class);
+
private final MessageSender sender = e -> {
messages.add(e);
return new AlphaResponse(false);
};
- private final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
- private final MethodSignature methodSignature = mock(MethodSignature.class);
- @SuppressWarnings("unchecked")
- private final IdGenerator<String> idGenerator = mock(IdGenerator.class);
- private final Compensable compensable = mock(Compensable.class);
+ private final CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
- private final OmegaContext omegaContext = new OmegaContext(idGenerator);
- private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
+ private final RecoveryPolicy recoveryPolicy = new ForwardRecovery();
+
+ private volatile OmegaException exception;
@Before
public void setUp() throws Exception {
@@ -71,75 +85,75 @@ public class TransactionAspectTest {
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
when(compensable.compensationMethod()).thenReturn("doNothing");
+ when(compensable.retries()).thenReturn(0);
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
}
@Test
- public void newLocalTxIdInCompensable() throws Throwable {
- aspect.advise(joinPoint, compensable);
-
- TxEvent startedEvent = messages.get(0);
-
- assertThat(startedEvent.globalTxId(), is(globalTxId));
- assertThat(startedEvent.localTxId(), is(newLocalTxId));
- assertThat(startedEvent.parentTxId(), is(localTxId));
- assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ public void forwardExceptionWhenGlobalTxAborted() {
+ MessageSender sender = mock(MessageSender.class);
+ when(sender.send(any())).thenReturn(new AlphaResponse(true));
- TxEvent endedEvent = messages.get(1);
+ CompensableInterceptor interceptor = new CompensableInterceptor(omegaContext, sender);
- assertThat(endedEvent.globalTxId(), is(globalTxId));
- assertThat(endedEvent.localTxId(), is(newLocalTxId));
- assertThat(endedEvent.parentTxId(), is(localTxId));
- assertThat(endedEvent.type(), is(EventType.TxEndedEvent));
+ try {
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 0);
+ expectFailing(InvalidTransactionException.class);
+ } catch (InvalidTransactionException e) {
+ assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+ } catch (Throwable throwable) {
+ fail("unexpected exception throw: " + throwable);
+ }
- assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(localTxId));
+ verify(sender, times(1)).send(any());
}
@Test
- public void restoreContextOnCompensableError() throws Throwable {
- RuntimeException oops = new RuntimeException("oops");
-
+ public void throwExceptionWhenRetryReachesMaximum() throws Throwable {
+ when(compensable.retries()).thenReturn(2);
when(joinPoint.proceed()).thenThrow(oops);
try {
- aspect.advise(joinPoint, compensable);
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, 2);
expectFailing(RuntimeException.class);
} catch (RuntimeException e) {
- assertThat(e, is(oops));
+ assertThat(e.getMessage(), is("oops"));
}
- TxEvent event = messages.get(1);
-
- assertThat(event.globalTxId(), is(globalTxId));
- assertThat(event.localTxId(), is(newLocalTxId));
- assertThat(event.parentTxId(), is(localTxId));
- assertThat(event.type(), is(EventType.TxAbortedEvent));
-
- assertThat(omegaContext.globalTxId(), is(globalTxId));
- assertThat(omegaContext.localTxId(), is(localTxId));
+ assertThat(messages.size(), is(4));
+ assertThat(messages.get(0).type(), is(EventType.TxStartedEvent));
+ assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+ assertThat(messages.get(2).type(), is(EventType.TxStartedEvent));
+ assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
}
@Test
- public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
- MessageSender sender = mock(MessageSender.class);
- when(sender.send(any())).thenReturn(new AlphaResponse(true));
-
- TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
- try {
- aspect.advise(joinPoint, compensable);
- expectFailing(InvalidTransactionException.class);
- } catch (InvalidTransactionException e) {
- System.out.println(e.getMessage());
- assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
- }
+ public void keepRetryingTillInterrupted() throws Throwable {
+ when(compensable.retries()).thenReturn(-1);
+ when(compensable.retryDelayInMilliseconds()).thenReturn(1000);
+ when(joinPoint.proceed()).thenThrow(oops);
- verify(sender, times(1)).send(any());
+ Thread thread = new Thread(() -> {
+ try {
+ recoveryPolicy.apply(joinPoint, compensable, interceptor, omegaContext, parentTxId, -1);
+ expectFailing(OmegaException.class);
+ } catch (OmegaException e) {
+ exception = e;
+ } catch (Throwable throwable) {
+ fail("unexpected exception throw: " + throwable);
+ }
+ });
+ thread.start();
+
+ thread.interrupt();
+ thread.join();
+
+ assertThat(exception.getMessage().contains("Failed to handle tx because it is interrupted"), is(true));
}
private String doNothing() {
return "doNothing";
}
-}
+}
\ No newline at end of file
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
deleted file mode 100644
index e69de29..0000000
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
index 31d148f..0aa9549 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java
@@ -20,18 +20,13 @@ package org.apache.servicecomb.saga.omega.transaction;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import javax.transaction.InvalidTransactionException;
-
import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
@@ -40,13 +35,11 @@ import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
public class TransactionAspectTest {
private final List<TxEvent> messages = new ArrayList<>();
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
-
private final String newLocalTxId = UUID.randomUUID().toString();
private final MessageSender sender = e -> {
@@ -71,6 +64,7 @@ public class TransactionAspectTest {
when(methodSignature.getMethod()).thenReturn(this.getClass().getDeclaredMethod("doNothing"));
when(compensable.compensationMethod()).thenReturn("doNothing");
+ when(compensable.retries()).thenReturn(0);
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
@@ -86,6 +80,8 @@ public class TransactionAspectTest {
assertThat(startedEvent.localTxId(), is(newLocalTxId));
assertThat(startedEvent.parentTxId(), is(localTxId));
assertThat(startedEvent.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent.retries(), is(0));
+ assertThat(startedEvent.retryMethod().isEmpty(), is(true));
TxEvent endedEvent = messages.get(1);
@@ -123,20 +119,84 @@ public class TransactionAspectTest {
}
@Test
- public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable {
- MessageSender sender = mock(MessageSender.class);
- when(sender.send(any())).thenReturn(new AlphaResponse(true));
+ public void retryReachesMaximumAndForwardException() throws Throwable {
+ RuntimeException oops = new RuntimeException("oops");
+ when(joinPoint.proceed()).thenThrow(oops);
+ when(compensable.retries()).thenReturn(3);
- TransactionAspect aspect = new TransactionAspect(sender, omegaContext);
try {
aspect.advise(joinPoint, compensable);
- expectFailing(InvalidTransactionException.class);
- } catch (InvalidTransactionException e) {
- System.out.println(e.getMessage());
- assertThat(e.getMessage().contains("Abort sub transaction"), is(true));
+ expectFailing(RuntimeException.class);
+ } catch (RuntimeException e) {
+ assertThat(e.getMessage(), is("oops"));
}
- verify(sender, times(1)).send(any());
+ assertThat(messages.size(), is(6));
+
+ TxEvent startedEvent1 = messages.get(0);
+ assertThat(startedEvent1.globalTxId(), is(globalTxId));
+ assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent1.parentTxId(), is(localTxId));
+ assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent1.retries(), is(3));
+ assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+ assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+ TxEvent startedEvent2 = messages.get(2);
+ assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent2.retries(), is(2));
+
+ assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+ TxEvent startedEvent3 = messages.get(4);
+ assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent3.retries(), is(1));
+
+ assertThat(messages.get(5).type(), is(EventType.TxAbortedEvent));
+
+ assertThat(omegaContext.globalTxId(), is(globalTxId));
+ assertThat(omegaContext.localTxId(), is(localTxId));
+ }
+
+ @Test
+ public void keepRetryingTillSuccess() throws Throwable {
+ RuntimeException oops = new RuntimeException("oops");
+ when(joinPoint.proceed()).thenThrow(oops).thenThrow(oops).thenReturn(null);
+ when(compensable.retries()).thenReturn(-1);
+
+ aspect.advise(joinPoint, compensable);
+
+ assertThat(messages.size(), is(6));
+
+ TxEvent startedEvent1 = messages.get(0);
+ assertThat(startedEvent1.globalTxId(), is(globalTxId));
+ assertThat(startedEvent1.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent1.parentTxId(), is(localTxId));
+ assertThat(startedEvent1.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent1.retries(), is(-1));
+ assertThat(startedEvent1.retryMethod(), is(this.getClass().getDeclaredMethod("doNothing").toString()));
+
+ assertThat(messages.get(1).type(), is(EventType.TxAbortedEvent));
+
+ TxEvent startedEvent2 = messages.get(2);
+ assertThat(startedEvent2.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent2.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent2.retries(), is(-1));
+
+ assertThat(messages.get(3).type(), is(EventType.TxAbortedEvent));
+
+ TxEvent startedEvent3 = messages.get(4);
+ assertThat(startedEvent3.localTxId(), is(newLocalTxId));
+ assertThat(startedEvent3.type(), is(EventType.TxStartedEvent));
+ assertThat(startedEvent3.retries(), is(-1));
+
+ assertThat(messages.get(5).type(), is(EventType.TxEndedEvent));
+
+ assertThat(omegaContext.globalTxId(), is(globalTxId));
+ assertThat(omegaContext.localTxId(), is(localTxId));
}
private String doNothing() {
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 9605a37..d2c6f77 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,9 +22,11 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
option java_outer_classname = "TxEventProto";
service TxEventService {
- rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
+ rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {
+ }
rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
- rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
+ rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
+ }
}
message GrpcServiceConfig {
@@ -48,13 +50,14 @@ message GrpcTxEvent {
string instanceId = 9;
int32 timeout = 10;
int32 retries = 11;
- string retriesMethod = 12;
+ string retryMethod = 12;
}
message GrpcCompensateCommand {
string globalTxId = 1;
string localTxId = 2;
string parentTxId = 3;
- string compensateMethod = 4;
+ string compensationMethod = 4;
bytes payloads = 5;
-}
\ No newline at end of file
+}
+
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.