You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2017/12/27 08:41:51 UTC

[incubator-servicecomb-saga] branch SCB-99_compensation_logic created (now 3257c07)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-99_compensation_logic
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


      at 3257c07  SCB-99 added compensation logic on tx aborted event

This branch includes the following new commits:

     new 3257c07  SCB-99 added compensation logic on tx aborted event

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/01: SCB-99 added compensation logic on tx aborted event

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-99_compensation_logic
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 3257c07913d9a2c16db8b94ac4816714f141a2ef
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Dec 27 16:37:46 2017 +0800

    SCB-99 added compensation logic on tx aborted event
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-core/pom.xml                           |  18 ++++
 .../{TxEventRepository.java => EventType.java}     |   9 +-
 .../{TxEventRepository.java => OmegaCallback.java} |   4 +-
 .../saga/alpha/core/TxConsistentService.java       |  55 ++++++++++
 .../saga/alpha/core/TxEventRepository.java         |   4 +
 .../saga/alpha/core/TxConsistentServiceTest.java   | 118 +++++++++++++++++++++
 alpha/alpha-server/pom.xml                         |   4 +
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  22 +++-
 .../saga/alpha/server/SpringTxEventRepository.java |  11 ++
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  10 +-
 .../saga/alpha/server/TxEventEnvelope.java         |   4 +
 .../alpha/server/TxEventEnvelopeRepository.java    |   4 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 107 ++++++++++++++++---
 13 files changed, 345 insertions(+), 25 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index a2cb3e2..92f22f6 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -27,6 +27,24 @@
   <modelVersion>4.0.0</modelVersion>
 
   <artifactId>alpha-core</artifactId>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.seanyinx</groupId>
+      <artifactId>unit-scaffolding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
+  </dependencies>
 
 
 </project>
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
similarity index 86%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
index 08ce832..dee34a6 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
@@ -17,6 +17,11 @@
 
 package io.servicecomb.saga.alpha.core;
 
-public interface TxEventRepository {
-  void save(TxEvent event);
+public enum EventType {
+  SagaStartedEvent,
+  TxStartedEvent,
+  TxEndedEvent,
+  TxAbortedEvent,
+  TxCompensatedEvent,
+  SagaEndedEvent
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
similarity index 90%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 08ce832..7302016 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -17,6 +17,6 @@
 
 package io.servicecomb.saga.alpha.core;
 
-public interface TxEventRepository {
-  void save(TxEvent event);
+public interface OmegaCallback {
+  void compensate(String globalTxId, byte[] message);
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
new file mode 100644
index 0000000..af02d74
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+public class TxConsistentService {
+  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
+
+  private final TxEventRepository eventRepository;
+  private final OmegaCallback omegaCallback;
+  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
+    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
+  }};
+
+  public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+    this.eventRepository = eventRepository;
+    this.omegaCallback = omegaCallback;
+  }
+
+  public void handle(TxEvent event) {
+    eventRepository.save(event);
+    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+  }
+
+  // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
+  private void compensate(TxEvent event) {
+    List<TxEvent> events = eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name());
+    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads()));
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index 08ce832..cb44f77 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -17,6 +17,10 @@
 
 package io.servicecomb.saga.alpha.core;
 
+import java.util.List;
+
 public interface TxEventRepository {
   void save(TxEvent event);
+
+  List<TxEvent> findCompletedEvents(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
new file mode 100644
index 0000000..64ed62c
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+public class TxConsistentServiceTest {
+  private final List<TxEvent> events = new ArrayList<>();
+  private final TxEventRepository eventRepository = new TxEventRepository() {
+    @Override
+    public void save(TxEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
+          .collect(Collectors.toList());
+    }
+  };
+
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+  private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+
+  @Test
+  public void persistEventOnArrival() throws Exception {
+    TxEvent[] events = {
+        newEvent(SagaStartedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
+        newEvent(TxCompensatedEvent),
+        newEvent(SagaEndedEvent)};
+
+    for (TxEvent event : events) {
+      consistentService.handle(event);
+    }
+
+    assertThat(this.events, contains(events));
+    assertThat(callbackArgs.isEmpty(), is(true));
+  }
+
+  @Test
+  public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
+    events.add(eventOf(TxStartedEvent, "service a".getBytes()));
+    events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes()));
+    events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
+
+    consistentService.handle(abortEvent);
+
+    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
+  }
+
+  private List<String> stringOf(List<byte[]> bytes) {
+    return bytes.stream()
+        .map(String::new)
+        .collect(Collectors.toList());
+  }
+
+  private TxEvent newEvent(EventType eventType) {
+    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes());
+  }
+
+  private TxEvent eventOf(EventType eventType, byte[] payloads) {
+    return new TxEvent(new Date(),
+        globalTxId,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        payloads);
+  }
+}
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 4bc347e..774883d 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -80,6 +80,10 @@
       <artifactId>h2</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 50d4a8d..94b024e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,16 +23,34 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxConsistentService;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
 class AlphaConfig {
 
   @Bean
-  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port, TxEventEnvelopeRepository eventRepo) {
+  OmegaCallback omegaCallback() {
+    // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
+    return (globalTxId, message) -> {
+    };
+  }
+  
+  @Bean
+  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port,
+      TxEventEnvelopeRepository eventRepo,
+      OmegaCallback omegaCallback) {
+
     TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo);
 
