You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/02/08 08:49:56 UTC
[incubator-servicecomb-saga] 01/06: SCB-239 omega stateless
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 13d930970b0c8a6abe96f096399d050cba2019e1
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 23 15:10:35 2018 +0800
SCB-239 omega stateless
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../servicecomb/saga/alpha/core/EventScanner.java | 43 +++++++++++++++++++---
.../servicecomb/saga/alpha/core/TxEvent.java | 23 +++++++++---
.../saga/alpha/core/TxEventRepository.java | 6 ++-
.../alpha/core/CompositeOmegaCallbackTest.java | 1 +
.../saga/alpha/core/TxConsistentServiceTest.java | 15 +++++++-
.../servicecomb/saga/alpha/core/TxEventMaker.java | 6 +--
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 7 +++-
.../saga/alpha/server/SpringTxEventRepository.java | 14 ++++++-
.../alpha/server/TxEventEnvelopeRepository.java | 29 +++++++++++++--
.../src/main/resources/schema-postgresql.sql | 1 +
.../saga/alpha/server/AlphaIntegrationTest.java | 42 +++++++++++++++++++--
alpha/alpha-server/src/test/resources/schema.sql | 1 +
.../grpc/LoadBalancedClusterMessageSenderTest.java | 6 ++-
.../connector/grpc/RetryableMessageSenderTest.java | 4 +-
.../spring/TransactionInterceptionTest.java | 20 +++++-----
.../omega/transaction/CompensableInterceptor.java | 6 +--
.../omega/transaction/EventAwareInterceptor.java | 4 +-
.../saga/omega/transaction/SagaEndedEvent.java | 2 +-
.../transaction/SagaStartAnnotationProcessor.java | 4 +-
.../saga/omega/transaction/SagaStartAspect.java | 2 +-
.../saga/omega/transaction/SagaStartedEvent.java | 5 +--
.../omega/transaction/TimeAwareInterceptor.java | 4 +-
.../saga/omega/transaction/TransactionAspect.java | 2 +-
.../saga/omega/transaction/TxAbortedEvent.java | 6 +--
.../saga/omega/transaction/TxCompensatedEvent.java | 2 +-
.../saga/omega/transaction/TxEndedEvent.java | 2 +-
.../saga/omega/transaction/TxEvent.java | 14 +++++--
.../saga/omega/transaction/TxStartedEvent.java | 5 ++-
.../transaction/CompensableInterceptorTest.java | 2 +-
.../SagaStartAnnotationProcessorTest.java | 4 +-
.../transaction/TimeAwareInterceptorTest.java | 2 +-
.../src/main/proto/GrpcTxEvent.proto | 1 +
32 files changed, 214 insertions(+), 71 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 2d51a74..0980f3a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,12 +19,13 @@ package org.apache.servicecomb.saga.alpha.core;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
-import java.util.Date;
+import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
@@ -42,6 +43,7 @@ public class EventScanner implements Runnable {
private long nextEndedEventId;
private long nextCompensatedEventId;
+ private long nextTimeoutEventId;
public EventScanner(ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
@@ -64,10 +66,12 @@ public class EventScanner implements Runnable {
private void pollEvents() {
scheduler.scheduleWithFixedDelay(
() -> {
+ abortTimeoutEvent();
saveUncompensatedEventsToCommands();
compensate();
updateCompensatedCommands();
- deleteDuplicateSagaEndedEvents();
+ deleteDuplicateEvents();
+ updateTransactionStatus();
},
0,
eventPollingInterval,
@@ -92,11 +96,11 @@ public class EventScanner implements Runnable {
});
}
- private void deleteDuplicateSagaEndedEvents() {
+ private void deleteDuplicateEvents() {
try {
- eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
+ eventRepository.deleteDuplicateEvents(Arrays.asList(TxAbortedEvent.name(), SagaEndedEvent.name()));
} catch (Exception e) {
- log.warn("Failed to delete duplicate SagaEndedEvent", e);
+ log.warn("Failed to delete duplicate event", e);
}
}
@@ -109,6 +113,19 @@ public class EventScanner implements Runnable {
markSagaEnded(event);
}
+ private void abortTimeoutEvent() {
+ eventRepository.findFirstTimeoutEventByIdGreaterThan(nextTimeoutEventId)
+ .ifPresent((TxEvent event) -> {
+ log.info("Found timeout event {}", event);
+ nextTimeoutEventId = event.id();
+ eventRepository.save(toTxAbortedEvent(event));
+ });
+ }
+
+ private void updateTransactionStatus() {
+ eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd);
+ }
+
private void markSagaEnded(TxEvent event) {
if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
markGlobalTxEnd(event);
@@ -120,16 +137,29 @@ public class EventScanner implements Runnable {
log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
+ private TxEvent toTxAbortedEvent(TxEvent event) {
+ return new TxEvent(
+ event.serviceName(),
+ event.instanceId(),
+ event.globalTxId(),
+ event.localTxId(),
+ event.parentTxId(),
+ TxAbortedEvent.name(),
+ "",
+ null,
+ EMPTY_PAYLOAD);
+ }
+
private TxEvent toSagaEndedEvent(TxEvent event) {
return new TxEvent(
event.serviceName(),
event.instanceId(),
- new Date(),
event.globalTxId(),
event.globalTxId(),
null,
SagaEndedEvent.name(),
"",
+ null,
EMPTY_PAYLOAD);
}
@@ -153,6 +183,7 @@ public class EventScanner implements Runnable {
command.parentTxId(),
TxStartedEvent.name(),
command.compensationMethod(),
+ null,
command.payloads()
);
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 1364cb7..04d385b 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -33,6 +33,7 @@ public class TxEvent {
private String serviceName;
private String instanceId;
private Date creationTime;
+ private Date expireTime;
private String globalTxId;
private String localTxId;
private String parentTxId;
@@ -64,8 +65,9 @@ public class TxEvent {
String parentTxId,
String type,
String compensationMethod,
+ Date expireTime,
byte[] payloads) {
- this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, expireTime, payloads);
}
public TxEvent(
@@ -77,21 +79,25 @@ public class TxEvent {
String parentTxId,
String type,
String compensationMethod,
+ Date expireTime,
byte[] payloads) {
- this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod,
+ expireTime, payloads);
}
- public TxEvent(
- long id,
+ TxEvent(Long surrogateId,
String serviceName,
String instanceId,
+ Date creationTime,
String globalTxId,
String localTxId,
String parentTxId,
String type,
String compensationMethod,
+ int timeout,
byte[] payloads) {
- this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
+ this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
+ compensationMethod, timeout == 0 ? null : new Date(creationTime.getTime() + timeout*1000), payloads);
}
TxEvent(Long surrogateId,
@@ -103,6 +109,7 @@ public class TxEvent {
String parentTxId,
String type,
String compensationMethod,
+ Date expireTime,
byte[] payloads) {
this.surrogateId = surrogateId;
@@ -114,6 +121,7 @@ public class TxEvent {
this.parentTxId = parentTxId;
this.type = type;
this.compensationMethod = compensationMethod;
+ this.expireTime = expireTime;
this.payloads = payloads;
}
@@ -149,6 +157,10 @@ public class TxEvent {
return compensationMethod;
}
+ public Date expireTime() {
+ return expireTime;
+ }
+
public byte[] payloads() {
return payloads;
}
@@ -169,6 +181,7 @@ public class TxEvent {
", parentTxId='" + parentTxId + '\'' +
", type='" + type + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
+ ", expireTime='" + expireTime + '\'' +
'}';
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b61aa06..ec564f9 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -23,11 +23,15 @@ import java.util.Optional;
public interface TxEventRepository {
void save(TxEvent event);
+ Optional<TxEvent> findFirstAbortedGlobalTransaction();
+
List<TxEvent> findTransactions(String globalTxId, String type);
List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
- void deleteDuplicateEvents(String type);
+ Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id);
+
+ void deleteDuplicateEvents(List<String> types);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
index 5cda4c5..b201ebe 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -166,6 +166,7 @@ public class CompositeOmegaCallbackTest {
UUID.randomUUID().toString(),
eventType.name(),
getClass().getCanonicalName(),
+ null,
uniquify("blah").getBytes());
}
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 231d5bf..a73c754 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -49,6 +49,11 @@ public class TxConsistentServiceTest {
}
@Override
+ public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+ return Optional.empty();
+ }
+
+ @Override
public List<TxEvent> findTransactions(String globalTxId, String type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
@@ -66,7 +71,12 @@ public class TxConsistentServiceTest {
}
@Override
- public void deleteDuplicateEvents(String type) {
+ public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
+ return Optional.empty();
+ }
+
+ @Override
+ public void deleteDuplicateEvents(List<String> types) {
}
};
@@ -111,7 +121,7 @@ public class TxConsistentServiceTest {
}
private TxEvent newEvent(EventType eventType) {
- return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
+ return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, null, payloads);
}
private TxEvent eventOf(EventType eventType, String localTxId) {
@@ -121,6 +131,7 @@ public class TxConsistentServiceTest {
UUID.randomUUID().toString(),
eventType.name(),
compensationMethod,
+ null,
payloads);
}
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
index 68c33a9..4b65528 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java
@@ -17,9 +17,8 @@
package org.apache.servicecomb.saga.alpha.core;
-import org.apache.servicecomb.saga.common.EventType;
-
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.util.Date;
import java.util.UUID;
@@ -33,8 +32,9 @@ class TxEventMaker {
uniquify("globalTxId"),
uniquify("localTxId"),
UUID.randomUUID().toString(),
- EventType.TxStartedEvent.name(),
+ TxStartedEvent.name(),
TxEventMaker.class.getCanonicalName(),
+ null,
uniquify("blah").getBytes());
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index eced7f9..679b6ba 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -56,7 +56,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
omegaCallbacks
.computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
- .computeIfAbsent(request.getInstanceId(), key -> new GrpcOmegaCallback(responseObserver));
+ .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver));
}
// TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected
@@ -75,15 +75,18 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
@Override
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
+ Date date = new Date(message.getTimestamp());
+ int timeout = message.getTimeout();
boolean ok = txConsistentService.handle(new TxEvent(
message.getServiceName(),
message.getInstanceId(),
- new Date(message.getTimestamp()),
+ date,
message.getGlobalTxId(),
message.getLocalTxId(),
message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
message.getType(),
message.getCompensationMethod(),
+ timeout == 0 ? null : new Date(date.getTime() + timeout),
message.getPayloads().toByteArray()
));
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index ad32148..c3e8f04 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -37,6 +37,11 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
+ public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
+ return eventRepo.findFirstAbortedGlobalTxByType();
+ }
+
+ @Override
public List<TxEvent> findTransactions(String globalTxId, String type) {
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
}
@@ -52,7 +57,12 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public void deleteDuplicateEvents(String type) {
- eventRepo.deleteByType(type);
+ public Optional<TxEvent> findFirstTimeoutEventByIdGreaterThan(long id) {
+ return eventRepo.findFirstTimeoutSurrogateIdGreaterThan(id);
+ }
+
+ @Override
+ public void deleteDuplicateEvents(List<String> types) {
+ eventRepo.deleteByTypes(types);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 2e52fef..e46b264 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -31,8 +31,16 @@ import org.springframework.data.repository.CrudRepository;
interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
List<TxEvent> findByGlobalTxId(String globalTxId);
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( "
+ + " SELECT t1.globalTxId FROM TxEvent t1"
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
+ Optional<TxEvent> findFirstAbortedGlobalTxByType();
+
@Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
- + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
+ + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
+ + "t.type, t.compensationMethod, t.expireTime, t.payloads"
+ ") FROM TxEvent t "
+ "WHERE t.globalTxId = ?1 AND t.type = ?2")
List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
@@ -70,13 +78,26 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
+ + " AND t.expireTime IS NOT NULL "
+ + " AND t.expireTime < CURRENT_TIMESTAMP "
+ + " AND t.surrogateId > ?1 AND NOT EXISTS ("
+ + " SELECT t1.globalTxId"
+ + " FROM TxEvent t1 "
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.localTxId = t.localTxId "
+ + " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent')) "
+ + "ORDER BY t.surrogateId ASC")
+ Optional<TxEvent> findFirstTimeoutSurrogateIdGreaterThan(long surrogateId);
+
@Transactional
@Modifying(clearAutomatically = true)
@Query("DELETE FROM TxEvent t "
- + "WHERE t.type = ?1 AND t.surrogateId NOT IN ("
+ + "WHERE t.type IN ?1 AND t.surrogateId NOT IN ("
+ " SELECT MAX(t1.surrogateId) FROM TxEvent t1 "
- + " WHERE t1.type = ?1"
+ + " WHERE t1.type = t.type"
+ " GROUP BY t1.globalTxId"
+ ")")
- void deleteByType(String type);
+ void deleteByTypes(List<String> types);
}
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index d6b5172..a3aee1c 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent (
serviceName varchar(16) NOT NULL,
instanceId varchar(36) NOT NULL,
creationTime timestamp(6) NOT NULL DEFAULT CURRENT_DATE,
+ expireTime timestamp(6) NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 539f610..585e755 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
+import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
@@ -367,6 +368,34 @@ public class AlphaIntegrationTest {
});
}
+ @Test
+ public void abortTimeoutSagaStartedEvent() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
+
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 3);
+
+ List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+ assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
+ assertThat(events.get(1).type(), is(TxAbortedEvent.name()));
+ assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
+ }
+
+ @Test
+ public void abortTimeoutTxStartedEvent() {
+ asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+ blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId));
+ blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1));
+
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 4);
+
+ List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+ assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
+ assertThat(events.get(1).type(), is(TxStartedEvent.name()));
+ assertThat(events.get(2).type(), is(TxAbortedEvent.name()));
+ assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
+ }
+
private GrpcAck onCompensation(GrpcCompensateCommand command) {
return blockingStub.onTxEvent(
eventOf(TxCompensatedEvent,
@@ -393,9 +422,14 @@ public class AlphaIntegrationTest {
parentTxId,
TxAbortedEvent.name(),
compensationMethod,
+ null,
payload.getBytes());
}
+ private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) {
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout);
+ }
+
private GrpcTxEvent someGrpcEvent(EventType type) {
return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
}
@@ -405,11 +439,11 @@ public class AlphaIntegrationTest {
}
private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) {
- return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
+ return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0);
}
private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
- return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod);
+ return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0);
}
private GrpcTxEvent eventOf(EventType eventType,
@@ -417,7 +451,8 @@ public class AlphaIntegrationTest {
String localTxId,
String parentTxId,
byte[] payloads,
- String compensationMethod) {
+ String compensationMethod,
+ int timeout) {
return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
@@ -428,6 +463,7 @@ public class AlphaIntegrationTest {
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
+ .setTimeout(timeout)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 344fdda..53bcd1e 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS TxEvent (
serviceName varchar(36) NOT NULL,
instanceId varchar(36) NOT NULL,
creationTime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ expireTime TIMESTAMP NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
parentTxId varchar(36) DEFAULT NULL,
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index 8062ae9..bb24c5c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -97,7 +97,8 @@ public class LoadBalancedClusterMessageSenderTest {
private final String localTxId = uniquify("localTxId");
private final String parentTxId = uniquify("parentTxId");
private final String compensationMethod = getClass().getCanonicalName();
- private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, "blah");
+ private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId,
+ compensationMethod, 0, "blah");
private final String serviceName = uniquify("serviceName");
private final String[] addresses = {"localhost:8080", "localhost:8090"};
@@ -299,7 +300,7 @@ public class LoadBalancedClusterMessageSenderTest {
public void forwardSendResult() {
assertThat(messageSender.send(event).aborted(), is(false));
- TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah");
+ TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah");
assertThat(messageSender.send(rejectEvent).aborted(), is(true));
}
@@ -356,6 +357,7 @@ public class LoadBalancedClusterMessageSenderTest {
request.getLocalTxId(),
request.getParentTxId(),
request.getCompensationMethod(),
+ 0,
new String(request.getPayloads().toByteArray())));
sleep();
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
index 562c50f..95bda85 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java
@@ -42,7 +42,7 @@ public class RetryableMessageSenderTest {
private final String globalTxId = uniquify("globalTxId");
private final String localTxId = uniquify("localTxId");
- private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x");
+ private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0);
@Test
public void sendEventWhenSenderIsAvailable() {
@@ -56,7 +56,7 @@ public class RetryableMessageSenderTest {
@Test
public void blowsUpWhenEventIsSagaStarted() {
- TxEvent event = new SagaStartedEvent(globalTxId, localTxId);
+ TxEvent event = new SagaStartedEvent(globalTxId, localTxId, 0);
try {
messageSender.send(event);
diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 7daf954..9fd2a7e 100644
--- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -131,7 +131,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -152,7 +152,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(),
new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
@@ -174,9 +174,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()
@@ -196,9 +196,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -215,9 +215,9 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(),
- new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(),
+ new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(),
new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -237,7 +237,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[]{
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
@@ -255,7 +255,7 @@ public class TransactionInterceptionTest {
assertArrayEquals(
new String[] {
- new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(),
+ new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(),
new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()},
toArray(messages)
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
index 074a5ec..53e5158 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java
@@ -29,9 +29,9 @@ class CompensableInterceptor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
- return sender
- .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message));
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
+ return sender.send(new TxStartedEvent(
+ context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message));
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
index 4425949..7b71dd4 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.omega.transaction;
public interface EventAwareInterceptor {
EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() {
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
return new AlphaResponse(false);
}
@@ -33,7 +33,7 @@ public interface EventAwareInterceptor {
}
};
- AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message);
+ AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message);
void postIntercept(String parentTxId, String compensationMethod) throws Throwable;
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
index 7074d8f..8c70e3a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java
@@ -22,6 +22,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class SagaEndedEvent extends TxEvent {
public SagaEndedEvent(String globalTxId, String localTxId) {
- super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "");
+ super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
index 7ef021a..d3d55fe 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java
@@ -32,9 +32,9 @@ class SagaStartAnnotationProcessor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
try {
- return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
index 0951752..db8e3a0 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java
@@ -52,7 +52,7 @@ public class SagaStartAspect {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor);
- interceptor.preIntercept(context.globalTxId(), method.toString());
+ interceptor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
scheduleTimeoutTask(interceptor, method, sagaStart.timeout());
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
index 54f61e4..cb76a26 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java
@@ -20,9 +20,8 @@ package org.apache.servicecomb.saga.omega.transaction;
import org.apache.servicecomb.saga.common.EventType;
public class SagaStartedEvent extends TxEvent {
-
- public SagaStartedEvent(String globalTxId, String localTxId) {
+ public SagaStartedEvent(String globalTxId, String localTxId, int timeout) {
// use "" instead of null as compensationMethod requires not null in sql
- super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "");
+ super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
index 2057fbc..9de26d2 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java
@@ -30,8 +30,8 @@ class TimeAwareInterceptor implements EventAwareInterceptor {
}
@Override
- public AlphaResponse preIntercept(String parentTxId, String signature, Object... args) {
- return interceptor.preIntercept(parentTxId, signature, args);
+ public AlphaResponse preIntercept(String parentTxId, String signature, int timeout, Object... args) {
+ return interceptor.preIntercept(parentTxId, signature, timeout, args);
}
@Override
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 5a61dc7..718d1fd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -59,7 +59,7 @@ public class TransactionAspect {
context.newLocalTxId();
TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor);
- AlphaResponse response = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs());
+ AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
context.setLocalTxId(localTxId);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index 13df2f7..d6aa533 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -17,14 +17,14 @@
package org.apache.servicecomb.saga.omega.transaction;
-import org.apache.servicecomb.saga.common.EventType;
-
import java.io.PrintWriter;
import java.io.StringWriter;
+import org.apache.servicecomb.saga.common.EventType;
+
public class TxAbortedEvent extends TxEvent {
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
- super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable));
+ super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable));
}
private static String stackTrace(Throwable e) {
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index dbbaeab..8e288df 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod);
+ super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 4e587c8..8d6666a 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -21,6 +21,6 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
- super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod);
+ super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0);
}
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
index 1398d3e..34be420 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java
@@ -17,10 +17,10 @@
package org.apache.servicecomb.saga.omega.transaction;
-import org.apache.servicecomb.saga.common.EventType;
-
import java.util.Arrays;
+import org.apache.servicecomb.saga.common.EventType;
+
public class TxEvent {
private final long timestamp;
@@ -29,9 +29,11 @@ public class TxEvent {
private final String localTxId;
private final String parentTxId;
private final String compensationMethod;
+ private final int timeout;
private final Object[] payloads;
- public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
+ public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod,
+ int timeout, Object... payloads) {
this.timestamp = System.currentTimeMillis();
this.type = type;
this.localTxId = localTxId;
@@ -39,6 +41,7 @@ public class TxEvent {
this.compensationMethod = compensationMethod;
this.payloads = payloads;
this.globalTxId = globalTxId;
+ this.timeout = timeout;
}
public long timestamp() {
@@ -69,6 +72,10 @@ public class TxEvent {
return compensationMethod;
}
+ public int timeout() {
+ return timeout;
+ }
+
@Override
public String toString() {
return type.name() + "{" +
@@ -76,6 +83,7 @@ public class TxEvent {
", localTxId='" + localTxId + '\'' +
", parentTxId='" + parentTxId + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
+ ", timeout='" + timeout + '\'' +
", payloads=" + Arrays.toString(payloads) +
'}';
}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
index ce93ea3..4732d95 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -21,7 +21,8 @@ import org.apache.servicecomb.saga.common.EventType;
public class TxStartedEvent extends TxEvent {
- public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
- super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, payloads);
+ public TxStartedEvent(String globalTxId, String localTxId, String parentTxId,
+ String compensationMethod, int timeout, Object... payloads) {
+ super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads);
}
}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
index 21af7e6..0ef9d4d 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java
@@ -62,7 +62,7 @@ public class CompensableInterceptorTest {
@Test
public void sendsTxStartedEventBefore() throws Exception {
- interceptor.preIntercept(parentTxId, compensationMethod, message);
+ interceptor.preIntercept(parentTxId, compensationMethod, 0, message);
TxEvent event = messages.get(0);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
index 566a456..cc84fc5 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java
@@ -65,7 +65,7 @@ public class SagaStartAnnotationProcessorTest {
@Test
public void sendsSagaStartedEvent() {
- sagaStartAnnotationProcessor.preIntercept(null, null);
+ sagaStartAnnotationProcessor.preIntercept(null, null, 0);
TxEvent event = messages.get(0);
@@ -99,7 +99,7 @@ public class SagaStartAnnotationProcessorTest {
doThrow(exception).when(sender).send(any());
try {
- sagaStartAnnotationProcessor.preIntercept(null, null);
+ sagaStartAnnotationProcessor.preIntercept(null, null, 0);
expectFailing(TransactionalException.class);
} catch (TransactionalException e) {
assertThat(e.getMessage(), is("exception"));
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
index 1136a45..0f2d2eb 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java
@@ -46,7 +46,7 @@ public class TimeAwareInterceptorTest {
private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
@Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) {
+ public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
return new AlphaResponse(false);
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 2636881..3944eee 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -46,6 +46,7 @@ message GrpcTxEvent {
bytes payloads = 7;
string serviceName = 8;
string instanceId = 9;
+ int32 timeout = 10;
}
message GrpcCompensateCommand {
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.