You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/20 12:00:18 UTC
[incubator-servicecomb-saga] 06/12: SCB-218 polling events into commands in the background
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit fff84355f893699b3e6903ac6ba362cf7960cf41
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 19 10:28:22 2018 +0800
SCB-218 polling events into commands in the background
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/core/CommandRepository.java | 5 +-
...{TxConsistentService.java => EventScanner.java} | 139 +++++++-------
.../saga/alpha/core/TxConsistentService.java | 102 +---------
.../saga/alpha/core/TxEventRepository.java | 5 +-
.../saga/alpha/core/TxConsistentServiceTest.java | 208 ++-------------------
.../servicecomb/saga/alpha/server/AlphaConfig.java | 12 +-
.../saga/alpha/server/SpringCommandRepository.java | 25 +--
.../saga/alpha/server/SpringTxEventRepository.java | 10 +-
.../alpha/server/TxEventEnvelopeRepository.java | 31 ++-
.../saga/alpha/server/AlphaIntegrationTest.java | 12 +-
10 files changed, 126 insertions(+), 423 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 22f2b41..1da033f 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -20,11 +20,8 @@ package org.apache.servicecomb.saga.alpha.core;
import java.util.List;
public interface CommandRepository {
- boolean exists(String globalTxId, String localTxId);
- void saveCompensationCommand(String globalTxId, String localTxId);
-
- void saveCompensationCommands(String globalTxId);
+ Iterable<Command> saveCompensationCommands(String globalTxId);
void markCommandAsDone(String globalTxId, String localTxId);
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
similarity index 50%
copy from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
copy to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index 61beea5..ea4aaf6 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -19,111 +19,116 @@ package org.apache.servicecomb.saga.alpha.core;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TxConsistentService {
+public class EventScanner implements Runnable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
private static final byte[] EMPTY_PAYLOAD = new byte[0];
+ private final ScheduledExecutorService scheduler;
private final TxEventRepository eventRepository;
private final CommandRepository commandRepository;
private final OmegaCallback omegaCallback;
- private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
- put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
- put(TxAbortedEvent.name(), (event) -> compensate(event));
- put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
- }};
-
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final ScheduledExecutorService scheduler;
-
- public TxConsistentService(TxEventRepository eventRepository,
- CommandRepository commandRepository,
- OmegaCallback omegaCallback,
- int commandPollingInterval) {
+ private final int commandPollingInterval;
+ private final int eventPollingInterval;
- this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
- }
+ private long nextEndedEventId;
+ private long nextCompensatedEventId;
- public TxConsistentService(TxEventRepository eventRepository,
+ public EventScanner(ScheduledExecutorService scheduler,
+ TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
int commandPollingInterval,
- ScheduledExecutorService scheduler) {
+ int eventPollingInterval) {
+
+ this.scheduler = scheduler;
this.eventRepository = eventRepository;
this.commandRepository = commandRepository;
this.omegaCallback = omegaCallback;
- this.scheduler = scheduler;
-
- scheduleCompensationCommandPolling(commandPollingInterval);
+ this.commandPollingInterval = commandPollingInterval;
+ this.eventPollingInterval = eventPollingInterval;
}
- public boolean handle(TxEvent event) {
- if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
- return false;
- }
-
- eventRepository.save(event);
-
- executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
- return true;
+ @Override
+ public void run() {
+ pollCompensationCommand(commandPollingInterval);
+ pollEvents();
}
- private void compensateIfAlreadyAborted(TxEvent event) {
- if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
- commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
- }
+ private void pollEvents() {
+ scheduler.scheduleWithFixedDelay(
+ () -> {
+ saveUncompensatedEventsToCommands();
+ updateCompensatedCommands();
+ },
+ 0,
+ eventPollingInterval,
+ MILLISECONDS);
}
- private boolean isCompensationScheduled(TxEvent event) {
- return commandRepository.exists(event.globalTxId(), event.localTxId());
+ private void saveUncompensatedEventsToCommands() {
+ eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
+ .forEach(event -> {
+ log.info("Found uncompensated event {}", event);
+ nextEndedEventId = event.id();
+ commandRepository.saveCompensationCommands(event.globalTxId())
+ .forEach(command -> nextEndedEventId = command.id());
+ });
}
- private void compensate(TxEvent event) {
- commandRepository.saveCompensationCommands(event.globalTxId());
+ private void updateCompensatedCommands() {
+ eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId, TxCompensatedEvent.name())
+ .ifPresent(event -> {
+ log.info("Found compensated event {}", event);
+ nextCompensatedEventId = event.id();
+ updateCompensationStatus(event);
+ });
}
// TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
// unless we ask user to specify a name for each participant in the global TX in @Compensable
- private void updateCompensateStatus(TxEvent event) {
+ private void updateCompensationStatus(TxEvent event) {
commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
- log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+ log.info("Transaction with globalTxId {} and localTxId {} was compensated",
+ event.globalTxId(),
+ event.localTxId());
if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
&& commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
+
markGlobalTxEnd(event);
- log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
}
private void markGlobalTxEnd(TxEvent event) {
- eventRepository.save(new TxEvent(
- event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
- null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
+ eventRepository.save(toSagaEndedEvent(event));
+ log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
- private boolean isGlobalTxAborted(TxEvent event) {
- return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
+ private TxEvent toSagaEndedEvent(TxEvent event) {
+ return new TxEvent(
+ event.serviceName(),
+ event.instanceId(),
+ new Date(),
+ event.globalTxId(),
+ event.globalTxId(),
+ null,
+ SagaEndedEvent.name(),
+ "",
+ EMPTY_PAYLOAD);
}
- private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+ private void pollCompensationCommand(int commandPollingInterval) {
scheduler.scheduleWithFixedDelay(
() -> commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
@@ -131,19 +136,23 @@ public class TxConsistentService {
command.globalTxId(),
command.localTxId());
- omegaCallback.compensate(new TxEvent(
- command.serviceName(),
- command.instanceId(),
- command.globalTxId(),
- command.localTxId(),
- command.parentTxId(),
- TxStartedEvent.name(),
- command.compensationMethod(),
- command.payloads()
- ));
+ omegaCallback.compensate(txStartedEventOf(command));
}),
0,
commandPollingInterval,
MILLISECONDS);
}
+
+ private TxEvent txStartedEventOf(Command command) {
+ return new TxEvent(
+ command.serviceName(),
+ command.instanceId(),
+ command.globalTxId(),
+ command.localTxId(),
+ command.parentTxId(),
+ TxStartedEvent.name(),
+ command.compensationMethod(),
+ command.payloads()
+ );
+ }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 61beea5..c55090a 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,133 +17,35 @@
package org.apache.servicecomb.saga.alpha.core;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
-import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.lang.invoke.MethodHandles;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TxConsistentService {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
-
- private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final TxEventRepository eventRepository;
- private final CommandRepository commandRepository;
- private final OmegaCallback omegaCallback;
- private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
- put(TxEndedEvent.name(), (event) -> compensateIfAlreadyAborted(event));
- put(TxAbortedEvent.name(), (event) -> compensate(event));
- put(TxCompensatedEvent.name(), (event) -> updateCompensateStatus(event));
- }};
-
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final ScheduledExecutorService scheduler;
-
- public TxConsistentService(TxEventRepository eventRepository,
- CommandRepository commandRepository,
- OmegaCallback omegaCallback,
- int commandPollingInterval) {
- this(eventRepository, commandRepository, omegaCallback, commandPollingInterval, Executors.newSingleThreadScheduledExecutor());
- }
-
- public TxConsistentService(TxEventRepository eventRepository,
- CommandRepository commandRepository,
- OmegaCallback omegaCallback,
- int commandPollingInterval,
- ScheduledExecutorService scheduler) {
+ public TxConsistentService(TxEventRepository eventRepository) {
this.eventRepository = eventRepository;
- this.commandRepository = commandRepository;
- this.omegaCallback = omegaCallback;
- this.scheduler = scheduler;
-
- scheduleCompensationCommandPolling(commandPollingInterval);
}
public boolean handle(TxEvent event) {
if (TxStartedEvent.name().equals(event.type()) && isGlobalTxAborted(event)) {
+ log.info("Sub-transaction rejected, because its parent with globalTxId {} was already aborted", event.globalTxId());
return false;
}
eventRepository.save(event);
- executor.execute(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
return true;
}
- private void compensateIfAlreadyAborted(TxEvent event) {
- if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
- commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
- }
- }
-
- private boolean isCompensationScheduled(TxEvent event) {
- return commandRepository.exists(event.globalTxId(), event.localTxId());
- }
-
- private void compensate(TxEvent event) {
- commandRepository.saveCompensationCommands(event.globalTxId());
- }
-
- // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
- // unless we ask user to specify a name for each participant in the global TX in @Compensable
- private void updateCompensateStatus(TxEvent event) {
- commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
- log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
-
- if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
- && commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
- markGlobalTxEnd(event);
- log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
- }
- }
-
- private void markGlobalTxEnd(TxEvent event) {
- eventRepository.save(new TxEvent(
- event.serviceName(), event.instanceId(), new Date(), event.globalTxId(), event.globalTxId(),
- null, SagaEndedEvent.name(), "", EMPTY_PAYLOAD));
- }
-
private boolean isGlobalTxAborted(TxEvent event) {
return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
}
-
- private void scheduleCompensationCommandPolling(int commandPollingInterval) {
- scheduler.scheduleWithFixedDelay(
- () -> commandRepository.findFirstCommandToCompensate()
- .forEach(command -> {
- log.info("Compensating transaction with globalTxId {} and localTxId {}",
- command.globalTxId(),
- command.localTxId());
-
- omegaCallback.compensate(new TxEvent(
- command.serviceName(),
- command.instanceId(),
- command.globalTxId(),
- command.localTxId(),
- command.parentTxId(),
- TxStartedEvent.name(),
- command.compensationMethod(),
- command.payloads()
- ));
- }),
- 0,
- commandPollingInterval,
- MILLISECONDS);
- }
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
index cf5706b..d793de2 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -18,13 +18,14 @@
package org.apache.servicecomb.saga.alpha.core;
import java.util.List;
+import java.util.Optional;
public interface TxEventRepository {
void save(TxEvent event);
List<TxEvent> findTransactions(String globalTxId, String type);
- TxEvent findFirstTransaction(String globalTxId, String localTxId, String type);
+ List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type);
- List<TxEvent> findTransactionsToCompensate(String globalTxId);
+ Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type);
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 212c621..473501e 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -18,31 +18,23 @@
package org.apache.servicecomb.saga.alpha.core;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
+import static java.util.Collections.emptyList;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.Deque;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.servicecomb.saga.common.EventType;
@@ -64,93 +56,13 @@ public class TxConsistentServiceTest {
}
@Override
- public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
- return events.stream()
- .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId()) && type.equals(event.type()))
- .findFirst()
- .get();
- }
-
- @Override
- public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
- return events.stream()
- .filter(event -> globalTxId.equals(event.globalTxId())
- && event.type().equals(TxStartedEvent.name())
- && isCompleted(globalTxId, event)
- && !isCompensated(globalTxId, event))
- .collect(Collectors.toList());
- }
-
- private boolean isCompleted(String globalTxId, TxEvent event) {
- return events.stream()
- .filter(e -> globalTxId.equals(e.globalTxId())
- && e.localTxId().equals(event.localTxId())
- && e.type().equals(TxEndedEvent.name()))
- .count() > 0;
- }
-
- private boolean isCompensated(String globalTxId, TxEvent event) {
- return events.stream()
- .filter(e -> globalTxId.equals(e.globalTxId())
- && e.localTxId().equals(event.localTxId())
- && e.type().equals(TxCompensatedEvent.name()))
- .count() > 0;
- }
- };
-
- private final List<Command> commands = new ArrayList<>();
- private final CommandRepository commandRepository = new CommandRepository() {
- @Override
- public boolean exists(String globalTxId, String localTxId) {
- return commands.stream()
- .anyMatch(command -> globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId()));
- }
-
- @Override
- public void saveCompensationCommand(String globalTxId, String localTxId) {
- TxEvent event = eventRepository.findFirstTransaction(globalTxId, localTxId, TxStartedEvent.name());
- commands.add(new Command(event));
+ public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+ return emptyList();
}
@Override
- public void saveCompensationCommands(String globalTxId) {
- List<TxEvent> events = eventRepository.findTransactionsToCompensate(globalTxId);
-
- Map<String, Command> commandMap = new HashMap<>();
-
- for (TxEvent event : events) {
- commandMap.computeIfAbsent(event.localTxId(), k -> new Command(event));
- }
-
- commands.addAll(commandMap.values());
- }
-
- @Override
- public void markCommandAsDone(String globalTxId, String localTxId) {
- for (int i = 0; i < commands.size(); i++) {
- Command command = commands.get(i);
- if (globalTxId.equals(command.globalTxId()) && localTxId.equals(command.localTxId())) {
- commands.set(i, new Command(command, DONE));
- }
- }
- }
-
- @Override
- public List<Command> findUncompletedCommands(String globalTxId) {
- return commands.stream()
- .filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
- .collect(Collectors.toList());
- }
-
- @Override
- public List<Command> findFirstCommandToCompensate() {
- List<Command> results = new ArrayList<>(1);
- commands.stream()
- .filter(command -> !DONE.name().equals(command.status()))
- .findFirst()
- .ifPresent(results::add);
-
- return results;
+ public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+ return Optional.empty();
}
};
@@ -161,20 +73,9 @@ public class TxConsistentServiceTest {
private final String instanceId = uniquify("instanceId");
private final String compensationMethod = getClass().getCanonicalName();
- private final List<CompensationContext> compensationContexts = new ArrayList<>();
- private Consumer<TxEvent> eventConsumer = event -> {};
- private final OmegaCallback omegaCallback = event -> {
- eventConsumer.accept(event);
- compensationContexts.add(
- new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
- };
-
- private final TxConsistentService consistentService = new TxConsistentService(
- eventRepository,
- commandRepository,
- omegaCallback,
- 300);
+ private final TxConsistentService consistentService = new TxConsistentService(eventRepository);
+ private final byte[] payloads = "yeah".getBytes();
@Test
public void persistEventOnArrival() throws Exception {
@@ -190,49 +91,6 @@ public class TxConsistentServiceTest {
}
assertThat(this.events, contains(events));
- assertThat(compensationContexts.isEmpty(), is(true));
- }
-
- @Test
- public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
- eventConsumer = event -> consistentService
- .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
-
- String localTxId1 = UUID.randomUUID().toString();
- events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
- events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
-
- String localTxId2 = UUID.randomUUID().toString();
- events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2, "method b"));
- events.add(eventOf(TxEndedEvent, new byte[0], localTxId2, "method b"));
-
- TxEvent abortEvent = newEvent(TxAbortedEvent);
-
- consistentService.handle(abortEvent);
-
- await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
- assertThat(compensationContexts, containsInAnyOrder(
- new CompensationContext(globalTxId, localTxId1, "method a", "service a".getBytes()),
- new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
- ));
-
- await().atMost(1, SECONDS).until(() -> events.size() == 8);
- assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
- }
-
- @Test
- public void compensateTxEndedEventImmediately_IfGlobalTxAlreadyFailed() throws Exception {
- events.add(newEvent(TxStartedEvent));
- events.add(newEvent(TxAbortedEvent));
-
- TxEvent event = eventOf(TxEndedEvent, new byte[0], localTxId, compensationMethod);
-
- consistentService.handle(event);
-
- await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 0);
- assertThat(compensationContexts, containsInAnyOrder(
- new CompensationContext(globalTxId, localTxId, compensationMethod, "yeah".getBytes())
- ));
}
@Test
@@ -241,7 +99,7 @@ public class TxConsistentServiceTest {
events.add(newEvent(TxStartedEvent));
events.add(newEvent(TxAbortedEvent));
- TxEvent event = eventOf(TxStartedEvent, "service x".getBytes(), localTxId1, "method x");
+ TxEvent event = eventOf(TxStartedEvent, localTxId1);
consistentService.handle(event);
@@ -249,10 +107,10 @@ public class TxConsistentServiceTest {
}
private TxEvent newEvent(EventType eventType) {
- return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
+ return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads);
}
- private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId, String compensationMethod) {
+ private TxEvent eventOf(EventType eventType, String localTxId) {
return new TxEvent(serviceName, instanceId, new Date(),
globalTxId,
localTxId,
@@ -261,48 +119,4 @@ public class TxConsistentServiceTest {
compensationMethod,
payloads);
}
-
- private static class CompensationContext {
- private final String globalTxId;
- private final String localTxId;
- private final String compensationMethod;
- private final byte[] message;
-
- private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
- this.globalTxId = globalTxId;
- this.localTxId = localTxId;
- this.compensationMethod = compensationMethod;
- this.message = message;
- }
-
- @Override
- public String toString() {
- return "CompensationContext{" +
- "globalTxId='" + globalTxId + '\'' +
- ", localTxId='" + localTxId + '\'' +
- ", compensationMethod='" + compensationMethod + '\'' +
- ", message=" + Arrays.toString(message) +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CompensationContext that = (CompensationContext) o;
- return Objects.equals(globalTxId, that.globalTxId) &&
- Objects.equals(localTxId, that.localTxId) &&
- Objects.equals(compensationMethod, that.compensationMethod) &&
- Arrays.equals(message, that.message);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(globalTxId, localTxId, compensationMethod, message);
- }
- }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 3740581..73298be 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -28,6 +28,7 @@ import javax.annotation.PostConstruct;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.EventScanner;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
@@ -68,24 +69,27 @@ class AlphaConfig {
@Bean
ScheduledExecutorService compensationScheduler() {
- return Executors.newSingleThreadScheduledExecutor();
+ return Executors.newScheduledThreadPool(2);
}
@Bean
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
- @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
+ @Value("${alpha.command.pollingInterval:500}") int commandPollingInterval,
+ @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- TxConsistentService consistentService = new TxConsistentService(
+ new EventScanner(scheduler,
eventRepository,
commandRepository,
omegaCallback,
commandPollingInterval,
- scheduler);
+ eventPollingInterval).run();
+
+ TxConsistentService consistentService = new TxConsistentService(eventRepository);
ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 34b43a4..6076e54 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -20,8 +20,8 @@ package org.apache.servicecomb.saga.alpha.server;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW;
import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING;
-import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -29,9 +29,12 @@ import java.util.Map;
import org.apache.servicecomb.saga.alpha.core.Command;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.data.domain.PageRequest;
public class SpringCommandRepository implements CommandRepository {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
private final TxEventEnvelopeRepository eventRepository;
@@ -43,22 +46,7 @@ public class SpringCommandRepository implements CommandRepository {
}
@Override
- public boolean exists(String globalTxId, String localTxId) {
- return commandRepository.findByGlobalTxIdAndLocalTxId(globalTxId, localTxId).isPresent();
- }
-
- @Override
- public void saveCompensationCommand(String globalTxId, String localTxId) {
- TxEvent startedEvent = eventRepository.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(
- globalTxId,
- localTxId,
- TxStartedEvent.name());
-
- commandRepository.save(new Command(startedEvent));
- }
-
- @Override
- public void saveCompensationCommands(String globalTxId) {
+ public Iterable<Command> saveCompensationCommands(String globalTxId) {
List<TxEvent> events = eventRepository
.findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(globalTxId);
@@ -68,7 +56,8 @@ public class SpringCommandRepository implements CommandRepository {
commands.computeIfAbsent(event.localTxId(), k -> new Command(event));
}
- commandRepository.save(commands.values());
+ log.info("Saving compensation commands {}", commands.values());
+ return commandRepository.save(commands.values());
}
@Override
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7c44639..4108aa5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -18,9 +18,11 @@
package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
+import java.util.Optional;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.springframework.data.domain.PageRequest;
class SpringTxEventRepository implements TxEventRepository {
private final TxEventEnvelopeRepository eventRepo;
@@ -40,12 +42,12 @@ class SpringTxEventRepository implements TxEventRepository {
}
@Override
- public TxEvent findFirstTransaction(String globalTxId, String localTxId, String type) {
- return eventRepo.findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(globalTxId, localTxId, type);
+ public List<TxEvent> findFirstUncompensatedEventByIdGreaterThan(long id, String type) {
+ return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1));
}
@Override
- public List<TxEvent> findTransactionsToCompensate(String globalTxId) {
- return eventRepo.findStartedEventsWithMatchingEndedButNotCompensatedEvents(globalTxId);
+ public Optional<TxEvent> findFirstCompensatedEventByIdGreaterThan(long id, String type) {
+ return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 09aec6f..78c2a1d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -18,8 +18,10 @@
package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
+import java.util.Optional;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
@@ -32,13 +34,8 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ "WHERE t.globalTxId = ?1 AND t.type = ?2")
List<TxEvent> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
- TxEvent findFirstByGlobalTxIdAndLocalTxIdAndTypeOrderBySurrogateIdAsc(String globalTxId, String localTxId, String type);
-
- @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent("
- + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
- + ") FROM TxEvent t "
- + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent'"
- + "AND EXISTS ("
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+ " SELECT t1.globalTxId"
+ " FROM TxEvent t1 "
+ " WHERE t1.globalTxId = ?1 "
@@ -50,19 +47,15 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " WHERE t2.globalTxId = ?1 "
+ " AND t2.localTxId = t.localTxId "
+ " AND t2.type = 'TxCompensatedEvent') "
- + "ORDER BY t.surrogateId ASC"
- )
- List<TxEvent> findStartedEventsWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+ + "ORDER BY t.surrogateId ASC")
+ List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
- @Query("SELECT new org.apache.servicecomb.saga.alpha.core.TxEvent("
- + "t.surrogateId, t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads"
- + ") FROM TxEvent t "
- + "WHERE t.globalTxId = ?1 AND t.type = 'TxStartedEvent' AND EXISTS ( "
+ @Query("SELECT t FROM TxEvent t "
+ + "WHERE t.type = 'TxEndedEvent' AND t.surrogateId > ?2 AND EXISTS ( "
+ " SELECT t1.globalTxId"
+ " FROM TxEvent t1 "
- + " WHERE t1.globalTxId = ?1 "
- + " AND t1.localTxId = t.localTxId "
- + " AND t1.type = 'TxEndedEvent'"
+ + " WHERE t1.globalTxId = t.globalTxId "
+ + " AND t1.type = 'TxAbortedEvent'"
+ ") AND NOT EXISTS ( "
+ " SELECT t2.globalTxId"
+ " FROM TxEvent t2 "
@@ -70,5 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " AND t2.localTxId = t.localTxId "
+ " AND t2.type = 'TxCompensatedEvent') "
+ "ORDER BY t.surrogateId ASC")
- List<TxEvent> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
+ List<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId, Pageable pageable);
+
+ Optional<TxEvent> findFirstByTypeAndSurrogateIdGreaterThan(String type, long surrogateId);
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1bfbe3c..5cff57c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -34,8 +34,6 @@ import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -57,8 +55,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.test.context.junit4.SpringRunner;
import com.google.protobuf.ByteString;
@@ -69,7 +65,7 @@ import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
- properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1"})
+ properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=1", "alpha.event.pollingInterval=100"})
public class AlphaIntegrationTest {
private static final int port = 8090;
@@ -417,10 +413,4 @@ public class AlphaIntegrationTest {
return completed;
}
}
-
- @Primary
- @Bean
- ScheduledExecutorService scheduler() {
- return Executors.newScheduledThreadPool(2);
- }
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.