You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/02/08 08:50:01 UTC
[incubator-servicecomb-saga] 06/06: SCB-239 handle timeout in EventScanner
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit bc3879f204e41dc6bb9bc9f290a12eb524202f51
Author: Eric Lee <da...@huawei.com>
AuthorDate: Mon Feb 5 23:02:29 2018 +0800
SCB-239 handle timeout in EventScanner
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../servicecomb/saga/alpha/core/EventScanner.java | 58 +++++++++----
.../saga/alpha/core/TxConsistentService.java | 35 +-------
.../servicecomb/saga/alpha/core/TxEvent.java | 39 ++++++---
.../saga/alpha/core/TxEventRepository.java | 4 +
.../servicecomb/saga/alpha/core/TxTimeout.java | 50 ++++++++---
.../saga/alpha/core/TxTimeoutRepository.java | 6 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 81 ++++--------------
.../servicecomb/saga/alpha/server/AlphaConfig.java | 2 +-
.../saga/alpha/server/SpringTxEventRepository.java | 10 +++
.../alpha/server/SpringTxTimeoutRepository.java | 26 ++----
.../alpha/server/TxEventEnvelopeRepository.java | 16 ++++
.../alpha/server/TxTimeoutEntityRepository.java | 36 ++++----
.../src/main/resources/schema-postgresql.sql | 13 ++-
.../saga/alpha/server/AlphaIntegrationTest.java | 7 +-
alpha/alpha-server/src/test/resources/schema.sql | 11 ++-
.../omega/transaction/OnceAwareInterceptor.java | 49 -----------
.../saga/omega/transaction/TransactionAspect.java | 1 -
.../transaction/OnceAwareInterceptorTest.java | 98 ----------------------
18 files changed, 202 insertions(+), 340 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 4f72a1c..a52ebe5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.saga.alpha.core;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
@@ -66,7 +67,9 @@ public class EventScanner implements Runnable {
private void pollEvents() {
scheduler.scheduleWithFixedDelay(
() -> {
- abortTimeoutEvent();
+ updateTimeoutStatus();
+ findTimeoutEvents();
+ abortTimeoutEvents();
saveUncompensatedEventsToCommands();
compensate();
updateCompensatedCommands();
@@ -78,6 +81,18 @@ public class EventScanner implements Runnable {
MILLISECONDS);
}
+ private void findTimeoutEvents() {
+ eventRepository.findTimeoutEvents()
+ .forEach(event -> {
+ log.info("Found timeout event {}", event);
+ timeoutRepository.save(txTimeoutOf(event));
+ });
+ }
+
+ private void updateTimeoutStatus() {
+ timeoutRepository.markTimeoutAsDone();
+ }
+
private void saveUncompensatedEventsToCommands() {
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
.forEach(event -> {
@@ -113,15 +128,15 @@ public class EventScanner implements Runnable {
markSagaEnded(event);
}
- private void abortTimeoutEvent() {
- timeoutRepository.findFirstTimeoutTxToAbort().forEach(event -> {
- log.info("Found timeout event {}", event);
+ private void abortTimeoutEvents() {
+ timeoutRepository.findFirstTimeout().forEach(timeout -> {
+ log.info("Found timeout event {} to abort", timeout);
- eventRepository.save(toTxAbortedEvent(event));
- timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
+ eventRepository.save(toTxAbortedEvent(timeout));
- if (event.type().equals(TxStartedEvent.name())) {
- omegaCallback.compensate(event);
+ if (timeout.type().equals(TxStartedEvent.name())) {
+ eventRepository.findTxStartedEventToCompensate(timeout.globalTxId(), timeout.localTxId())
+ .ifPresent(omegaCallback::compensate);
}
});
}
@@ -138,17 +153,16 @@ public class EventScanner implements Runnable {
private void markGlobalTxEnd(TxEvent event) {
eventRepository.save(toSagaEndedEvent(event));
- timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId());
log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
- private TxEvent toTxAbortedEvent(TxEvent event) {
+ private TxEvent toTxAbortedEvent(TxTimeout timeout) {
return new TxEvent(
- event.serviceName(),
- event.instanceId(),
- event.globalTxId(),
- event.localTxId(),
- event.parentTxId(),
+ timeout.serviceName(),
+ timeout.instanceId(),
+ timeout.globalTxId(),
+ timeout.localTxId(),
+ timeout.parentTxId(),
TxAbortedEvent.name(),
"",
("Transaction timeout").getBytes());
@@ -189,4 +203,18 @@ public class EventScanner implements Runnable {
command.payloads()
);
}
+
+ private TxTimeout txTimeoutOf(TxEvent event) {
+ return new TxTimeout(
+ event.id(),
+ event.serviceName(),
+ event.instanceId(),
+ event.globalTxId(),
+ event.localTxId(),
+ event.parentTxId(),
+ event.type(),
+ event.expiryTime(),
+ NEW.name()
+ );
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 541d54f..c55090a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,18 +17,10 @@
package org.apache.servicecomb.saga.alpha.core;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,12 +29,9 @@ public class TxConsistentService {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final TxEventRepository eventRepository;
- private final TxTimeoutRepository timeoutRepository;
- public TxConsistentService(TxEventRepository eventRepository,
- TxTimeoutRepository timeoutRepository) {
+ public TxConsistentService(TxEventRepository eventRepository) {
this.eventRepository = eventRepository;
- this.timeoutRepository = timeoutRepository;
}
public boolean handle(TxEvent event) {
@@ -51,33 +40,11 @@ public class TxConsistentService {
return false;
}
- if (isEventWithTimeout(event)) {
- saveTxTimeout(event);
- }
-
eventRepository.save(event);
- if (Arrays.asList(TxEndedEvent.name(), SagaEndedEvent.name(), TxAbortedEvent.name()).contains(event.type())) {
- CompletableFuture.runAsync(() -> timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()));
- }
-
return true;
}
- private boolean isEventWithTimeout(TxEvent event) {
- return Arrays.asList(TxStartedEvent.name(), SagaStartedEvent.name()).contains(event.type()) && event.timeout() != 0;
- }
-
- private void saveTxTimeout(TxEvent event) {
- Date expireTime = new Date(event.creationTime().getTime() + SECONDS.toMillis(event.timeout()));
- timeoutRepository.save(
- new TxTimeout(
- event.globalTxId(),
- event.localTxId(),
- expireTime,
- NEW.name()));
- }
-
private boolean isGlobalTxAborted(TxEvent event) {
return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index e34b7c6..42a202f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -17,6 +17,8 @@
package org.apache.servicecomb.saga.alpha.core;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.Date;
import javax.persistence.Entity;
@@ -24,10 +26,12 @@ import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Transient;
-import javax.persistence.Version;
@Entity
public class TxEvent {
+ @Transient
+ private static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00
+
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long surrogateId;
@@ -40,14 +44,9 @@ public class TxEvent {
private String parentTxId;
private String type;
private String compensationMethod;
+ private Date expiryTime;
private byte[] payloads;
- @Version
- private long version;
-
- @Transient
- private int timeout;
-
private TxEvent() {
}
@@ -61,7 +60,7 @@ public class TxEvent {
event.parentTxId,
event.type,
event.compensationMethod,
- 0,
+ event.expiryTime,
event.payloads);
}
@@ -117,7 +116,23 @@ public class TxEvent {
String compensationMethod,
int timeout,
byte[] payloads) {
+ this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type,
+ compensationMethod,
+ timeout == 0 ? new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)),
+ payloads);
+ }
+ TxEvent(Long surrogateId,
+ String serviceName,
+ String instanceId,
+ Date creationTime,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ Date expiryTime,
+ byte[] payloads) {
this.surrogateId = surrogateId;
this.serviceName = serviceName;
this.instanceId = instanceId;
@@ -127,8 +142,8 @@ public class TxEvent {
this.parentTxId = parentTxId;
this.type = type;
this.compensationMethod = compensationMethod;
+ this.expiryTime = expiryTime;
this.payloads = payloads;
- this.timeout = timeout;
}
public String serviceName() {
@@ -171,8 +186,8 @@ public class TxEvent {
return surrogateId;
}
- public int timeout() {
- return timeout;
+ public Date expiryTime() {
+ return expiryTime;
}
@Override
@@ -187,7 +202,7 @@ public class TxEvent {
", parentTxId='" + parentTxId + '\'' +
", type='" + type + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
- ", timeout='" + timeout + '\'' +
+ ", expiryTime='" + expiryTime + '\'' +
'}';
}
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index b974bd9..0af6fb5 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -25,6 +25,10 @@ public interface TxEventRepository {
Optional<TxEvent> findFirstAbortedGlobalTransaction();
+ List<TxEvent> findTimeoutEvents();
+
+ Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId);
+
List<TxEvent> findTransactions(String globalTxId, String type);
List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
index dc365e3..00ca2ec 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java
@@ -31,9 +31,14 @@ public class TxTimeout {
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long surrogateId;
+ private long eventId;
+ private String serviceName;
+ private String instanceId;
private String globalTxId;
private String localTxId;
- private Date expireTime;
+ private String parentTxId;
+ private String type;
+ private Date expiryTime;
private String status;
@Version
@@ -42,13 +47,27 @@ public class TxTimeout {
TxTimeout() {
}
- public TxTimeout(String globalTxId, String localTxId, Date expireTime, String status) {
+ TxTimeout(long eventId, String serviceName, String instanceId, String globalTxId, String localTxId,
+ String parentTxId, String type, Date expiryTime, String status) {
+ this.eventId = eventId;
+ this.serviceName = serviceName;
+ this.instanceId = instanceId;
this.globalTxId = globalTxId;
this.localTxId = localTxId;
- this.expireTime = expireTime;
+ this.parentTxId = parentTxId;
+ this.type = type;
+ this.expiryTime = expiryTime;
this.status = status;
}
+ public String serviceName() {
+ return serviceName;
+ }
+
+ public String instanceId() {
+ return instanceId;
+ }
+
public String globalTxId() {
return globalTxId;
}
@@ -57,24 +76,33 @@ public class TxTimeout {
return localTxId;
}
- public Date expireTime() {
- return expireTime;
+ public String parentTxId() {
+ return parentTxId;
}
- public String status() {
- return status;
+ public String type() {
+ return type;
}
- public void setStatus(String status) {
- this.status = status;
+ public Date expiryTime() {
+ return expiryTime;
+ }
+
+ public String status() {
+ return status;
}
@Override
public String toString() {
return "TxTimeout{" +
- "globalTxId='" + globalTxId + '\'' +
+ "eventId=" + eventId +
+ ", serviceName='" + serviceName + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", globalTxId='" + globalTxId + '\'' +
", localTxId='" + localTxId + '\'' +
- ", expireTime=" + expireTime +
+ ", parentTxId='" + parentTxId + '\'' +
+ ", type='" + type + '\'' +
+ ", expiryTime=" + expiryTime +
", status=" + status +
'}';
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
index 88758c7..97387a3 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java
@@ -20,9 +20,9 @@ package org.apache.servicecomb.saga.alpha.core;
import java.util.List;
public interface TxTimeoutRepository {
- void save(TxTimeout event);
+ void save(TxTimeout timeout);
- void markTxTimeoutAsDone(String globalTxId, String localTxId);
+ void markTimeoutAsDone();
- List<TxEvent> findFirstTimeoutTxToAbort();
+ List<TxTimeout> findFirstTimeout();
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 5467368..d220994 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -19,15 +19,12 @@ package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.Collections.emptyList;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -57,6 +54,18 @@ public class TxConsistentServiceTest {
}
@Override
+ public List<TxEvent> findTimeoutEvents() {
+ return emptyList();
+ }
+
+ @Override
+ public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) {
+ return events.stream()
+ .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()))
+ .findFirst();
+ }
+
+ @Override
public List<TxEvent> findTransactions(String globalTxId, String type) {
return events.stream()
.filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
@@ -78,29 +87,6 @@ public class TxConsistentServiceTest {
}
};
- private final Deque<TxTimeout> timeouts = new ConcurrentLinkedDeque<>();
- private final TxTimeoutRepository timeoutRepository = new TxTimeoutRepository() {
- @Override
- public void save(TxTimeout timeout) {
- timeouts.add(timeout);
- }
-
- @Override
- public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
- for (TxTimeout timeout : timeouts) {
- if (timeout.globalTxId().equals(globalTxId) && timeout.localTxId().equals(localTxId)) {
- timeout.setStatus(DONE.name());
- break;
- }
- }
- }
-
- @Override
- public List<TxEvent> findFirstTimeoutTxToAbort() {
- return null;
- }
- };
-
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
@@ -109,13 +95,12 @@ public class TxConsistentServiceTest {
private final String compensationMethod = getClass().getCanonicalName();
- private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
+ private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
private final byte[] payloads = "yeah".getBytes();
@Before
public void setUp() throws Exception {
events.clear();
- timeouts.clear();
}
@Test
@@ -132,7 +117,6 @@ public class TxConsistentServiceTest {
}
assertThat(this.events, contains(events));
- assertThat(timeouts.isEmpty(), is(true));
}
@Test
@@ -146,46 +130,11 @@ public class TxConsistentServiceTest {
consistentService.handle(event);
assertThat(events.size(), is(2));
- assertThat(timeouts.isEmpty(), is(true));
- }
-
- @Test
- public void persistTimeoutEventOnArrival() {
- TxEvent[] events = {
- newEventWithTimeout(SagaStartedEvent, globalTxId,2),
- newEventWithTimeout(TxStartedEvent, 1),
- newEvent(TxEndedEvent),
- newEvent(TxCompensatedEvent),
- eventOf(SagaEndedEvent, globalTxId)};
-
- for (TxEvent event : events) {
- consistentService.handle(event);
- }
-
- assertThat(this.events, contains(events));
- assertThat(timeouts.size(), is(2));
- await().atMost(1, SECONDS).until(this::allTimeoutIsDone);
- }
-
- private boolean allTimeoutIsDone() {
- for (TxTimeout timeout : timeouts) {
- if (!timeout.status().equals(DONE.name())) {
- return false;
- }
- }
- return true;
}
private TxEvent newEvent(EventType eventType) {
- return newEventWithTimeout(eventType, 0);
- }
-
- private TxEvent newEventWithTimeout(EventType eventType, int timeout) {
- return newEventWithTimeout(eventType, localTxId, timeout);
- }
-
- private TxEvent newEventWithTimeout(EventType eventType, String localTxId, int timeout) {
- return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, timeout, payloads);
+ return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod,
+ payloads);
}
private TxEvent eventOf(EventType eventType, String localTxId) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 9472d0d..6889c9f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -94,7 +94,7 @@ class AlphaConfig {
eventRepository, commandRepository, timeoutRepository,
omegaCallback, eventPollingInterval).run();
- TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository);
+ TxConsistentService consistentService = new TxConsistentService(eventRepository);
ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 5531d8f..d6ea21c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -43,6 +43,16 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
+ public List<TxEvent> findTimeoutEvents() {
+ return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST);
+ }
+
+ @Override
+ public Optional<TxEvent> findTxStartedEventToCompensate(String globalTxId, String localTxId) {
+ return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, localTxId);
+ }
+
+ @Override
public List<TxEvent> findTransactions(String globalTxId, String type) {
return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type);
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
index 71c808d..ee75496 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java
@@ -17,15 +17,12 @@
package org.apache.servicecomb.saga.alpha.server;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
-import java.util.ArrayList;
import java.util.List;
import javax.transaction.Transactional;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.alpha.core.TxTimeout;
import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
import org.springframework.data.domain.PageRequest;
@@ -38,26 +35,21 @@ public class SpringTxTimeoutRepository implements TxTimeoutRepository {
}
@Override
- public void save(TxTimeout event) {
- timeoutRepo.save(event);
+ public void save(TxTimeout timeout) {
+ timeoutRepo.save(timeout);
}
@Override
- public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
- timeoutRepo.updateStatusByGlobalTxIdAndLocalTxId(DONE.name(), globalTxId, localTxId);
+ public void markTimeoutAsDone() {
+ timeoutRepo.updateStatusOfFinishedTx();
}
@Transactional
@Override
- public List<TxEvent> findFirstTimeoutTxToAbort() {
- List<TxEvent> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
- List<TxEvent> pendingTimeoutEvents = new ArrayList<>();
- timeoutEvents.forEach(event -> {
- if (timeoutRepo.updateStatusFromNewByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())
- != 0) {
- pendingTimeoutEvents.add(event);
- }
- });
- return pendingTimeoutEvents;
+ public List<TxTimeout> findFirstTimeout() {
+ List<TxTimeout> timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1));
+ timeoutEvents.forEach(event -> timeoutRepo
+ .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId()));
+ return timeoutEvents;
}
}
\ No newline at end of file
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index c4984f9..0eaf089 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -38,6 +38,22 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " AND t1.type IN ('TxEndedEvent', 'SagaEndedEvent'))")
Optional<TxEvent> findFirstAbortedGlobalTxByType();
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
+ + " AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS( "
+ + " SELECT t1.globalTxId FROM TxEvent t1 "
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.localTxId = t.localTxId "
+ + " AND t1.type != t.type"
+ + ")")
+ List<TxEvent> findTimeoutEvents(Pageable pageable);
+
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.globalTxId = ?1 "
+ + " AND t.localTxId = ?2 "
+ + " AND t.type = 'TxStartedEvent'")
+ Optional<TxEvent> findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId);
+
@Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
+ "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, "
+ "t.type, t.compensationMethod, t.payloads"
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
index cc39397..f0e264a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java
@@ -22,7 +22,6 @@ import java.util.List;
import javax.persistence.LockModeType;
import javax.transaction.Transactional;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.alpha.core.TxTimeout;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Lock;
@@ -44,25 +43,22 @@ interface TxTimeoutEntityRepository extends CrudRepository<TxTimeout, Long> {
@Param("globalTxId") String globalTxId,
@Param("localTxId") String localTxId);
+ @Lock(LockModeType.OPTIMISTIC)
+ @Query("SELECT t FROM TxTimeout AS t "
+ + "WHERE t.status = 'NEW' "
+ + " AND t.expiryTime < CURRENT_TIMESTAMP "
+ + "ORDER BY t.expiryTime ASC")
+ List<TxTimeout> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+
@Transactional
@Modifying(clearAutomatically = true)
- @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t "
- + "SET t.status = :status "
- + "WHERE t.globalTxId = :globalTxId "
- + " AND t.localTxId = :localTxId "
- + " AND t.status = 'NEW'")
- int updateStatusFromNewByGlobalTxIdAndLocalTxId(
- @Param("status") String status,
- @Param("globalTxId") String globalTxId,
- @Param("localTxId") String localTxId);
-
- @Lock(LockModeType.OPTIMISTIC)
- @Query("SELECT te FROM TxEvent AS te "
- + "INNER JOIN TxTimeout AS tt "
- + "ON te.globalTxId = tt.globalTxId "
- + " AND te.localTxId = tt.localTxId "
- + " AND tt.status = 'NEW' "
- + " AND tt.expireTime < CURRENT_TIMESTAMP "
- + "ORDER BY tt.expireTime ASC")
- List<TxEvent> findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable);
+ @Query("UPDATE TxTimeout t "
+ + "SET t.status = 'DONE' "
+ + "WHERE t.status != 'DONE' AND EXISTS ("
+ + " SELECT t1.globalTxId FROM TxEvent t1 "
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.localTxId = t.localTxId "
+ + " AND t1.type != t.type"
+ + ")")
+ void updateStatusOfFinishedTx();
}
diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
index 484c5e3..e7f774b 100644
--- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql
+++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql
@@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
parentTxId varchar(36) DEFAULT NULL,
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
- payloads bytea,
- version bigint NOT NULL
+ expiryTime timestamp(6) NOT NULL,
+ payloads bytea
);
CREATE INDEX IF NOT EXISTS saga_events_index ON TxEvent (surrogateId, globalTxId, localTxId, type);
@@ -35,11 +35,16 @@ CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId,
CREATE TABLE IF NOT EXISTS TxTimeout (
surrogateId BIGSERIAL PRIMARY KEY,
+ eventId bigint NOT NULL UNIQUE,
+ serviceName varchar(16) NOT NULL,
+ instanceId varchar(36) NOT NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
- expireTime TIMESTAMP NOT NULL,
+ parentTxId varchar(36) DEFAULT NULL,
+ type varchar(50) NOT NULL,
+ expiryTime TIMESTAMP NOT NULL,
status varchar(12),
version bigint NOT NULL
);
-CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expireTime, globalTxId, localTxId, status);
+CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expiryTime, globalTxId, localTxId, status);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 225c194..497c244 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE;
-import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
@@ -383,10 +382,6 @@ public class AlphaIntegrationTest {
asyncStub.onConnected(serviceConfig, compensateResponseObserver);
blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1));
- await().atMost(1, SECONDS).until(() -> timeoutEntityRepository.count() == 1L);
- Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
- timeouts.forEach(timeout -> assertThat(timeout.status(), is(NEW.name())));
-
await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
@@ -395,7 +390,7 @@ public class AlphaIntegrationTest {
assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
assertThat(timeoutEntityRepository.count(), is(1L));
- timeouts = timeoutEntityRepository.findAll();
+ Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
timeouts.forEach(timeout -> {
assertThat(timeout.status(), is(DONE.name()));
assertThat(timeout.globalTxId(), is(globalTxId));
diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql
index 0958389..929c69f 100644
--- a/alpha/alpha-server/src/test/resources/schema.sql
+++ b/alpha/alpha-server/src/test/resources/schema.sql
@@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent (
parentTxId varchar(36) DEFAULT NULL,
type varchar(50) NOT NULL,
compensationMethod varchar(256) NOT NULL,
- payloads varbinary(10240),
- version bigint NOT NULL
+ expiryTime TIMESTAMP NOT NULL,
+ payloads varbinary(10240)
);
CREATE TABLE IF NOT EXISTS Command (
@@ -29,9 +29,14 @@ CREATE TABLE IF NOT EXISTS Command (
CREATE TABLE IF NOT EXISTS TxTimeout (
surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
+ eventId bigint NOT NULL UNIQUE,
+ serviceName varchar(36) NOT NULL,
+ instanceId varchar(36) NOT NULL,
globalTxId varchar(36) NOT NULL,
localTxId varchar(36) NOT NULL,
- expireTime TIMESTAMP NOT NULL,
+ parentTxId varchar(36) DEFAULT NULL,
+ type varchar(50) NOT NULL,
+ expiryTime TIMESTAMP NOT NULL,
status varchar(12),
version bigint NOT NULL
);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
deleted file mode 100644
index 3015a01..0000000
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-class OnceAwareInterceptor implements EventAwareInterceptor {
- private final EventAwareInterceptor interceptor;
- private final AtomicReference<EventAwareInterceptor> interceptorRef;
-
- OnceAwareInterceptor(EventAwareInterceptor interceptor) {
- this.interceptor = interceptor;
- this.interceptorRef = new AtomicReference<>(interceptor);
- }
-
- @Override
- public AlphaResponse preIntercept(String parentTxId, String signature, int timeout, Object... args) {
- return interceptor.preIntercept(parentTxId, signature, timeout, args);
- }
-
- @Override
- public void postIntercept(String parentTxId, String signature) {
- if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
- interceptor.postIntercept(parentTxId, signature);
- }
- }
-
- @Override
- public void onError(String parentTxId, String signature, Throwable throwable) {
- if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) {
- interceptor.onError(parentTxId, signature, throwable);
- }
- }
-}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
index 090fe2e..932b990 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java
@@ -54,7 +54,6 @@ public class TransactionAspect {
String localTxId = context.localTxId();
context.newLocalTxId();
- OnceAwareInterceptor interceptor = new OnceAwareInterceptor(this.interceptor);
AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
deleted file mode 100644
index 90a133b..0000000
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/OnceAwareInterceptorTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.saga.omega.transaction;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-public class OnceAwareInterceptorTest {
- private static final int runningCounts = 1000;
-
- private final String localTxId = uniquify("localTxId");
- private final String signature = uniquify("signature");
-
- private final AtomicInteger postInterceptInvoked = new AtomicInteger();
- private final AtomicInteger onErrorInvoked = new AtomicInteger();
-
- private final EventAwareInterceptor underlying = new EventAwareInterceptor() {
- @Override
- public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) {
- return new AlphaResponse(false);
- }
-
- @Override
- public void postIntercept(String parentTxId, String compensationMethod) {
- postInterceptInvoked.incrementAndGet();
- }
-
- @Override
- public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
- onErrorInvoked.incrementAndGet();
- }
- };
-
- private final ExecutorService executorService = Executors.newFixedThreadPool(2);
-
- @Test
- public void invokePostIntercept() throws Exception {
- List<Future<?>> futures = new LinkedList<>();
-
- for (int i = 0; i < runningCounts; i++) {
- OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
-
- futures.add(executorService.submit(() -> interceptor.postIntercept(localTxId, signature)));
- }
-
- waitTillAllDone(futures);
-
- assertThat(postInterceptInvoked.get(), is(runningCounts));
- }
-
- @Test
- public void invokeOnErrorConcurrently() throws Exception {
- RuntimeException oops = new RuntimeException("oops");
- List<Future<?>> futures = new LinkedList<>();
-
- for (int i = 0; i < runningCounts; i++) {
- OnceAwareInterceptor interceptor = new OnceAwareInterceptor(underlying);
-
- futures.add(executorService.submit(() -> interceptor.onError(localTxId, signature, oops)));
- }
-
- waitTillAllDone(futures);
-
- assertThat(onErrorInvoked.get(), is(runningCounts));
- }
-
- private void waitTillAllDone(List<Future<?>> futures)
- throws InterruptedException, java.util.concurrent.ExecutionException {
- for (Future<?> future : futures) {
- future.get();
- }
- }
-}
--
To stop receiving notification emails like this one, please contact
ningjiang@apache.org.