You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2017/12/29 06:41:00 UTC

[incubator-servicecomb-saga] 04/05: SCB-98 included compensation method signature in omega callback

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9640570a0f301f81e013f522f7e14ea3f5282bfb
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 09:00:01 2017 +0800

    SCB-98 included compensation method signature in omega callback
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  2 +-
 .../saga/alpha/core/TxConsistentService.java       |  2 +-
 .../io/servicecomb/saga/alpha/core/TxEvent.java    | 14 +++-
 .../saga/alpha/core/TxConsistentServiceTest.java   | 90 +++++++++++++++-----
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  3 +-
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  1 +
 .../saga/alpha/server/TxEventEnvelope.java         | 13 ++-
 .../alpha/server/TxEventEnvelopeRepository.java    |  2 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 95 +++++++++++++++-------
 9 files changed, 162 insertions(+), 60 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 7302016..5ebfb72 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
-  void compensate(String globalTxId, byte[] message);
+  void compensate(String globalTxId, String localTxId, String compensationMethod, byte[] message);
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
index 22605f8..15f5099 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -49,6 +49,6 @@ public class TxConsistentService {
   // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
   private void compensate(TxEvent event) {
     List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name());
-    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads()));
+    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.localTxId(), event.compensationMethod(), evt.payloads()));
   }
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
index 2d0a19b..da46db8 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java
@@ -25,17 +25,25 @@ public class TxEvent {
   private String localTxId;
   private String parentTxId;
   private String type;
+  private String compensationMethod;
   private byte[] payloads;
 
   private TxEvent() {
   }
 
-  public TxEvent(Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) {
+  public TxEvent(Date creationTime,
+      String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
     this.creationTime = creationTime;
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;
     this.parentTxId = parentTxId;
     this.type = type;
+    this.compensationMethod = compensationMethod;
     this.payloads = payloads;
   }
 
@@ -59,6 +67,10 @@ public class TxEvent {
     return type;
   }
 
+  public String compensationMethod() {
+    return compensationMethod;
+  }
+
   public byte[] payloads() {
     return payloads;
   }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index d7e66c3..13a2674 100644
--- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -23,7 +23,6 @@ import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.contains;
@@ -32,10 +31,10 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -61,8 +60,12 @@ public class TxConsistentServiceTest {
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
 
-  private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
-  private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+  private final String compensationMethod = getClass().getCanonicalName();
+  private final List<CompensationContext> compensationContexts = new ArrayList<>();
+
+  private final OmegaCallback omegaCallback = (globalTxId, localTxId, compensationMethod, payloads) ->
+      compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
+
   private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
   @Test
@@ -79,40 +82,85 @@ public class TxConsistentServiceTest {
     }
 
     assertThat(this.events, contains(events));
-    assertThat(callbackArgs.isEmpty(), is(true));
+    assertThat(compensationContexts.isEmpty(), is(true));
   }
 
   @Test
   public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
-    events.add(eventOf(TxStartedEvent, "service a".getBytes()));
-    events.add(eventOf(TxEndedEvent, new byte[0]));
-    events.add(eventOf(TxStartedEvent, "service b".getBytes()));
-    events.add(eventOf(TxEndedEvent, new byte[0]));
+    String localTxId1 = UUID.randomUUID().toString();
+    events.add(eventOf(TxStartedEvent, "service a".getBytes(), localTxId1));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId1));
+
+    String localTxId2 = UUID.randomUUID().toString();
+    events.add(eventOf(TxStartedEvent, "service b".getBytes(), localTxId2));
+    events.add(eventOf(TxEndedEvent, new byte[0], localTxId2));
 
     TxEvent abortEvent = newEvent(TxAbortedEvent);
 
     consistentService.handle(abortEvent);
 
-    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
-  }
-
-  private List<String> stringOf(List<byte[]> bytes) {
-    return bytes.stream()
-        .map(String::new)
-        .collect(Collectors.toList());
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, localTxId1, compensationMethod, "service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId2, compensationMethod, "service b".getBytes())
+    ));
   }
 
   private TxEvent newEvent(EventType eventType) {
-    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes());
+    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes());
   }
 
-  private TxEvent eventOf(EventType eventType, byte[] payloads) {
+  private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) {
     return new TxEvent(new Date(),
         globalTxId,
-        UUID.randomUUID().toString(),
+        localTxId,
         UUID.randomUUID().toString(),
         eventType.name(),
+        compensationMethod,
         payloads);
   }
