You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/05 08:46:40 UTC

[incubator-servicecomb-saga] 03/03: SCB-167 added disconnection call to properly close compensation stream

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit fdce17bb7890c5332617f641c81b0f5e3509f335
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Jan 5 15:32:30 2018 +0800

    SCB-167 added disconnection call to properly close compensation stream
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../servicecomb/saga/alpha/core/OmegaCallback.java |  3 +
 .../saga/alpha/server/GrpcOmegaCallback.java       | 13 ++--
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 16 +++++
 .../saga/alpha/server/AlphaIntegrationTest.java    | 82 ++++++++++++++++++++--
 integration-tests/pack-tests/pom.xml               |  5 ++
 .../pack/tests/GreetingApplication.java            |  9 +++
 .../integration/pack/tests/GreetingController.java | 17 ++++-
 .../integration/pack/tests/GreetingService.java    | 31 ++++++--
 .../saga/integration/pack/tests/PackIT.java        | 71 ++++++++++++++++++-
 .../integration/pack/tests/TxEventEnvelope.java    | 16 +++++
 .../connector/grpc/GrpcClientMessageSender.java    | 14 +++-
 .../grpc/GrpcCompensateStreamObserver.java         |  7 +-
 .../saga/omega/spring/OmegaSpringConfig.java       |  5 +-
 .../saga/omega/transaction/MessageSender.java      |  3 +
 .../src/main/proto/GrpcTxEvent.proto               |  1 +
 15 files changed, 267 insertions(+), 26 deletions(-)

diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
index bc51238..f60a44d 100644
--- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
+++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/OmegaCallback.java
@@ -19,4 +19,7 @@ package org.apache.servicecomb.saga.alpha.core;
 
 public interface OmegaCallback {
   void compensate(TxEvent event);
+
+  default void disconnect() {
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index c576552..5a95281 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -28,18 +28,14 @@ import com.google.protobuf.ByteString;
 
 import io.grpc.stub.StreamObserver;
 
-public class GrpcOmegaCallback implements OmegaCallback {
+class GrpcOmegaCallback implements OmegaCallback {
 
   private final StreamObserver<GrpcCompensateCommand> observer;
 
-  public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+  GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
     this.observer = observer;
   }
 
-  StreamObserver<GrpcCompensateCommand> observer() {
-    return observer;
-  }
-
   @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
@@ -51,4 +47,9 @@ public class GrpcOmegaCallback implements OmegaCallback {
         .build();
     observer.onNext(command);
   }
+
+  @Override
+  public void disconnect() {
+    observer.onCompleted();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 1e34f21..fc5f608 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,6 +20,8 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import static java.util.Collections.emptyMap;
+
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -55,6 +57,20 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         .computeIfAbsent(request.getInstanceId(), key -> new GrpcOmegaCallback(responseObserver));
   }
 
+  // TODO: 2018/1/5 connect is async and disconnect is sync, so the callback may not be registered yet when disconnect is handled
+  @Override
+  public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap())
+        .remove(request.getInstanceId());
+
+    if (callback != null) {
+      callback.disconnect();
+    }
+
+    responseObserver.onNext(GrpcAck.newBuilder().build());
+    responseObserver.onCompleted();
+  }
+
   @Override
   public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
     txConsistentService.handle(new TxEvent(
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index a266ed2..2f3a3ec 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -24,14 +24,17 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
@@ -39,6 +42,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -81,8 +85,11 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
+  @Autowired
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
   private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
-  private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
+  private final CompensateStreamObserver compensateResponseObserver = new CompensateStreamObserver();
 
   @AfterClass
   public static void tearDown() throws Exception {
@@ -95,6 +102,11 @@ public class AlphaIntegrationTest {
     receivedCommands.clear();
   }
 
+  @After
+  public void after() throws Exception {
+    blockingStub.onDisconnected(serviceConfig);
+  }
+
   @Test
   public void persistsEvent() {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -117,6 +129,46 @@ public class AlphaIntegrationTest {
   }
 
   @Test
+  public void closeStreamOnDisconnected() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+
+    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+
+    assertThat(
+        omegaCallbacks.get(serviceConfig.getServiceName()).get(serviceConfig.getInstanceId()),
+        is(notNullValue()));
+
+    blockingStub.onDisconnected(serviceConfig);
+    assertThat(
+        omegaCallbacks.get(serviceConfig.getServiceName()).containsKey(serviceConfig.getInstanceId()),
+        is(false));
+
+    assertThat(compensateResponseObserver.isCompleted(), is(true));
+  }
+
+  @Test
+  public void closeStreamOfDisconnectedClientOnly() {
+    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
+    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(serviceConfig.getServiceName()));
+
+    GrpcServiceConfig anotherServiceConfig = someServiceConfig();
+    CompensateStreamObserver anotherResponseObserver = new CompensateStreamObserver();
+    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, anotherResponseObserver);
+
+    await().atMost(1, SECONDS).until(() -> omegaCallbacks.containsKey(anotherServiceConfig.getServiceName()));
+
+    blockingStub.onDisconnected(serviceConfig);
+
+    assertThat(
+        omegaCallbacks.get(anotherServiceConfig.getServiceName()).containsKey(anotherServiceConfig.getInstanceId()),
+        is(true));
+
+    assertThat(anotherResponseObserver.isCompleted(), is(false));
+
+    TxEventServiceGrpc.newBlockingStub(clientChannel).onDisconnected(anotherServiceConfig);
+  }
+
+  @Test
   public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
@@ -161,8 +213,12 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent));
 
