You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2017/12/27 08:41:52 UTC

[incubator-servicecomb-saga] 01/01: SCB-99 added compensation logic on tx aborted event

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-99_compensation_logic
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 3257c07913d9a2c16db8b94ac4816714f141a2ef
Author: seanyinx <se...@huawei.com>
AuthorDate: Wed Dec 27 16:37:46 2017 +0800

    SCB-99 added compensation logic on tx aborted event
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 alpha/alpha-core/pom.xml                           |  18 ++++
 .../{TxEventRepository.java => EventType.java}     |   9 +-
 .../{TxEventRepository.java => OmegaCallback.java} |   4 +-
 .../saga/alpha/core/TxConsistentService.java       |  55 ++++++++++
 .../saga/alpha/core/TxEventRepository.java         |   4 +
 .../saga/alpha/core/TxConsistentServiceTest.java   | 118 +++++++++++++++++++++
 alpha/alpha-server/pom.xml                         |   4 +
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  22 +++-
 .../saga/alpha/server/SpringTxEventRepository.java |  11 ++
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  10 +-
 .../saga/alpha/server/TxEventEnvelope.java         |   4 +
 .../alpha/server/TxEventEnvelopeRepository.java    |   4 +
 .../saga/alpha/server/AlphaIntegrationTest.java    | 107 ++++++++++++++++---
 13 files changed, 345 insertions(+), 25 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index a2cb3e2..92f22f6 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -27,6 +27,24 @@
   <modelVersion>4.0.0</modelVersion>
 
   <artifactId>alpha-core</artifactId>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.seanyinx</groupId>
+      <artifactId>unit-scaffolding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
+  </dependencies>
 
 
 </project>
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
similarity index 86%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
index 08ce832..dee34a6 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/EventType.java
@@ -17,6 +17,11 @@
 
 package io.servicecomb.saga.alpha.core;
 
