You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/02 10:52:39 UTC

[incubator-servicecomb-saga] branch SCB-152_compensated_event updated (a32c7f1 -> 606f028)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch SCB-152_compensated_event
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    omit a32c7f1  SCB-152 sent tx compensated event on compensation completed
    omit 8d459e2  SCB-149 distinguished omega callbacks by service name and instance id
    omit 47a1ba6  SCB-149 omega callback signature change
    omit 1fe3e9d  SCB-149 checked matching of compensation method
    omit 6cbcf09  SCB-149 added service name and instance id to contract
    omit 63ccc28  SCB-149 removed unnecessary endpoint interface
     add 0f36948  SCB-104 Removed the get method in the RestTemplateTransport
     add 5328f26  SCB-149 removed unnecessary endpoint interface
     add a682178  SCB-149 added service name and instance id to contract
     add 4e8c376  SCB-149 checked matching of compensation method
     add d801105  SCB-149 omega callback signature change
     add 314a271  SCB-149 distinguished omega callbacks by service name and instance id
     add 49235b5  SCB-149 pushed failed compensations to a scheduled task queue
     new 606f028  SCB-152 sent tx compensated event on compensation completed

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a32c7f1)
            \
             N -- N -- N   refs/heads/SCB-152_compensated_event (606f028)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../saga/alpha/core/PendingTaskRunner.java         | 30 +++++---
 ...egaCallback.java => PushBackOmegaCallback.java} | 38 ++++-------
 .../saga/alpha/core/TxConsistentService.java       |  1 -
 .../saga/alpha/core/PendingTaskRunnerTest.java     | 60 ++++++++++++++++
 .../saga/alpha/core/PushBackOmegaCallbackTest.java | 65 ++++++++++++++++++
 .../saga/alpha/core/RetryOmegaCallbackTest.java    | 79 ----------------------
 .../servicecomb/saga/alpha/core/TxEventMaker.java  | 19 ++++--
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 23 +++++--
 .../resttemplate/RestTemplateTransport.java        |  6 +-
 9 files changed, 190 insertions(+), 131 deletions(-)
 copy saga-core/src/test/java/io/servicecomb/saga/core/dag/GraphBasedSagaExecutionComponentTest.java => alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java (53%)
 rename alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/{RetryOmegaCallback.java => PushBackOmegaCallback.java} (58%)
 create mode 100644 alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java
 create mode 100644 alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java
 delete mode 100644 alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
 copy omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/UniqueIdGenerator.java => alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java (61%)

-- 
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].

[incubator-servicecomb-saga] 01/01: SCB-152 sent tx compensated event on compensation completed

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-152_compensated_event
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 606f0280eef2e73eceb6386423cebaa98de5c34f
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Jan 2 15:13:56 2018 +0800

    SCB-152 sent tx compensated event on compensation completed
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../saga/omega/context/OmegaContext.java           |   2 +-
 .../spring/TransactionAspectConfig.java            |   7 ++
 .../spring/TransactionInterceptionTest.java        | 102 ++++++++-------------
 .../saga/omega/transaction/spring/User.java        |   8 ++
 ...ceptor.java => CompensationMessageHandler.java} |  21 ++---
 .../transaction/FailedTransactionInterceptor.java  |  11 +--
 .../saga/omega/transaction/MessageHandler.java     |   2 +-
 .../saga/omega/transaction/TxAbortedEvent.java     |  15 ++-
 .../{TxEndedEvent.java => TxCompensatedEvent.java} |   4 +-
 .../saga/omega/transaction/TxEndedEvent.java       |   4 +-
 .../saga/omega/transaction/TxEvent.java            |  13 +++
 .../saga/omega/transaction/TxStartedEvent.java     |   4 +-
 ...st.java => CompensationMessageHandlerTest.java} |  38 +++++---
 .../PostTransactionInterceptorTest.java            |   1 +
 .../transaction/PreTransactionInterceptorTest.java |   1 +
 15 files changed, 122 insertions(+), 111 deletions(-)

diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
index ccc3738..705e74e 100644
--- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
+++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java
@@ -82,7 +82,7 @@ public class OmegaContext {
     compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
   }
 
-  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) {
+  public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
     CompensationContext compensationContext = compensationContexts.get(compensationMethod);
 
     try {
diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
index 5982109..da9f3f8 100644
--- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
+++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java
@@ -22,6 +22,8 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 
 import io.servicecomb.saga.omega.context.OmegaContext;
+import io.servicecomb.saga.omega.transaction.CompensationMessageHandler;
+import io.servicecomb.saga.omega.transaction.MessageHandler;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.TransactionAspect;
 
@@ -30,6 +32,11 @@ import io.servicecomb.saga.omega.transaction.TransactionAspect;
 public class TransactionAspectConfig {
 
   @Bean
+  MessageHandler messageHandler(MessageSender sender, OmegaContext context) {
+    return new CompensationMessageHandler(sender, context);
+  }
+
+  @Bean
   TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
     return new TransactionAspect(sender, context);
   }
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
index 4ffd546..7efa304 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java
@@ -20,16 +20,14 @@ package io.servicecomb.saga.omega.transaction.spring;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static io.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
-import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 import org.junit.After;
 import org.junit.Before;
@@ -46,7 +44,10 @@ import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.transaction.MessageHandler;
 import io.servicecomb.saga.omega.transaction.MessageSender;
-import io.servicecomb.saga.omega.transaction.TxEvent;
+import io.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import io.servicecomb.saga.omega.transaction.TxCompensatedEvent;
+import io.servicecomb.saga.omega.transaction.TxEndedEvent;
+import io.servicecomb.saga.omega.transaction.TxStartedEvent;
 import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;
 
 @RunWith(SpringRunner.class)
@@ -62,7 +63,7 @@ public class TransactionInterceptionTest {
   private final String email = uniquify("email");
 
   @Autowired
-  private List<byte[]> messages;
+  private List<String> messages;
 
   @Autowired
   private TransactionalUserService userService;
@@ -94,11 +95,11 @@ public class TransactionInterceptionTest {
 
     String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
 
-    assertEquals(
-        asList(
-            txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, username, email),
-            txEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
-        toString(messages)
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
+        toArray(messages)
     );
 
     User actual = userRepository.findOne(user.id());
@@ -107,19 +108,22 @@ public class TransactionInterceptionTest {
 
   @Test
   public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
+    Throwable throwable = null;
+    User user = new User(ILLEGAL_USER, email);
     try {
-      userService.add(new User(ILLEGAL_USER, email));
+      userService.add(user);
       expectFailing(IllegalArgumentException.class);
     } catch (IllegalArgumentException ignored) {
+      throwable = ignored;
     }
 
     String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
 
-    assertEquals(
-        asList(
-            txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, ILLEGAL_USER, email),
-            txAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
-        toString(messages)
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
+        toArray(messages)
     );
   }
 
@@ -133,22 +137,32 @@ public class TransactionInterceptionTest {
 
     String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();
 
-    messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user);
-    messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser);
+    messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
+    messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);
 
     assertThat(userRepository.findOne(user.id()), is(nullValue()));
     assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));
+
+    assertArrayEquals(
+        new String[]{
+            new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
+            new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
+            new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
+            new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
+            new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
+        },
+        toArray(messages)
+    );
   }
 
