You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/29 06:41:00 UTC
[incubator-servicecomb-saga] 04/05: SCB-98 included compensation
method signature in omega callback
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 9640570a0f301f81e013f522f7e14ea3f5282bfb
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 09:00:01 2017 +0800
SCB-98 included compensation method signature in omega callback
Signed-off-by: seanyinx <se...@huawei.com>
---
.../servicecomb/saga/alpha/core/OmegaCallback.java | 2 +-
.../saga/alpha/core/TxConsistentService.java | 2 +-
.../io/servicecomb/saga/alpha/core/TxEvent.java | 14 +++-
.../saga/alpha/core/TxConsistentServiceTest.java | 90 +++++++++++++++-----
.../servicecomb/saga/alpha/server/AlphaConfig.java | 3 +-
.../alpha/server/SwiftTxEventEndpointImpl.java | 1 +
.../saga/alpha/server/TxEventEnvelope.java | 13 ++-
.../alpha/server/TxEventEnvelopeRepository.java | 2 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 95 +++++++++++++++-------
9 files changed, 162 insertions(+), 60 deletions(-)
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 7302016..5ebfb72 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,5 +18,5 @@
package io.servicecomb.saga.alpha.core;
public interface OmegaCallback {
- void compensate(String globalTxId, byte[] message);
+ void compensate(String globalTxId, String localTxId, String compensationMethod, byte[] message);
}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 22605f8..15f5099 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -49,6 +49,6 @@ public class TxConsistentService {
// TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
private void compensate(TxEvent event) {
List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
- events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads()));
+ events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), event.compensationMethod(), evt.payloads()));
}
}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index 2d0a19b..da46db8 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -25,17 +25,25 @@ public class TxEvent {
private String localTxId;
private String parentTxId;
private String type;
+ private String compensationMethod;
private byte[] payloads;
private TxEvent() {
}
- public TxEvent(Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) {
+ public TxEvent(Date creationTime,
+ String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ byte[] payloads) {
this.creationTime = creationTime;
this.globalTxId = globalTxId;
this.localTxId = localTxId;
this.parentTxId = parentTxId;
this.type = type;
+ this.compensationMethod = compensationMethod;
this.payloads = payloads;
}
@@ -59,6 +67,10 @@ public class TxEvent {
return type;
}
+ public String compensationMethod() {
+ return compensationMethod;
+ }
+
public byte[] payloads() {
return payloads;
}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index d7e66c3..13a2674 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -23,7 +23,6 @@ import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.contains;
@@ -32,10 +31,10 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -61,8 +60,12 @@ public class TxConsistentServiceTest {
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
- private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
- private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+ private final String compensationMethod = getClass().getCanonicalName();
+ private final List<CompensationContext> compensationContexts = new ArrayList<>();
+
+ private final OmegaCallback omegaCallback = (globalTxId, localTxId, compensationMethod, payloads) ->
+ compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
+
private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
@Test
@@ -79,40 +82,85 @@ public class TxConsistentServiceTest {
}
assertThat(this.events, contains(events));
- assertThat(callbackArgs.isEmpty(), is(true));
+ assertThat(compensationContexts.isEmpty(), is(true));
}
@Test
public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
- events.add(eventOf(TxStartedEvent, "service a".getBytes()));
- events.add(eventOf(TxEndedEvent, new byte[0]));
- events.add(eventOf(TxStartedEvent, "service b".getBytes()));
- events.add(eventOf(TxEndedEvent, new byte[0]));
+ String localTxId1 = UUID.randomUUID().toString();
+ events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId1));
+
+ String localTxId2 = UUID.randomUUID().toString();
+ events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2));
+ events.add(eventOf(TxEndedEvent, new byte[0], localTxId2));
TxEvent abortEvent = newEvent(TxAbortedEvent);
consistentService.handle(abortEvent);
- await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
- assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
- }
-
- private List<String> stringOf(List<byte[]> bytes) {
- return bytes.stream()
- .map(String::new)
- .collect(Collectors.toList());
+ await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+ assertThat(compensationContexts, containsInAnyOrder(
+ new CompensationContext(globalTxId, localTxId1, compensationMethod, "service a".getBytes()),
+ new CompensationContext(globalTxId, localTxId2, compensationMethod, "service b".getBytes())
+ ));
}
private TxEvent newEvent(EventType eventType) {
- return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes());
+ return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
}
- private TxEvent eventOf(EventType eventType, byte[] payloads) {
+ private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
return new TxEvent(new Date(),
globalTxId,
- UUID.randomUUID().toString(),
+ localTxId,
UUID.randomUUID().toString(),
eventType.name(),
+ compensationMethod,
payloads);
}
+
+ private static class CompensationContext {
+ private final String globalTxId;
+ private final String localTxId;
+ private final String compensationMethod;
+ private final byte[] message;
+
+ private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.compensationMethod = compensationMethod;
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "CompensationContext{" +
+ "globalTxId='" + globalTxId + '\'' +
+ ", localTxId='" + localTxId + '\'' +
+ ", compensationMethod='" + compensationMethod + '\'' +
+ ", message=" + Arrays.toString(message) +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompensationContext that = (CompensationContext) o;
+ return Objects.equals(globalTxId, that.globalTxId) &&
+ Objects.equals(localTxId, that.localTxId) &&
+ Objects.equals(compensationMethod, that.compensationMethod) &&
+ Arrays.equals(message, that.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+ }
+ }
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 94b024e..d443fa7 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -33,8 +33,7 @@ class AlphaConfig {
@Bean
OmegaCallback omegaCallback() {
// TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
- return (globalTxId, message) -> {
- };
+ return (globalTxId, localTxId, compensationMethod, message) -> {};
}
@Bean
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 9ce7c80..f1f8e40 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -40,6 +40,7 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
message.localTxId(),
message.parentTxId(),
message.type(),
+ message.compensationMethod(),
message.payloads()
));
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index b027754..fa282b4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,8 +42,13 @@ class TxEventEnvelope {
this.event = event;
}
- public TxEventEnvelope(String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) {
- this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, payloads);
+ public TxEventEnvelope(String globalTxId,
+ String localTxId,
+ String parentTxId,
+ String type,
+ String compensationMethod,
+ byte[] payloads) {
+ this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
}
public long creationTime() {
@@ -66,6 +71,10 @@ class TxEventEnvelope {
return event.type();
}
+ String compensationMethod() {
+ return event.compensationMethod();
+ }
+
byte[] payloads() {
return event.payloads();
}
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 04ff836..5f929a1 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long
TxEventEnvelope findByEventGlobalTxId(String globalTxId);
@Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
- + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.payloads"
+ + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
+ ") FROM TxEventEnvelope t "
+ "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index d932072..f7ec33c 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -21,7 +21,6 @@ import static com.google.common.net.HostAndPort.fromParts;
import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,12 +28,11 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
-import java.util.stream.Collectors;
import org.junit.After;
import org.junit.AfterClass;
@@ -68,12 +66,13 @@ public class AlphaIntegrationTest {
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
+ private final String compensationMethod = getClass().getCanonicalName();
@Autowired
private TxEventEnvelopeRepository eventRepo;
@Autowired
- private Map<String, List<byte[]>> callbackArgs;
+ private List<CompensationContext> compensationContexts;
private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
private SwiftTxEventEndpoint endpoint;
@@ -103,40 +102,28 @@ public class AlphaIntegrationTest {
assertThat(envelope.localTxId(), is(localTxId));
assertThat(envelope.parentTxId(), is(parentTxId));
assertThat(envelope.type(), is(TxStartedEvent.name()));
+ assertThat(envelope.compensationMethod(), is(compensationMethod));
assertThat(envelope.payloads(), is(payload.getBytes()));
}
@Test
- public void compensateOnFailure() throws Exception {
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
- eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-
- endpoint.handle(someEvent(TxAbortedEvent));
-
- await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
- assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
- }
-
- @Test
public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+ // duplicate events with same content but different timestamp
eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+
+ String localTxId1 = UUID.randomUUID().toString();
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes()));
eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
endpoint.handle(someEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
- assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
- }
-
- private List<String> stringOf(List<byte[]> bytes) {
- return bytes.stream()
- .map(String::new)
- .collect(Collectors.toList());
+ await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+ assertThat(compensationContexts, containsInAnyOrder(
+ new CompensationContext(globalTxId, this.localTxId, compensationMethod, "service a".getBytes()),
+ new CompensationContext(globalTxId, localTxId1, compensationMethod, "service b".getBytes())
+ ));
}
private SwiftTxEvent someEvent(EventType type) {
@@ -146,6 +133,7 @@ public class AlphaIntegrationTest {
this.localTxId,
this.parentTxId,
type.name(),
+ compensationMethod,
payload.getBytes());
}
@@ -159,22 +147,67 @@ public class AlphaIntegrationTest {
localTxId,
parentTxId,
eventType.name(),
+ compensationMethod,
payloads));
}
@Configuration
static class OmegaCallbackConfig {
- private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+ private final List<CompensationContext> compensationContexts = new ArrayList<>();
@Bean
- Map<String, List<byte[]>> callbackArgs() {
- return callbackArgs;
+ List<CompensationContext> compensationContexts() {
+ return compensationContexts;
}
@Bean
OmegaCallback omegaCallback() {
- return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+ return (globalTxId, localTxId, compensationMethod, payloads) ->
+ compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
}
}
+ private static class CompensationContext {
+ private final String globalTxId;
+ private final String localTxId;
+ private final String compensationMethod;
+ private final byte[] message;
+
+ private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.compensationMethod = compensationMethod;
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "CompensationContext{" +
+ "globalTxId='" + globalTxId + '\'' +
+ ", localTxId='" + localTxId + '\'' +
+ ", compensationMethod='" + compensationMethod + '\'' +
+ ", message=" + Arrays.toString(message) +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompensationContext that = (CompensationContext) o;
+ return Objects.equals(globalTxId, that.globalTxId) &&
+ Objects.equals(localTxId, that.localTxId) &&
+ Objects.equals(compensationMethod, that.compensationMethod) &&
+ Arrays.equals(message, that.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.