+
+  private static class CompensationContext {
+    private final String globalTxId;
+    private final String localTxId;
+    private final String compensationMethod;
+    private final byte[] message;
+
+    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
+      this.globalTxId = globalTxId;
+      this.localTxId = localTxId;
+      this.compensationMethod = compensationMethod;
+      this.message = message;
+    }
+
+    @Override
+    public String toString() {
+      return "CompensationContext{" +
+          "globalTxId='" + globalTxId + '\'' +
+          ", localTxId='" + localTxId + '\'' +
+          ", compensationMethod='" + compensationMethod + '\'' +
+          ", message=" + Arrays.toString(message) +
+          '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompensationContext that = (CompensationContext) o;
+      return Objects.equals(globalTxId, that.globalTxId) &&
+          Objects.equals(localTxId, that.localTxId) &&
+          Objects.equals(compensationMethod, that.compensationMethod) &&
+          Arrays.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+    }
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 94b024e..d443fa7 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -33,8 +33,7 @@ class AlphaConfig {
   @Bean
   OmegaCallback omegaCallback() {
     // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
-    return (globalTxId, message) -> {
-    };
+    return (globalTxId, localTxId, compensationMethod, message) -> {};
   }
   
   @Bean
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 9ce7c80..f1f8e40 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -40,6 +40,7 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
         message.localTxId(),
         message.parentTxId(),
         message.type(),
+        message.compensationMethod(),
         message.payloads()
     ));
   }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index b027754..fa282b4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -42,8 +42,13 @@ class TxEventEnvelope {
     this.event = event;
   }
 
-  public TxEventEnvelope(String globalTxId, String localTxId, String parentTxId, String type, byte[] payloads) {
-    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, payloads);
+  public TxEventEnvelope(String globalTxId,
+      String localTxId,
+      String parentTxId,
+      String type,
+      String compensationMethod,
+      byte[] payloads) {
+    this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads);
   }
 
   public long creationTime() {
@@ -66,6 +71,10 @@ class TxEventEnvelope {
     return event.type();
   }
 
+  String compensationMethod() {
+    return event.compensationMethod();
+  }
+
   byte[] payloads() {
     return event.payloads();
   }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 04ff836..5f929a1 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
 
   @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope("
-      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.payloads"
+      + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads"
       + ") FROM TxEventEnvelope t "
       + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2")
   List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index d932072..f7ec33c 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -21,7 +21,6 @@ import static com.google.common.net.HostAndPort.fromParts;
 import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,12 +28,11 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -68,12 +66,13 @@ public class AlphaIntegrationTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
+  private final String compensationMethod = getClass().getCanonicalName();
 
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
   @Autowired
-  private Map<String, List<byte[]>> callbackArgs;
+  private List<CompensationContext> compensationContexts;
 
   private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
   private SwiftTxEventEndpoint endpoint;
@@ -103,40 +102,28 @@ public class AlphaIntegrationTest {
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
     assertThat(envelope.type(), is(TxStartedEvent.name()));
+    assertThat(envelope.compensationMethod(), is(compensationMethod));
     assertThat(envelope.payloads(), is(payload.getBytes()));
   }
 
   @Test
-  public void compensateOnFailure() throws Exception {
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
-    eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-
-    endpoint.handle(someEvent(TxAbortedEvent));
-
-    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
-  }
-
-  @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+    // duplicate events with same content but different timestamp
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+
+    String localTxId1 = UUID.randomUUID().toString();
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes()));
     eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0]));
 
     endpoint.handle(someEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
-    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
-  }
-
-  private List<String> stringOf(List<byte[]> bytes) {
-    return bytes.stream()
-        .map(String::new)
-        .collect(Collectors.toList());
+    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
+    assertThat(compensationContexts, containsInAnyOrder(
+        new CompensationContext(globalTxId, this.localTxId, compensationMethod, "service a".getBytes()),
+        new CompensationContext(globalTxId, localTxId1, compensationMethod, "service b".getBytes())
+    ));
   }
 
   private SwiftTxEvent someEvent(EventType type) {
@@ -146,6 +133,7 @@ public class AlphaIntegrationTest {
         this.localTxId,
         this.parentTxId,
         type.name(),
+        compensationMethod,
         payload.getBytes());
   }
 
@@ -159,22 +147,67 @@ public class AlphaIntegrationTest {
         localTxId,
         parentTxId,
         eventType.name(),
+        compensationMethod,
         payloads));
   }
 
   @Configuration
   static class OmegaCallbackConfig {
-    private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+    private final List<CompensationContext> compensationContexts = new ArrayList<>();
 
     @Bean
-    Map<String, List<byte[]>> callbackArgs() {
-      return callbackArgs;
+    List<CompensationContext> compensationContexts() {
+      return compensationContexts;
     }
 
     @Bean
     OmegaCallback omegaCallback() {
-      return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+      return (globalTxId, localTxId, compensationMethod, payloads) ->
+          compensationContexts.add(new CompensationContext(globalTxId, localTxId, compensationMethod, payloads));
     }
   }
 
+  private static class CompensationContext {
+    private final String globalTxId;
+    private final String localTxId;
+    private final String compensationMethod;
+    private final byte[] message;
+
+    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
+      this.globalTxId = globalTxId;
+      this.localTxId = localTxId;
+      this.compensationMethod = compensationMethod;
+      this.message = message;
+    }
+
+    @Override
+    public String toString() {
+      return "CompensationContext{" +
+          "globalTxId='" + globalTxId + '\'' +
+          ", localTxId='" + localTxId + '\'' +
+          ", compensationMethod='" + compensationMethod + '\'' +
+          ", message=" + Arrays.toString(message) +
+          '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompensationContext that = (CompensationContext) o;
+      return Objects.equals(globalTxId, that.globalTxId) &&
+          Objects.equals(localTxId, that.localTxId) &&
+          Objects.equals(compensationMethod, that.compensationMethod) &&
+          Arrays.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.