-  private List<String> toString(List<byte[]> messages) {
-    return messages.stream()
-        .map(String::new)
-        .collect(Collectors.toList());
+  private String[] toArray(List<String> messages) {
+    return messages.toArray(new String[messages.size()]);
   }
 
   @Configuration
   static class MessageConfig {
-    private final List<byte[]> messages = new ArrayList<>();
+    private final List<String> messages = new ArrayList<>();
 
     @Bean
     OmegaContext omegaContext() {
@@ -156,51 +170,13 @@ public class TransactionInterceptionTest {
     }
 
     @Bean
-    List<byte[]> messages() {
+    List<String> messages() {
       return messages;
     }
 
     @Bean
     MessageSender sender() {
-      return (event) -> messages.add(serialize(event));
-    }
-
-    private byte[] serialize(TxEvent event) {
-      if (TX_STARTED_EVENT.equals(event.type())) {
-        User user = ((User) event.payloads()[0]);
-        return txStartedEvent(event.globalTxId(),
-            event.localTxId(),
-            event.parentTxId(),
-            event.compensationMethod(),
-            user.username(),
-            user.email()).getBytes();
-      }
-      return txEndedEvent(event.globalTxId(),
-          event.localTxId(),
-          event.parentTxId(),
-          event.compensationMethod()).getBytes();
-    }
-
-    @Bean
-    MessageHandler handler(OmegaContext omegaContext) {
-      return omegaContext::compensate;
+      return (event) -> messages.add(event.toString());
     }
   }
-
-  private static String txStartedEvent(String globalTxId,
-      String localTxId,
-      String parentTxId,
-      String compensationMethod,
-      String username,
-      String email) {
-    return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_STARTED_EVENT + ":" + username + ":" + email;
-  }
-
-  private static String txEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
-  }
-
-  private static String txAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
-    return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
-  }
 }
diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java
index 6b2e55f..5af25e4 100644
--- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java
+++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java
@@ -70,4 +70,12 @@ public class User {
   public int hashCode() {
     return Objects.hash(id, username, email);
   }
+
+  @Override
+  public String toString() {
+    return "User{" +
+        "username='" + username + '\'' +
+        ", email='" + email + '\'' +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
similarity index 60%
copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
copy to omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
index 6cbd8af..edc3243 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java
@@ -17,23 +17,20 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import io.servicecomb.saga.omega.context.OmegaContext;
 
-class FailedTransactionInterceptor {
+public class CompensationMessageHandler implements MessageHandler {
   private final MessageSender sender;
+  private final OmegaContext omegaContext;
 
-  FailedTransactionInterceptor(MessageSender sender) {
+  public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) {
     this.sender = sender;
+    this.omegaContext = omegaContext;
   }
 
-  void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)));
-  }
-
-  private String stackTrace(Throwable e) {
-    StringWriter writer = new StringWriter();
-    e.printStackTrace(new PrintWriter(writer));
-    return writer.toString();
+  @Override
+  public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
+    omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads);
+    sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
index 6cbd8af..9c164dd 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java
@@ -17,9 +17,6 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
 class FailedTransactionInterceptor {
   private final MessageSender sender;
 
@@ -28,12 +25,6 @@ class FailedTransactionInterceptor {
   }
 
   void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
-    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)));
-  }
-
-  private String stackTrace(Throwable e) {
-    StringWriter writer = new StringWriter();
-    e.printStackTrace(new PrintWriter(writer));
-    return writer.toString();
+    sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
index caf2da8..d867085 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java
@@ -18,5 +18,5 @@
 package io.servicecomb.saga.omega.transaction;
 
 public interface MessageHandler {
-  void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads);
+  void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads);
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java
index 2d04a7d..0d7e5ba 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java
@@ -17,8 +17,17 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-class TxAbortedEvent extends TxEvent {
-  TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, String stackTrace) {
-    super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace);
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class TxAbortedEvent extends TxEvent {
+  public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
+    super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable));
+  }
+
+  private static String stackTrace(Throwable e) {
+    StringWriter writer = new StringWriter();
+    e.printStackTrace(new PrintWriter(writer));
+    return writer.toString();
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
similarity index 85%
copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java
copy to omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
index 2836948..8d518d6 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java
@@ -17,8 +17,8 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-class TxEndedEvent extends TxEvent {
-  TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+public class TxCompensatedEvent extends TxEvent {
+  public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
     super(globalTxId, localTxId, parentTxId, compensationMethod);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java
index 2836948..6922f29 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java
@@ -17,8 +17,8 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-class TxEndedEvent extends TxEvent {
-  TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
+public class TxEndedEvent extends TxEvent {
+  public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
     super(globalTxId, localTxId, parentTxId, compensationMethod);
   }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
index 1616f69..a11a9ad 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java
@@ -17,6 +17,8 @@
 
 package io.servicecomb.saga.omega.transaction;
 
+import java.util.Arrays;
+
 public class TxEvent {
   private final long timestamp;
   private final String globalTxId;
@@ -61,4 +63,15 @@ public class TxEvent {
   public String compensationMethod() {
     return compensationMethod;
   }
+
+  @Override
+  public String toString() {
+    return "TxEvent{" +
+        "globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", compensationMethod='" + compensationMethod + '\'' +
+        ", payloads=" + Arrays.toString(payloads) +
+        '}';
+  }
 }
diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
index 7ef3089..13534ac 100644
--- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
+++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java
@@ -17,9 +17,9 @@
 
 package io.servicecomb.saga.omega.transaction;
 
-class TxStartedEvent extends TxEvent {
+public class TxStartedEvent extends TxEvent {
 
-  TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object[] payloads) {
+  public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
     super(globalTxId, localTxId, parentTxId, compensationMethod, payloads);
   }
 }
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
similarity index 52%
copy from omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
copy to omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
index b7b2ec0..1070e59 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java
@@ -18,38 +18,46 @@
 package io.servicecomb.saga.omega.transaction;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static java.util.Arrays.asList;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 
 import org.junit.Test;
 
-public class PreTransactionInterceptorTest {
-  private final List<TxEvent> messages = new ArrayList<>();
-  private final String globalTxId = UUID.randomUUID().toString();
-  private final String localTxId = UUID.randomUUID().toString();
-  private final String parentTxId = UUID.randomUUID().toString();
+import io.servicecomb.saga.omega.context.OmegaContext;
 
-  private final MessageSender sender = messages::add;
+public class CompensationMessageHandlerTest {
 
-  private final String message = uniquify("message");
-  private final PreTransactionInterceptor interceptor = new PreTransactionInterceptor(sender);
+  private final List<TxEvent> events = new ArrayList<>();
+  private final MessageSender sender = events::add;
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String compensationMethod = getClass().getCanonicalName();
+  private final String payload = uniquify("blah");
+
+  private final OmegaContext omegaContext = mock(OmegaContext.class);
+  private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext);
 
   @Test
-  public void sendsTxStartedEvent() throws Exception {
-    interceptor.intercept(globalTxId, localTxId, parentTxId, getClass().getCanonicalName(), message);
+  public void sendsEventOnCompensationCompleted() throws Exception {
+    handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload);
 
-    TxEvent event = messages.get(0);
+    assertThat(events.size(), is(1));
 
+    TxEvent event = events.get(0);
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxCompensatedEvent"));
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
-    assertThat(asList(event.payloads()), contains(message));
+    assertThat(event.payloads().length, is(0));
+
+    verify(omegaContext).compensate(globalTxId, localTxId, compensationMethod, payload);
   }
 }
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
index 86a671f..b2e18ab 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java
@@ -46,6 +46,7 @@ public class PostTransactionInterceptorTest {
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxEndedEvent"));
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(event.payloads().length, is(0));
   }
diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
index b7b2ec0..5d5d832 100644
--- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
+++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java
@@ -49,6 +49,7 @@ public class PreTransactionInterceptorTest {
     assertThat(event.globalTxId(), is(globalTxId));
     assertThat(event.localTxId(), is(localTxId));
     assertThat(event.parentTxId(), is(parentTxId));
+    assertThat(event.type(), is("TxStartedEvent"));
     assertThat(event.compensationMethod(), is(getClass().getCanonicalName()));
     assertThat(asList(event.payloads()), contains(message));
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.