-    ThriftStartable startable = new ThriftStartable(port, new SwiftTxEventEndpointImpl(eventRepository));
+    ThriftStartable startable = new ThriftStartable(
+        port,
+        new SwiftTxEventEndpointImpl(
+            new TxConsistentService(
+                eventRepository,
+                omegaCallback)));
+
     CompletableFuture.runAsync(startable::start);
 
     return eventRepository;
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7925924..7b59d29 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,6 +17,9 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import io.servicecomb.saga.alpha.core.TxEvent;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
@@ -31,4 +34,12 @@ class SpringTxEventRepository implements TxEventRepository {
   public void save(TxEvent event) {
     eventRepo.save(new TxEventEnvelope(event));
   }
+
+  @Override
+  public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
+        .stream()
+        .map(TxEventEnvelope::event)
+        .collect(Collectors.toList());
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 78b93b4..9ce7c80 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -19,22 +19,22 @@ package io.servicecomb.saga.alpha.server;
 
 import java.util.Date;
 
+import io.servicecomb.saga.alpha.core.TxConsistentService;
 import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.core.TxEventRepository;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
 
-  private final TxEventRepository eventRepository;
+  private final TxConsistentService txConsistentService;
 
-  SwiftTxEventEndpointImpl(TxEventRepository eventRepository) {
-    this.eventRepository = eventRepository;
+  SwiftTxEventEndpointImpl(TxConsistentService txConsistentService) {
+    this.txConsistentService = txConsistentService;
   }
 
   @Override
   public void handle(SwiftTxEvent message) {
-    eventRepository.save(new TxEvent(
+    txConsistentService.handle(new TxEvent(
         new Date(message.timestamp()),
         message.globalTxId(),
         message.localTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index adbb28e..152edfb 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -63,4 +63,8 @@ class TxEventEnvelope {
   byte[] payloads() {
     return event.payloads();
   }
+
+  TxEvent event() {
+    return event;
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3e46de3..cd3cbc7 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -17,8 +17,12 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.List;
+
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+
+  List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 }
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index acce006..e9c9a98 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -18,29 +18,49 @@
 package io.servicecomb.saga.alpha.server;
 
 import static com.google.common.net.HostAndPort.fromParts;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.facebook.nifty.client.FramedClientConnector;
 import com.facebook.swift.service.ThriftClientManager;
 
+import io.servicecomb.saga.alpha.core.EventType;
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxEvent;
+import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = AlphaApplication.class, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
 public class AlphaIntegrationTest {
   private static final ThriftClientManager clientManager = new ThriftClientManager();
-  private static final String TX_STARTED_EVENT = "TxStartedEvent";
   private static final String payload = "hello world";
 
   private final int port = 8090;
@@ -52,32 +72,91 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
+  @Autowired
+  private Map<String, List<byte[]>> callbackArgs;
+
+  private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
+  private SwiftTxEventEndpoint endpoint;
+
   @AfterClass
   public static void tearDown() throws Exception {
     clientManager.close();
   }
 
+  @Before
+  public void setUp() throws Exception {
+    endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
+  }
+
+  @After
+  public void after() throws Exception {
+    endpoint.close();
+  }
+
   @Test
   public void persistsEvent() throws Exception {
-    FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
-    SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
-
-    endpoint.handle(new SwiftTxEvent(
-        System.currentTimeMillis(),
-        globalTxId,
-        localTxId,
-        parentTxId,
-        TX_STARTED_EVENT,
-        payload.getBytes()));
+    endpoint.handle(someEvent(TxStartedEvent));
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
-    assertThat(envelope.type(), is(TX_STARTED_EVENT));
+    assertThat(envelope.type(), is(TxStartedEvent.name()));
     assertThat(envelope.payloads(), is(payload.getBytes()));
+  }
 
-    endpoint.close();
+  @Test
+  public void compensateOnFailure() throws Exception {
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+
+    endpoint.handle(someEvent(TxAbortedEvent));
+
+    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
+  }
+
+  private List<String> stringOf(List<byte[]> bytes) {
+    return bytes.stream()
+        .map(String::new)
+        .collect(Collectors.toList());
+  }
+
+  private SwiftTxEvent someEvent(EventType type) {
+    return new SwiftTxEvent(
+        System.currentTimeMillis(),
+        this.globalTxId,
+        this.localTxId,
+        this.parentTxId,
+        type.name(),
+        payload.getBytes());
+  }
+
+  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) {
+    return new TxEventEnvelope(new TxEvent(new Date(),
+        globalTxId,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        payloads));
   }
+
+  @Configuration
+  static class OmegaCallbackConfig {
+    private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+
+    @Bean
+    Map<String, List<byte[]>> callbackArgs() {
+      return callbackArgs;
+    }
+
+    @Bean
+    OmegaCallback omegaCallback() {
+      return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+    }
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.