You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/19 09:04:28 UTC
[incubator-servicecomb-saga] 02/09: SCB-218 polling in background
for events to compensate
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-218_alpha_stateless
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 994e1c3bac9c432433a80dfd2f9cc77bf528b742
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 16 18:43:31 2018 +0800
SCB-218 polling in background for events to compensate
Signed-off-by: seanyinx <se...@huawei.com>
---
alpha/alpha-core/pom.xml | 15 +++++++
.../servicecomb/saga/alpha/core/Command.java | 20 +++++++++
.../saga/alpha/core/CommandRepository.java | 2 +
.../saga/alpha/core/TxConsistentService.java | 51 +++++++++++++++++-----
.../saga/alpha/core/TxConsistentServiceTest.java | 35 +++++++++++----
.../servicecomb/saga/alpha/server/AlphaConfig.java | 7 ++-
.../saga/alpha/server/CommandEntity.java | 6 ++-
.../saga/alpha/server/CommandEntityRepository.java | 14 +++++-
.../saga/alpha/server/SpringCommandRepository.java | 20 ++++++++-
.../alpha/server/TxEventEnvelopeRepository.java | 3 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 39 +++++++++++++----
11 files changed, 174 insertions(+), 38 deletions(-)
diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index aa74718..107fff4 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -44,6 +44,21 @@
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
index 08f8527..c852c16 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java
@@ -83,6 +83,14 @@ public class Command {
event.payloads());
}
+ String serviceName() {
+ return serviceName;
+ }
+
+ String instanceId() {
+ return instanceId;
+ }
+
String globalTxId() {
return globalTxId;
}
@@ -91,6 +99,18 @@ public class Command {
return localTxId;
}
+ String parentTxId() {
+ return parentTxId;
+ }
+
+ String compensationMethod() {
+ return compensationMethod;
+ }
+
+ byte[] payloads() {
+ return payloads;
+ }
+
String status() {
return status;
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
index 915d476..22f2b41 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandRepository.java
@@ -29,4 +29,6 @@ public interface CommandRepository {
void markCommandAsDone(String globalTxId, String localTxId);
List<Command> findUncompletedCommands(String globalTxId);
+
+ List<Command> findFirstCommandToCompensate();
}
diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 560096f..8f88735 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -17,21 +17,27 @@
package org.apache.servicecomb.saga.alpha.core;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
+import java.lang.invoke.MethodHandles;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class TxConsistentService {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -46,13 +52,17 @@ public class TxConsistentService {
}};
private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public TxConsistentService(TxEventRepository eventRepository,
CommandRepository commandRepository,
- OmegaCallback omegaCallback) {
+ OmegaCallback omegaCallback,
+ int commandPollingInterval) {
this.eventRepository = eventRepository;
this.commandRepository = commandRepository;
this.omegaCallback = omegaCallback;
+
+ scheduleCompensationCommandPolling(commandPollingInterval);
}
public boolean handle(TxEvent event) {
@@ -69,10 +79,6 @@ public class TxConsistentService {
private void compensateIfAlreadyAborted(TxEvent event) {
if (!isCompensationScheduled(event) && isGlobalTxAborted(event)) {
commandRepository.saveCompensationCommand(event.globalTxId(), event.localTxId());
- TxEvent correspondingStartedEvent = eventRepository
- .findFirstTransaction(event.globalTxId(), event.localTxId(), TxStartedEvent.name());
-
- omegaCallback.compensate(correspondingStartedEvent);
}
}
@@ -81,22 +87,19 @@ public class TxConsistentService {
}
private void compensate(TxEvent event) {
- List<TxEvent> events = eventRepository.findTransactionsToCompensate(event.globalTxId());
-
- events.removeIf(this::isCompensationScheduled);
-
commandRepository.saveCompensationCommands(event.globalTxId());
-
- events.forEach(omegaCallback::compensate);
}
// TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late,
// unless we ask user to specify a name for each participant in the global TX in @Compensable
private void updateCompensateStatus(TxEvent event) {
commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId());
+ log.info("Transaction with globalTxId {} and localTxId {} was compensated", event.globalTxId(), event.localTxId());
+
if (eventRepository.findTransactions(event.globalTxId(), SagaEndedEvent.name()).isEmpty()
&& commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
markGlobalTxEnd(event);
+ log.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}
}
@@ -109,4 +112,28 @@ public class TxConsistentService {
private boolean isGlobalTxAborted(TxEvent event) {
return !eventRepository.findTransactions(event.globalTxId(), TxAbortedEvent.name()).isEmpty();
}
+
+ private void scheduleCompensationCommandPolling(int commandPollingInterval) {
+ scheduler.scheduleWithFixedDelay(
+ () -> commandRepository.findFirstCommandToCompensate()
+ .forEach(command -> {
+ log.info("Compensating transaction with globalTxId {} and localTxId {}",
+ command.globalTxId(),
+ command.localTxId());
+
+ omegaCallback.compensate(new TxEvent(
+ command.serviceName(),
+ command.instanceId(),
+ command.globalTxId(),
+ command.localTxId(),
+ command.parentTxId(),
+ TxStartedEvent.name(),
+ command.compensationMethod(),
+ command.payloads()
+ ));
+ }),
+ 0,
+ commandPollingInterval,
+ MILLISECONDS);
+ }
}
diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 8ae60a3..212c621 100644
--- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.servicecomb.saga.common.EventType;
@@ -140,6 +141,17 @@ public class TxConsistentServiceTest {
.filter(command -> command.globalTxId().equals(globalTxId) && !DONE.name().equals(command.status()))
.collect(Collectors.toList());
}
+
+ @Override
+ public List<Command> findFirstCommandToCompensate() {
+ List<Command> results = new ArrayList<>(1);
+ commands.stream()
+ .filter(command -> !DONE.name().equals(command.status()))
+ .findFirst()
+ .ifPresent(results::add);
+
+ return results;
+ }
};
private final String globalTxId = UUID.randomUUID().toString();
@@ -151,10 +163,18 @@ public class TxConsistentServiceTest {
private final String compensationMethod = getClass().getCanonicalName();
private final List<CompensationContext> compensationContexts = new ArrayList<>();
- private final OmegaCallback omegaCallback = event ->
- compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+ private Consumer<TxEvent> eventConsumer = event -> {};
+ private final OmegaCallback omegaCallback = event -> {
+ eventConsumer.accept(event);
+ compensationContexts.add(
+ new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
+ };
- private final TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+ private final TxConsistentService consistentService = new TxConsistentService(
+ eventRepository,
+ commandRepository,
+ omegaCallback,
+ 300);
@Test
public void persistEventOnArrival() throws Exception {
@@ -175,6 +195,9 @@ public class TxConsistentServiceTest {
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
+ eventConsumer = event -> consistentService
+ .handle(eventOf(TxCompensatedEvent, new byte[0], event.localTxId(), event.compensationMethod()));
+
String localTxId1 = UUID.randomUUID().toString();
events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1, "method a"));
events.add(eventOf(TxEndedEvent, new byte[0], localTxId1, "method a"));
@@ -193,12 +216,6 @@ public class TxConsistentServiceTest {
new CompensationContext(globalTxId, localTxId2, "method b", "service b".getBytes())
));
- TxEvent compensateEvent2 = eventOf(TxCompensatedEvent, "service b".getBytes(), localTxId2, "method b");
- consistentService.handle(compensateEvent2);
-
- TxEvent compensateEvent1 = eventOf(TxCompensatedEvent, "service a".getBytes(), localTxId1, "method a");
- consistentService.handle(compensateEvent1);
-
await().atMost(1, SECONDS).until(() -> events.size() == 8);
assertThat(events.pollLast().type(), is(SagaEndedEvent.name()));
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 00dfe27..9c34738 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -66,12 +66,17 @@ class AlphaConfig {
@Bean
TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
+ @Value("${alpha.command.pollingInterval:3000}") int commandPollingInterval,
TxEventRepository eventRepository,
CommandRepository commandRepository,
OmegaCallback omegaCallback,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- TxConsistentService consistentService = new TxConsistentService(eventRepository, commandRepository, omegaCallback);
+ TxConsistentService consistentService = new TxConsistentService(
+ eventRepository,
+ commandRepository,
+ omegaCallback,
+ commandPollingInterval);
ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
new Thread(startable::start).start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
index 3eac681..6207002 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntity.java
@@ -21,7 +21,6 @@ import java.util.Date;
import javax.persistence.Embedded;
import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Version;
@@ -31,7 +30,6 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent;
@Entity
class CommandEntity {
@Id
- @GeneratedValue
private long surrogateId;
@Embedded
@@ -50,4 +48,8 @@ class CommandEntity {
lastModified = new Date();
command = new Command(event);
}
+
+ Command command() {
+ return command;
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
index 4b7309e..9402486 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/CommandEntityRepository.java
@@ -20,7 +20,9 @@ package org.apache.servicecomb.saga.alpha.server;
import java.util.List;
import java.util.Optional;
-import org.apache.servicecomb.saga.alpha.core.Command;
+import javax.transaction.Transactional;
+
+import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
@@ -29,6 +31,7 @@ import org.springframework.data.repository.query.Param;
public interface CommandEntityRepository extends CrudRepository<CommandEntity, Long> {
Optional<CommandEntity> findByCommandGlobalTxIdAndCommandLocalTxId(String globalTxId, String localTxId);
+ @Transactional
@Modifying
@Query("UPDATE org.apache.servicecomb.saga.alpha.server.CommandEntity c "
+ "SET c.command.status = :status "
@@ -39,5 +42,12 @@ public interface CommandEntityRepository extends CrudRepository<CommandEntity, L
@Param("globalTxId") String globalTxId,
@Param("localTxId") String localTxId);
- List<Command> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+ List<CommandEntity> findByCommandGlobalTxIdAndCommandStatus(String globalTxId, String status);
+
+ @Query("FROM CommandEntity c "
+ + "WHERE id IN ("
+ + " SELECT MAX(id) FROM CommandEntity c1 WHERE c1.command.status <> 'DONE' GROUP BY c1.command.globalTxId"
+ + ") "
+ + "ORDER BY c.id ASC")
+ List<CommandEntity> findFirstGroupByCommandGlobalTxIdOrderByIdDesc(Pageable pageable);
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
index 9281b7e..aac3c22 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java
@@ -24,11 +24,15 @@ import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.servicecomb.saga.alpha.core.Command;
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
+import org.springframework.data.domain.PageRequest;
public class SpringCommandRepository implements CommandRepository {
+ private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1);
+
private final TxEventEnvelopeRepository eventRepository;
private final CommandEntityRepository commandRepository;
@@ -44,7 +48,7 @@ public class SpringCommandRepository implements CommandRepository {
@Override
public void saveCompensationCommand(String globalTxId, String localTxId) {
- TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventType(
+ TxEventEnvelope startedEvent = eventRepository.findFirstByEventGlobalTxIdAndEventLocalTxIdAndEventTypeOrderByIdAsc(
globalTxId,
localTxId,
TxStartedEvent.name());
@@ -73,6 +77,18 @@ public class SpringCommandRepository implements CommandRepository {
@Override
public List<Command> findUncompletedCommands(String globalTxId) {
- return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name());
+ return commandRepository.findByCommandGlobalTxIdAndCommandStatus(globalTxId, NEW.name())
+ .stream()
+ .map(CommandEntity::command)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<Command> findFirstCommandToCompensate() {
+ return commandRepository
+ .findFirstGroupByCommandGlobalTxIdOrderByIdDesc(SINGLE_COMMAND_REQUEST)
+ .stream()
+ .map(CommandEntity::command)
+ .collect(Collectors.toList());
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index fcb7c00..85ed954 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -63,6 +63,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " FROM TxEventEnvelope t2 "
+ " WHERE t2.event.globalTxId = ?1 "
+ " AND t2.event.localTxId = t.event.localTxId "
- + " AND t2.event.type = 'TxCompensatedEvent')")
+ + " AND t2.event.type = 'TxCompensatedEvent')"
+ + "ORDER BY t.id ASC ")
List<TxEventEnvelope> findStartedEventEnvelopesWithMatchingEndedButNotCompensatedEvents(String globalTxId);
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 6d4f91f..1514869 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -24,7 +24,7 @@ import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -34,11 +34,12 @@ import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
-import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -63,7 +64,8 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
+ properties = {"alpha.server.port=8090", "alpha.command.pollingInterval=300"})
public class AlphaIntegrationTest {
private static final int port = 8090;
@@ -97,7 +99,7 @@ public class AlphaIntegrationTest {
private TxConsistentService consistentService;
private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
- private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
+ private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver(this::onCompensation);
@AfterClass
public static void tearDown() throws Exception {
@@ -222,11 +224,11 @@ public class AlphaIntegrationTest {
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
- assertThat(receivedCommands, containsInAnyOrder(
- GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
- .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+ assertThat(receivedCommands, contains(
GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
- .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
+ .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build(),
+ GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+ .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build()
));
}
@@ -305,7 +307,7 @@ public class AlphaIntegrationTest {
String anotherLocalTxId2 = UUID.randomUUID().toString();
blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId2));
- await().atMost(1, SECONDS).until(() -> eventRepo.count() == 6);
+ await().atMost(1, SECONDS).until(() -> eventRepo.count() == 7);
blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
@@ -314,6 +316,15 @@ public class AlphaIntegrationTest {
assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
}
+ private GrpcAck onCompensation(GrpcCompensateCommand command) {
+ return blockingStub.onTxEvent(
+ eventOf(TxCompensatedEvent,
+ command.getLocalTxId(),
+ command.getParentTxId(),
+ new byte[0],
+ command.getCompensateMethod()));
+ }
+
private GrpcServiceConfig someServiceConfig() {
return GrpcServiceConfig.newBuilder()
.setServiceName(uniquify("serviceName"))
@@ -371,11 +382,21 @@ public class AlphaIntegrationTest {
}
private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+ private final Consumer<GrpcCompensateCommand> consumer;
private boolean completed = false;
+ private CompensateStreamObserver() {
+ this(command -> {});
+ }
+
+ private CompensateStreamObserver(Consumer<GrpcCompensateCommand> consumer) {
+ this.consumer = consumer;
+ }
+
@Override
public void onNext(GrpcCompensateCommand command) {
// intercept received command
+ consumer.accept(command);
receivedCommands.add(command);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.