You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2017/12/27 08:41:52 UTC
[incubator-servicecomb-saga] 01/01: SCB-99 added compensation logic on tx aborted event
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-99_compensation_logic
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 3257c07913d9a2c16db8b94ac4816714f141a2ef
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Dec 27 16:37:46 2017 +0800
SCB-99 added compensation logic on tx aborted event
Signed-off-by: seanyinx <se...@huawei.com>
---
alpha/alpha-core/pom.xml | 18 ++++
.../{TxEventRepository.java => EventType.java} | 9 +-
.../{TxEventRepository.java => OmegaCallback.java} | 4 +-
.../saga/alpha/core/TxConsistentService.java | 55 ++++++++++
.../saga/alpha/core/TxEventRepository.java | 4 +
.../saga/alpha/core/TxConsistentServiceTest.java | 118 +++++++++++++++++++++
alpha/alpha-server/pom.xml | 4 +
.../servicecomb/saga/alpha/server/AlphaConfig.java | 22 +++-
.../saga/alpha/server/SpringTxEventRepository.java | 11 ++
.../alpha/server/SwiftTxEventEndpointImpl.java | 10 +-
.../saga/alpha/server/TxEventEnvelope.java | 4 +
.../alpha/server/TxEventEnvelopeRepository.java | 4 +
.../saga/alpha/server/AlphaIntegrationTest.java | 107 ++++++++++++++++---
13 files changed, 345 insertions(+), 25 deletions(-)
diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index a2cb3e2..92f22f6 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -27,6 +27,24 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>alpha-core</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.seanyinx</groupId>
+ <artifactId>unit-scaffolding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
similarity index 86%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
index 08ce832..dee34a6 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
@@ -17,6 +17,11 @@
package io.servicecomb.saga.alpha.core;
-public interface TxEventRepository {
- void save(TxEvent event);
+public enum EventType { // event type names; other classes in this commit use name() as the TxEvent type string
+ SagaStartedEvent,
+ TxStartedEvent, // registered in TxConsistentService with a do-nothing callback
+ TxEndedEvent, // the type TxConsistentService.compensate looks up for the aborted global tx
+ TxAbortedEvent, // the type whose callback in TxConsistentService is compensate(event)
+ TxCompensatedEvent,
+ SagaEndedEvent
}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
similarity index 90%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 08ce832..7302016 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -17,6 +17,6 @@
package io.servicecomb.saga.alpha.core;
-public interface TxEventRepository {
- void save(TxEvent event);
+public interface OmegaCallback { // single-method callback; implemented with lambdas elsewhere in this commit
+ void compensate(String globalTxId, byte[] message); // TxConsistentService passes a stored event's globalTxId() and payloads()
}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
new file mode 100644
index 0000000..af02d74
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+public class TxConsistentService { // saves each incoming TxEvent, then runs the callback registered for its type
+ private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {}; // used for event types without a registered callback
+
+ private final TxEventRepository eventRepository;
+ private final OmegaCallback omegaCallback;
+ private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{ // keyed by event type name; NOTE(review): double-brace initialization creates an anonymous HashMap subclass
+ put(TxStartedEvent.name(), DO_NOTHING_CONSUMER); // explicit no-op, same consumer as the getOrDefault fallback in handle()
+ put(TxAbortedEvent.name(), (event) -> compensate(event)); // an abort triggers compensate() for that event
+ }};
+
+ public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+ this.eventRepository = eventRepository;
+ this.omegaCallback = omegaCallback;
+ }
+
+ public void handle(TxEvent event) { // entry point: persist first, then dispatch by event.type()
+ eventRepository.save(event); // save happens on the calling thread, before the callback is scheduled
+ CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event)); // NOTE(review): the returned future is discarded, so a failure inside the callback is not reported here
+ }
+
+ // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
+ private void compensate(TxEvent event) { // calls omegaCallback.compensate once per event returned by the repository lookup
+ List<TxEvent> events = eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name()); // lookup by the aborted event's globalTxId and the TxEndedEvent type name
+ events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads())); // passes each stored event's own globalTxId and payloads
+ }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index 08ce832..cb44f77 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -17,6 +17,10 @@
package io.servicecomb.saga.alpha.core;
+import java.util.List;
+
public interface TxEventRepository {
void save(TxEvent event);
+
+ List<TxEvent> findCompletedEvents(String globalTxId, String type); // both implementations in this commit select events by globalTxId and type together
}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
new file mode 100644
index 0000000..64ed62c
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+public class TxConsistentServiceTest { // exercises TxConsistentService against an in-memory repository and a recording callback
+ private final List<TxEvent> events = new ArrayList<>(); // backing store of the in-memory repository below
+ private final TxEventRepository eventRepository = new TxEventRepository() {
+ @Override
+ public void save(TxEvent event) {
+ events.add(event);
+ }
+
+ @Override
+ public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+ return events.stream()
+ .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type())) // both globalTxId and type must match
+ .collect(Collectors.toList());
+ }
+ };
+
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+
+ private final Map<String, List<byte[]>> callbackArgs = new HashMap<>(); // compensate() messages recorded per globalTxId; NOTE(review): plain HashMap written from the async callback and polled by the test thread — confirm this is acceptable
+ private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+ private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+
+ @Test
+ public void persistEventOnArrival() throws Exception { // none of the handled types is TxAbortedEvent
+ TxEvent[] events = {
+ newEvent(SagaStartedEvent),
+ newEvent(TxStartedEvent),
+ newEvent(TxEndedEvent),
+ newEvent(TxCompensatedEvent),
+ newEvent(SagaEndedEvent)};
+
+ for (TxEvent event : events) {
+ consistentService.handle(event);
+ }
+
+ assertThat(this.events, contains(events)); // every handled event was saved, in arrival order
+ assertThat(callbackArgs.isEmpty(), is(true)); // checked immediately after handle(), without waiting for the async callbacks
+ }
+
+ @Test
+ public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception { // two ended sub-transactions are pre-seeded, then an abort arrives
+ events.add(eventOf(TxStartedEvent, "service a".getBytes()));
+ events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+ events.add(eventOf(TxStartedEvent, "service b".getBytes()));
+ events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+
+ TxEvent abortEvent = newEvent(TxAbortedEvent);
+
+ consistentService.handle(abortEvent);
+
+ await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); // callback runs asynchronously, so poll until two messages are recorded
+ assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); // payloads of the two TxEndedEvent entries
+ }
+
+ private List<String> stringOf(List<byte[]> bytes) { // decodes each byte[] with new String(byte[]), i.e. the platform default charset
+ return bytes.stream()
+ .map(String::new)
+ .collect(Collectors.toList());
+ }
+
+ private TxEvent newEvent(EventType eventType) { // event carrying this test's fixed global/local/parent ids and the payload "yeah"
+ return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes());
+ }
+
+ private TxEvent eventOf(EventType eventType, byte[] payloads) { // same globalTxId as above, but fresh random UUIDs for the next two id arguments
+ return new TxEvent(new Date(),
+ globalTxId,
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ eventType.name(),
+ payloads);
+ }
+}
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 4bc347e..774883d 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -80,6 +80,10 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 50d4a8d..94b024e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,16 +23,34 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxConsistentService;
import io.servicecomb.saga.alpha.core.TxEventRepository;
@Configuration
class AlphaConfig {
@Bean
- TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port, TxEventEnvelopeRepository eventRepo) {
+ OmegaCallback omegaCallback() { // bean method: the returned callback is handed to TxConsistentService in springTxEventRepository below
+ // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
+ return (globalTxId, message) -> { // placeholder with an empty body: both arguments are ignored
+ };
+ }
+
+ @Bean
+ TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port,
+ TxEventEnvelopeRepository eventRepo,
+ OmegaCallback omegaCallback) {
+
TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo);
- ThriftStartable startable = new ThriftStartable(port, new SwiftTxEventEndpointImpl(eventRepository));
+ ThriftStartable startable = new ThriftStartable(
+ port,
+ new SwiftTxEventEndpointImpl(
+ new TxConsistentService(
+ eventRepository,
+ omegaCallback)));
+
CompletableFuture.runAsync(startable::start);
return eventRepository;
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7925924..7b59d29 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,6 +17,9 @@
package io.servicecomb.saga.alpha.server;
+import java.util.List;
+import java.util.stream.Collectors;
+
import io.servicecomb.saga.alpha.core.TxEvent;
import io.servicecomb.saga.alpha.core.TxEventRepository;
@@ -31,4 +34,12 @@ class SpringTxEventRepository implements TxEventRepository {
public void save(TxEvent event) {
eventRepo.save(new TxEventEnvelope(event));
}
+
+ @Override
+ public List<TxEvent> findCompletedEvents(String globalTxId, String type) { // delegates to the envelope repository and unwraps the results
+ return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type) // query method declared on TxEventEnvelopeRepository
+ .stream()
+ .map(TxEventEnvelope::event) // unwrap each envelope to the TxEvent it holds
+ .collect(Collectors.toList());
+ }
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 78b93b4..9ce7c80 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -19,22 +19,22 @@ package io.servicecomb.saga.alpha.server;
import java.util.Date;
+import io.servicecomb.saga.alpha.core.TxConsistentService;
import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.core.TxEventRepository;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
- private final TxEventRepository eventRepository;
+ private final TxConsistentService txConsistentService;
- SwiftTxEventEndpointImpl(TxEventRepository eventRepository) {
- this.eventRepository = eventRepository;
+ SwiftTxEventEndpointImpl(TxConsistentService txConsistentService) {
+ this.txConsistentService = txConsistentService;
}
@Override
public void handle(SwiftTxEvent message) {
- eventRepository.save(new TxEvent(
+ txConsistentService.handle(new TxEvent(
new Date(message.timestamp()),
message.globalTxId(),
message.localTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index adbb28e..152edfb 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -63,4 +63,8 @@ class TxEventEnvelope {
byte[] payloads() {
return event.payloads();
}
+
+ TxEvent event() { // package-private accessor; used by SpringTxEventRepository.findCompletedEvents
+ return event; // returns the held reference itself, no copy
+ }
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3e46de3..cd3cbc7 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -17,8 +17,12 @@
package io.servicecomb.saga.alpha.server;
+import java.util.List;
+
import org.springframework.data.repository.CrudRepository;
interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+
+ List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type); // presumably a Spring Data derived query on the envelope's event globalTxId and type — confirm against the TxEventEnvelope mapping
}
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index acce006..e9c9a98 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -18,29 +18,49 @@
package io.servicecomb.saga.alpha.server;
import static com.google.common.net.HostAndPort.fromParts;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import com.facebook.nifty.client.FramedClientConnector;
import com.facebook.swift.service.ThriftClientManager;
+import io.servicecomb.saga.alpha.core.EventType;
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxEvent;
+import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
@RunWith(SpringRunner.class)
-@SpringBootTest(classes = AlphaApplication.class, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
public class AlphaIntegrationTest {
private static final ThriftClientManager clientManager = new ThriftClientManager();
- private static final String TX_STARTED_EVENT = "TxStartedEvent";
private static final String payload = "hello world";
private final int port = 8090;
@@ -52,32 +72,91 @@ public class AlphaIntegrationTest {
@Autowired
private TxEventEnvelopeRepository eventRepo;
+ @Autowired
+ private Map<String, List<byte[]>> callbackArgs;
+
+ private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
+ private SwiftTxEventEndpoint endpoint;
+
@AfterClass
public static void tearDown() throws Exception {
clientManager.close(); // clientManager is a static field shared by all tests, so it is closed once per class
}
+ @Before
+ public void setUp() throws Exception { // creates a new client endpoint before every test
+ endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); // connector is built from "localhost" and the port field
+ }
+
+ @After
+ public void after() throws Exception { // counterpart of setUp(): closes the per-test endpoint
+ endpoint.close();
+ }
+
@Test
public void persistsEvent() throws Exception {
- FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
- SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
-
- endpoint.handle(new SwiftTxEvent(
- System.currentTimeMillis(),
- globalTxId,
- localTxId,
- parentTxId,
- TX_STARTED_EVENT,
- payload.getBytes()));
+ endpoint.handle(someEvent(TxStartedEvent));
TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
assertThat(envelope.globalTxId(), is(globalTxId));
assertThat(envelope.localTxId(), is(localTxId));
assertThat(envelope.parentTxId(), is(parentTxId));
- assertThat(envelope.type(), is(TX_STARTED_EVENT));
+ assertThat(envelope.type(), is(TxStartedEvent.name()));
assertThat(envelope.payloads(), is(payload.getBytes()));
+ }
- endpoint.close();
+ @Test
+ public void compensateOnFailure() throws Exception { // pre-seeds two ended sub-transactions, then sends an abort through the endpoint
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+ eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+
+ endpoint.handle(someEvent(TxAbortedEvent)); // abort event carries the same globalTxId as the seeded envelopes
+
+ await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1); // polls for up to one second until two callback messages are recorded
+ assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b")); // payloads of the two TxEndedEvent envelopes
+ }
+
+ private List<String> stringOf(List<byte[]> bytes) { // decodes each byte[] with new String(byte[]), i.e. the platform default charset
+ return bytes.stream()
+ .map(String::new)
+ .collect(Collectors.toList());
+ }
+
+ private SwiftTxEvent someEvent(EventType type) { // builds a SwiftTxEvent from this test's id fields and the shared payload
+ return new SwiftTxEvent(
+ System.currentTimeMillis(),
+ this.globalTxId,
+ this.localTxId,
+ this.parentTxId,
+ type.name(), // the enum constant's name is sent as the type string
+ payload.getBytes());
+ }
+
+ private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) { // wraps a new TxEvent that shares this test's globalTxId
+ return new TxEventEnvelope(new TxEvent(new Date(),
+ globalTxId,
+ UUID.randomUUID().toString(), // fresh random UUID per call
+ UUID.randomUUID().toString(), // fresh random UUID per call
+ eventType.name(),
+ payloads));
}
+
+ @Configuration
+ static class OmegaCallbackConfig { // test configuration listed in @SpringBootTest(classes = ...) next to AlphaApplication
+ private final Map<String, List<byte[]>> callbackArgs = new HashMap<>(); // NOTE(review): plain HashMap shared between the callback and the polling test — confirm thread-safety is not needed
+
+ @Bean
+ Map<String, List<byte[]>> callbackArgs() { // exposes the recording map so the test can autowire it
+ return callbackArgs;
+ }
+
+ @Bean
+ OmegaCallback omegaCallback() { // same bean method name as AlphaConfig.omegaCallback(); records instead of doing nothing
+ return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value); // appends each message under its globalTxId key
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.