-    asyncStub.onConnected(serviceConfig, compensateResponseObserver);
-    blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
+    // simulates a connection from another service with a different globalTxId
+    GrpcServiceConfig anotherServiceConfig = someServiceConfig();
+    TxEventServiceGrpc.newStub(clientChannel).onConnected(anotherServiceConfig, new CompensateStreamObserver());
+
+    TxEventServiceBlockingStub anotherBlockingStub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+    anotherBlockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, UUID.randomUUID().toString()));
 
     await().atMost(1, SECONDS).until(() -> eventRepo.count() == 2);
 
@@ -171,10 +227,15 @@ public class AlphaIntegrationTest {
 
     assertThat(receivedCommands.size(), is(1));
     assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
-    assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
-    assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
-    assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
-    assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
+
+    anotherBlockingStub.onDisconnected(anotherServiceConfig);
+  }
+
+  private GrpcServiceConfig someServiceConfig() {
+    return GrpcServiceConfig.newBuilder()
+        .setServiceName(uniquify("serviceName"))
+        .setInstanceId(uniquify("instanceId"))
+        .build();
   }
 
   private GrpcTxEvent someGrpcEvent(EventType type) {
@@ -214,6 +275,8 @@ public class AlphaIntegrationTest {
   }
 
   private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+    private boolean completed = false;
+
     @Override
     public void onNext(GrpcCompensateCommand command) {
       // intercept received command
@@ -226,6 +289,11 @@ public class AlphaIntegrationTest {
 
     @Override
     public void onCompleted() {
+      completed = true;
+    }
+
+    boolean isCompleted() {
+      return completed;
     }
   }
 }
diff --git a/integration-tests/pack-tests/pom.xml b/integration-tests/pack-tests/pom.xml
index 944f45d..fb81a0e 100644
--- a/integration-tests/pack-tests/pom.xml
+++ b/integration-tests/pack-tests/pom.xml
@@ -94,6 +94,11 @@
       <artifactId>groovy-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
index a9461dd..9d1b23a 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingApplication.java
@@ -17,9 +17,13 @@
 
 package org.apache.servicecomb.saga.integration.pack.tests;
 
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.servicecomb.saga.omega.spring.EnableOmega;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
 
 @EnableOmega
 @SpringBootApplication
@@ -27,4 +31,9 @@ public class GreetingApplication {
   public static void main(String[] args) {
     SpringApplication.run(GreetingApplication.class, args);
   }
+
+  @Bean
+  Queue<String> compensated() {
+    return new ConcurrentLinkedQueue<>();
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
index 90fd4ac..12356ee 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingController.java
@@ -28,6 +28,7 @@ import org.springframework.web.client.RestTemplate;
 @Controller
 @RequestMapping("/")
 public class GreetingController {
+  static final String TRESPASSER = "trespasser";
   private final GreetingService greetingService;
   private final RestTemplate restTemplate;
 
@@ -41,13 +42,25 @@ public class GreetingController {
   @GetMapping("/greet")
   ResponseEntity<String> greet(@RequestParam String name) {
     String greetings = greetingService.greet(name);
-    String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
 
-    return ResponseEntity.ok(greetings + "; " + bonjour);
+    if (!TRESPASSER.equals(name)) {
+      String bonjour = restTemplate.getForObject("http://localhost:8080/bonjour?name={name}", String.class, name);
+
+      return ResponseEntity.ok(greetings + "; " + bonjour);
+    }
+
+    String rude = restTemplate.getForObject("http://localhost:8080/rude?name={name}", String.class, name);
+
+    return ResponseEntity.ok(greetings + "; " + rude);
   }
 
   @GetMapping("/bonjour")
   ResponseEntity<String> bonjour(@RequestParam String name) {
     return ResponseEntity.ok(greetingService.bonjour(name));
   }
+
+  @GetMapping("/rude")
+  ResponseEntity<String> rude(@RequestParam String name) {
+    return ResponseEntity.ok(greetingService.beingRude(name));
+  }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
index ef8519f..69a86f6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/GreetingService.java
@@ -17,19 +17,28 @@
 
 package org.apache.servicecomb.saga.integration.pack.tests;
 
-import org.springframework.stereotype.Service;
+import java.util.Queue;
 
 import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 @Service
-public class GreetingService {
+class GreetingService {
+  private final Queue<String> compensated;
+
+  @Autowired
+  GreetingService(Queue<String> compensated) {
+    this.compensated = compensated;
+  }
+
   @Compensable(compensationMethod = "goodbye")
   String greet(String name) {
     return "Greetings, " + name;
   }
 
   String goodbye(String name) {
-    return "Goodbye, " + name;
+    return appendMessage("Goodbye, " + name);
   }
 
   @Compensable(compensationMethod = "auRevoir")
@@ -38,6 +47,20 @@ public class GreetingService {
   }
 
   String auRevoir(String name) {
-    return "Au revoir, " + name;
+    return appendMessage("Au revoir, " + name);
+  }
+
+  @Compensable(compensationMethod = "apologize")
+  String beingRude(String name) {
+    throw new IllegalStateException("You know where the door is, " + name);
+  }
+
+  String apologize(String name) {
+    return appendMessage("My bad, please take the window instead, " + name);
+  }
+
+  private String appendMessage(String message) {
+    compensated.add(message);
+    return message;
   }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
index 50bbd4e..66454b7 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/PackIT.java
@@ -17,17 +17,24 @@
 
 package org.apache.servicecomb.saga.integration.pack.tests;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.integration.pack.tests.GreetingController.TRESPASSER;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.springframework.http.HttpMethod.GET;
+import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
 import static org.springframework.http.HttpStatus.OK;
 
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,6 +62,14 @@ public class PackIT {
   @Autowired
   private TxEventEnvelopeRepository repository;
 
+  @Autowired
+  private Queue<String> compensatedMessages;
+
+  @After
+  public void tearDown() throws Exception {
+    repository.deleteAll();
+  }
+
   @Test
   public void updatesTxStateToAlpha() throws Exception {
     HttpHeaders headers = new HttpHeaders();
@@ -74,24 +89,76 @@ public class PackIT {
 
     assertThat(envelopes.size(), is(4));
     assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
+    assertThat(envelopes.get(0).localTxId(), is(notNullValue()));
     assertThat(envelopes.get(0).parentTxId(), is(nullValue()));
     assertThat(envelopes.get(0).serviceName(), is(serviceName));
     assertThat(envelopes.get(0).instanceId(), is(notNullValue()));
 
     assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
+    assertThat(envelopes.get(1).localTxId(), is(envelopes.get(0).localTxId()));
     assertThat(envelopes.get(1).parentTxId(), is(nullValue()));
     assertThat(envelopes.get(1).serviceName(), is(serviceName));
-    assertThat(envelopes.get(1).instanceId(), is(notNullValue()));
+    assertThat(envelopes.get(1).instanceId(), is(envelopes.get(0).instanceId()));
 
 
     assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
+    assertThat(envelopes.get(2).localTxId(), is(notNullValue()));
     assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId()));
     assertThat(envelopes.get(2).serviceName(), is(serviceName));
     assertThat(envelopes.get(2).instanceId(), is(notNullValue()));
 
     assertThat(envelopes.get(3).type(), is("TxEndedEvent"));
+    assertThat(envelopes.get(3).localTxId(), is(envelopes.get(2).localTxId()));
+    assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(3).serviceName(), is(serviceName));
+    assertThat(envelopes.get(3).instanceId(), is(envelopes.get(2).instanceId()));
+
+    assertThat(compensatedMessages.isEmpty(), is(true));
+  }
+
+  @Test
+  public void compensatesFailedGlobalTransaction() throws Exception {
+    HttpHeaders headers = new HttpHeaders();
+
+    headers.set(OmegaContext.GLOBAL_TX_ID_KEY, globalTxId);
+
+    ResponseEntity<String> entity = restTemplate.exchange("/greet?name={name}",
+        GET,
+        new HttpEntity<>(headers),
+        String.class,
+        TRESPASSER);
+
+    assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));
+
+    await().atMost(2, SECONDS).until(() -> repository.count() == 6);
+
+    List<TxEventEnvelope> envelopes = repository.findByGlobalTxIdOrderByCreationTime(globalTxId);
+    assertThat(envelopes.size(), is(6));
+
+    assertThat(envelopes.get(0).type(), is("TxStartedEvent"));
+    assertThat(envelopes.get(1).type(), is("TxEndedEvent"));
+    assertThat(envelopes.get(2).type(), is("TxStartedEvent"));
+
+    assertThat(envelopes.get(3).type(), is("TxAbortedEvent"));
+    assertThat(envelopes.get(3).localTxId(), is(envelopes.get(2).localTxId()));
     assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId()));
     assertThat(envelopes.get(3).serviceName(), is(serviceName));
-    assertThat(envelopes.get(3).instanceId(), is(notNullValue()));
+    assertThat(envelopes.get(3).instanceId(), is(envelopes.get(2).instanceId()));
+
+    assertThat(envelopes.get(4).type(), is("TxCompensatedEvent"));
+    assertThat(envelopes.get(4).localTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(4).parentTxId(), is(nullValue()));
+    assertThat(envelopes.get(4).serviceName(), is(serviceName));
+    assertThat(envelopes.get(4).instanceId(), is(envelopes.get(0).instanceId()));
+
+    assertThat(envelopes.get(5).type(), is("TxCompensatedEvent"));
+    assertThat(envelopes.get(5).localTxId(), is(envelopes.get(2).localTxId()));
+    assertThat(envelopes.get(5).parentTxId(), is(envelopes.get(0).localTxId()));
+    assertThat(envelopes.get(5).serviceName(), is(serviceName));
+    assertThat(envelopes.get(5).instanceId(), is(envelopes.get(2).instanceId()));
+
+    assertThat(compensatedMessages, contains(
+        "Goodbye, " + TRESPASSER,
+        "My bad, please take the window instead, " + TRESPASSER));
   }
 }
diff --git a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
index 25ef1e3..d99c2b6 100644
--- a/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
+++ b/integration-tests/pack-tests/src/test/java/org/apache/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.integration.pack.tests;
 
+import java.util.Arrays;
 import java.util.Date;
 
 import javax.persistence.Entity;
@@ -64,4 +65,19 @@ class TxEventEnvelope {
   public byte[] payloads() {
     return payloads;
   }
+
+  @Override
+  public String toString() {
+    return "TxEventEnvelope{" +
+        "surrogateId=" + surrogateId +
+        ", serviceName='" + serviceName + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", creationTime=" + creationTime +
+        ", globalTxId='" + globalTxId + '\'' +
+        ", localTxId='" + localTxId + '\'' +
+        ", parentTxId='" + parentTxId + '\'' +
+        ", type='" + type + '\'' +
+        ", payloads=" + Arrays.toString(payloads) +
+        '}';
+  }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index ef07536..b2e837c 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -21,6 +21,7 @@
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
@@ -46,13 +47,16 @@ public class GrpcClientMessageSender implements MessageSender {
   private final GrpcCompensateStreamObserver compensateStreamObserver;
   private final GrpcServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+  public GrpcClientMessageSender(ManagedChannel channel,
+      MessageSerializer serializer,
+      MessageDeserializer deserializer,
+      ServiceConfig serviceConfig,
       MessageHandler handler) {
     this.asyncEventService = TxEventServiceGrpc.newStub(channel);
     this.blockingEventService = TxEventServiceGrpc.newBlockingStub(channel);
     this.serializer = serializer;
 
-    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler);
+    this.compensateStreamObserver = new GrpcCompensateStreamObserver(handler, deserializer);
     this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
   }
 
@@ -62,6 +66,11 @@ public class GrpcClientMessageSender implements MessageSender {
   }
 
   @Override
+  public void onDisconnected() {
+    blockingEventService.onDisconnected(serviceConfig);
+  }
+
+  @Override
   public void send(TxEvent event) {
     blockingEventService.onTxEvent(convertEvent(event));
   }
@@ -76,6 +85,7 @@ public class GrpcClientMessageSender implements MessageSender {
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setType(event.type())
+        .setCompensationMethod(event.compensationMethod())
         .setPayloads(payloads);
 
     if (event.parentTxId() != null) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 7f6b85c..7a845eb 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -22,6 +22,7 @@ package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import java.lang.invoke.MethodHandles;
 
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.slf4j.Logger;
@@ -34,9 +35,11 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MessageHandler messageHandler;
+  private final MessageDeserializer deserializer;
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler) {
+  GrpcCompensateStreamObserver(MessageHandler messageHandler, MessageDeserializer deserializer) {
     this.messageHandler = messageHandler;
+    this.deserializer = deserializer;
   }
 
   @Override
@@ -49,7 +52,7 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
         command.getLocalTxId(),
         command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
         command.getCompensateMethod(),
-        command.getPayloads());
+        deserializer.deserialize(command.getPayloads().toByteArray()));
   }
 
   @Override
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 460d314..9e0ebb6 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -47,6 +47,7 @@ class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final List<ManagedChannel> channels = new ArrayList<>();
+  private final List<MessageSender> senders = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -65,6 +66,7 @@ class OmegaSpringConfig {
 
   @PreDestroy
   void close() {
+    senders.forEach(MessageSender::onDisconnected);
     channels.forEach(ManagedChannel::shutdown);
   }
 
@@ -74,8 +76,9 @@ class OmegaSpringConfig {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
+        MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), new NativeMessageFormat(), serviceConfig, handler);
         sender.onConnected();
+        senders.add(sender);
         return sender;
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
index 8bc5260..d37c5b8 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSender.java
@@ -21,5 +21,8 @@ public interface MessageSender {
   default void onConnected() {
   }
 
+  default void onDisconnected() {
+  }
+
   void send(TxEvent event);
 }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 7ffb654..9522b6f 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -24,6 +24,7 @@ option java_outer_classname = "TxEventProto";
 service TxEventService {
   rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
   rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
+  rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck){}
 }
 
 message GrpcServiceConfig {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.