-public interface TxEventRepository {
-  void save(TxEvent event);
+public enum EventType {
+  SagaStartedEvent,
+  TxStartedEvent,
+  TxEndedEvent,
+  TxAbortedEvent,
+  TxCompensatedEvent,
+  SagaEndedEvent
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
similarity index 90%
copy from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
copy to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
index 08ce832..7302016 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -17,6 +17,6 @@
 
 package io.servicecomb.saga.alpha.core;
 
-public interface TxEventRepository {
-  void save(TxEvent event);
+public interface OmegaCallback {
+  void compensate(String globalTxId, byte[] message);
 }
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
new file mode 100644
index 0000000..af02d74
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+public class TxConsistentService {
+  private static final Consumer<TxEvent> DO_NOTHING_CONSUMER = event -> {};
+
+  private final TxEventRepository eventRepository;
+  private final OmegaCallback omegaCallback;
+  private final Map<String, Consumer<TxEvent>> eventCallbacks = new HashMap<String, Consumer<TxEvent>>() {{
+    put(TxStartedEvent.name(), DO_NOTHING_CONSUMER);
+    put(TxAbortedEvent.name(), (event) -> compensate(event));
+  }};
+
+  public TxConsistentService(TxEventRepository eventRepository, OmegaCallback omegaCallback) {
+    this.eventRepository = eventRepository;
+    this.omegaCallback = omegaCallback;
+  }
+
+  public void handle(TxEvent event) {
+    eventRepository.save(event);
+    CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event));
+  }
+
+  // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all
+  private void compensate(TxEvent event) {
+    List<TxEvent> events = eventRepository.findCompletedEvents(event.globalTxId(), TxEndedEvent.name());
+    events.forEach(evt -> omegaCallback.compensate(evt.globalTxId(), evt.payloads()));
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
index 08ce832..cb44f77 100644
--- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEventRepository.java
@@ -17,6 +17,10 @@
 
 package io.servicecomb.saga.alpha.core;
 
+import java.util.List;
+
 public interface TxEventRepository {
   void save(TxEvent event);
+
+  List<TxEvent> findCompletedEvents(String globalTxId, String type);
 }
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
new file mode 100644
index 0000000..64ed62c
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxCompensatedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+public class TxConsistentServiceTest {
+  private final List<TxEvent> events = new ArrayList<>();
+  private final TxEventRepository eventRepository = new TxEventRepository() {
+    @Override
+    public void save(TxEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+      return events.stream()
+          .filter(event -> globalTxId.equals(event.globalTxId()) && type.equals(event.type()))
+          .collect(Collectors.toList());
+    }
+  };
+
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+
+  private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+  private final OmegaCallback omegaCallback = (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+  private final TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
+
+  @Test
+  public void persistEventOnArrival() throws Exception {
+    TxEvent[] events = {
+        newEvent(SagaStartedEvent),
+        newEvent(TxStartedEvent),
+        newEvent(TxEndedEvent),
+        newEvent(TxCompensatedEvent),
+        newEvent(SagaEndedEvent)};
+
+    for (TxEvent event : events) {
+      consistentService.handle(event);
+    }
+
+    assertThat(this.events, contains(events));
+    assertThat(callbackArgs.isEmpty(), is(true));
+  }
+
+  @Test
+  public void compensateGlobalTx_OnAnyLocalTxFailure() throws Exception {
+    events.add(eventOf(TxStartedEvent, "service a".getBytes()));
+    events.add(eventOf(TxEndedEvent, "service a".getBytes()));
+    events.add(eventOf(TxStartedEvent, "service b".getBytes()));
+    events.add(eventOf(TxEndedEvent, "service b".getBytes()));
+
+    TxEvent abortEvent = newEvent(TxAbortedEvent);
+
+    consistentService.handle(abortEvent);
+
+    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
+  }
+
+  private List<String> stringOf(List<byte[]> bytes) {
+    return bytes.stream()
+        .map(String::new)
+        .collect(Collectors.toList());
+  }
+
+  private TxEvent newEvent(EventType eventType) {
+    return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), "yeah".getBytes());
+  }
+
+  private TxEvent eventOf(EventType eventType, byte[] payloads) {
+    return new TxEvent(new Date(),
+        globalTxId,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        payloads);
+  }
+}
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index 4bc347e..774883d 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -80,6 +80,10 @@
       <artifactId>h2</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
 
   </dependencies>
 
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 50d4a8d..94b024e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,16 +23,34 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxConsistentService;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
 class AlphaConfig {
 
   @Bean
-  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port, TxEventEnvelopeRepository eventRepo) {
+  OmegaCallback omegaCallback() {
+    // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
+    return (globalTxId, message) -> {
+    };
+  }
+  
+  @Bean
+  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port,
+      TxEventEnvelopeRepository eventRepo,
+      OmegaCallback omegaCallback) {
+
     TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo);
 
-    ThriftStartable startable = new ThriftStartable(port, new SwiftTxEventEndpointImpl(eventRepository));
+    ThriftStartable startable = new ThriftStartable(
+        port,
+        new SwiftTxEventEndpointImpl(
+            new TxConsistentService(
+                eventRepository,
+                omegaCallback)));
+
     CompletableFuture.runAsync(startable::start);
 
     return eventRepository;
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
index 7925924..7b59d29 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SpringTxEventRepository.java
@@ -17,6 +17,9 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import io.servicecomb.saga.alpha.core.TxEvent;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
@@ -31,4 +34,12 @@ class SpringTxEventRepository implements TxEventRepository {
   public void save(TxEvent event) {
     eventRepo.save(new TxEventEnvelope(event));
   }
+
+  @Override
+  public List<TxEvent> findCompletedEvents(String globalTxId, String type) {
+    return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type)
+        .stream()
+        .map(TxEventEnvelope::event)
+        .collect(Collectors.toList());
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 78b93b4..9ce7c80 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -19,22 +19,22 @@ package io.servicecomb.saga.alpha.server;
 
 import java.util.Date;
 
+import io.servicecomb.saga.alpha.core.TxConsistentService;
 import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.core.TxEventRepository;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
 
-  private final TxEventRepository eventRepository;
+  private final TxConsistentService txConsistentService;
 
-  SwiftTxEventEndpointImpl(TxEventRepository eventRepository) {
-    this.eventRepository = eventRepository;
+  SwiftTxEventEndpointImpl(TxConsistentService txConsistentService) {
+    this.txConsistentService = txConsistentService;
   }
 
   @Override
   public void handle(SwiftTxEvent message) {
-    eventRepository.save(new TxEvent(
+    txConsistentService.handle(new TxEvent(
         new Date(message.timestamp()),
         message.globalTxId(),
         message.localTxId(),
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
index adbb28e..152edfb 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java
@@ -63,4 +63,8 @@ class TxEventEnvelope {
   byte[] payloads() {
     return event.payloads();
   }
+
+  TxEvent event() {
+    return event;
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
index 3e46de3..cd3cbc7 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java
@@ -17,8 +17,12 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.List;
+
 import org.springframework.data.repository.CrudRepository;
 
 interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long> {
   TxEventEnvelope findByEventGlobalTxId(String globalTxId);
+
+  List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type);
 }
diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index acce006..e9c9a98 100644
--- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -18,29 +18,49 @@
 package io.servicecomb.saga.alpha.server;
 
 import static com.google.common.net.HostAndPort.fromParts;
+import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.facebook.nifty.client.FramedClientConnector;
 import com.facebook.swift.service.ThriftClientManager;
 
+import io.servicecomb.saga.alpha.core.EventType;
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxEvent;
+import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = AlphaApplication.class, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
 public class AlphaIntegrationTest {
   private static final ThriftClientManager clientManager = new ThriftClientManager();
-  private static final String TX_STARTED_EVENT = "TxStartedEvent";
   private static final String payload = "hello world";
 
   private final int port = 8090;
@@ -52,32 +72,91 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
+  @Autowired
+  private Map<String, List<byte[]>> callbackArgs;
+
+  private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
+  private SwiftTxEventEndpoint endpoint;
+
   @AfterClass
   public static void tearDown() throws Exception {
     clientManager.close();
   }
 
+  @Before
+  public void setUp() throws Exception {
+    endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
+  }
+
+  @After
+  public void after() throws Exception {
+    endpoint.close();
+  }
+
   @Test
   public void persistsEvent() throws Exception {
-    FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port));
-    SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
-
-    endpoint.handle(new SwiftTxEvent(
-        System.currentTimeMillis(),
-        globalTxId,
-        localTxId,
-        parentTxId,
-        TX_STARTED_EVENT,
-        payload.getBytes()));
+    endpoint.handle(someEvent(TxStartedEvent));
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
-    assertThat(envelope.type(), is(TX_STARTED_EVENT));
+    assertThat(envelope.type(), is(TxStartedEvent.name()));
     assertThat(envelope.payloads(), is(payload.getBytes()));
+  }
 
-    endpoint.close();
+  @Test
+  public void compensateOnFailure() throws Exception {
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service a".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, "service b".getBytes()));
+    eventRepo.save(eventEnvelopeOf(TxEndedEvent, "service b".getBytes()));
+
+    endpoint.handle(someEvent(TxAbortedEvent));
+
+    await().atMost(1, SECONDS).until(() -> callbackArgs.getOrDefault(globalTxId, emptyList()).size() > 1);
+    assertThat(stringOf(callbackArgs.get(globalTxId)), containsInAnyOrder("service a", "service b"));
+  }
+
+  private List<String> stringOf(List<byte[]> bytes) {
+    return bytes.stream()
+        .map(String::new)
+        .collect(Collectors.toList());
+  }
+
+  private SwiftTxEvent someEvent(EventType type) {
+    return new SwiftTxEvent(
+        System.currentTimeMillis(),
+        this.globalTxId,
+        this.localTxId,
+        this.parentTxId,
+        type.name(),
+        payload.getBytes());
+  }
+
+  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) {
+    return new TxEventEnvelope(new TxEvent(new Date(),
+        globalTxId,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        payloads));
   }
+
+  @Configuration
+  static class OmegaCallbackConfig {
+    private final Map<String, List<byte[]>> callbackArgs = new HashMap<>();
+
+    @Bean
+    Map<String, List<byte[]>> callbackArgs() {
+      return callbackArgs;
+    }
+
+    @Bean
+    OmegaCallback omegaCallback() {
+      return (key, value) -> callbackArgs.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+    }
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.