You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/28 06:43:12 UTC

[incubator-servicecomb-saga] branch master updated (1b1e184 -> 68382f2)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from 1b1e184  SCB-864 add SQL component content into API document
     new 236e0a8  SCB-817 Updated the TCC protocol for CoordinatedEvent
     new 80be074  SCB-817 Update the omega transactions code
     new 68382f2  SCB-818 Implements the TccEventServie of grpc

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 omega/omega-connector/omega-connector-grpc/pom.xml |   6 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   3 +-
 ...rver.java => GrpcCoordinateStreamObserver.java} |  37 +--
 .../omega/connector/grpc/GrpcTccEventService.java  | 156 +++++++++++++
 .../connector/grpc/GrpcTccEventServiceTest.java    | 260 +++++++++++++++++++++
 ...tService.java => CoordinateMessageHandler.java} |  30 +--
 .../transaction/{ => tcc}/MessageHandler.java      |   4 +-
 .../omega/transaction/tcc/TccEventService.java     |   8 +-
 .../tcc/TccStartAnnotationProcessor.java           |   6 +-
 .../{TccEndedEvent.java => CoordinatedEvent.java}  |  28 ++-
 .../tcc/CoordinateMessageHandlerTest.java          | 109 +++++++++
 .../transaction/tcc/TccParticipatorAspectTest.java |  11 +-
 .../tcc/TccStartAnnotationProcessorTest.java       |  10 +-
 .../omega/transaction/tcc/TccStartAspectTest.java  |   9 +-
 .../src/main/proto/GrpcTccEvent.proto              |  12 +
 pom.xml                                            |   5 +
 16 files changed, 624 insertions(+), 70 deletions(-)
 copy omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/{GrpcCompensateStreamObserver.java => GrpcCoordinateStreamObserver.java} (54%)
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/{TccEventService.java => CoordinateMessageHandler.java} (59%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{ => tcc}/MessageHandler.java (88%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/{TccEndedEvent.java => CoordinatedEvent.java} (77%)
 create mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java


[incubator-servicecomb-saga] 03/03: SCB-818 Implements the TccEventServie of grpc

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 68382f281cbb7c9b814e9ceb5ed75db4614a7c84
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Tue Aug 28 09:11:41 2018 +0800

    SCB-818 Implements the TccEventServie of grpc
---
 omega/omega-connector/omega-connector-grpc/pom.xml |   6 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   3 +-
 ...rver.java => GrpcCoordinateStreamObserver.java} |  37 +--
 .../omega/connector/grpc/GrpcTccEventService.java  | 156 +++++++++++++
 .../connector/grpc/GrpcTccEventServiceTest.java    | 260 +++++++++++++++++++++
 pom.xml                                            |   5 +
 6 files changed, 440 insertions(+), 27 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml
index 7a14097..f4d7bf1 100644
--- a/omega/omega-connector/omega-connector-grpc/pom.xml
+++ b/omega/omega-connector/omega-connector-grpc/pom.xml
@@ -18,7 +18,6 @@
   ~
   ~
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -59,6 +58,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 9d9c312..6aae96a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -59,11 +59,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
+    LOG.error("Failed to process grpc compensate command.", t);
     errorHandler.run();
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
similarity index 54%
copy from omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
copy to omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
index 9d9c312..20f5974 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,56 +13,44 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
- *
  */
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import java.lang.invoke.MethodHandles;
 
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
 
-class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+public class GrpcCoordinateStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final MessageHandler messageHandler;
-  private final Runnable errorHandler;
-  private final MessageDeserializer deserializer;
+  
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) {
+  public GrpcCoordinateStreamObserver(MessageHandler messageHandler) {
     this.messageHandler = messageHandler;
-    this.errorHandler = errorHandler;
-    this.deserializer = deserializer;
   }
 
   @Override
-  public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
-
-    messageHandler.onReceive(
-        command.getGlobalTxId(),
-        command.getLocalTxId(),
-        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensationMethod(),
-        deserializer.deserialize(command.getPayloads().toByteArray()));
+  public void onNext(GrpcTccCoordinateCommand command) {
+    LOG.info("Received coordinate command, global tx id: {}, local tx id: {}, call method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getMethod());
+    messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), command.getParentTxId(), command.getMethod());
   }
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
-    errorHandler.run();
+    //TODO need to find a way to handle the error
+    LOG.error("Failed to process grpc coordinate command.", t);
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
new file mode 100644
index 0000000..cfcb945
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+
+import io.grpc.ManagedChannel;
+
+public class GrpcTccEventService implements TccEventService {
+  private final GrpcServiceConfig serviceConfig;
+  private final String target;
+  private final TccEventServiceBlockingStub tccBlockingEventService;
+  private final TccEventServiceStub tccAsyncEventService;
+  private final GrpcCoordinateStreamObserver observer;
+
+  public GrpcTccEventService(ServiceConfig serviceConfig,
+      ManagedChannel channel,
+      String address,
+      MessageHandler handler
+      ) {
+    this.target = address;
+    tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
+    tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
+    this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
+    observer = new GrpcCoordinateStreamObserver(handler);
+  }
+
+  @Override
+  public void onConnected() {
+    tccAsyncEventService.onConnected(serviceConfig, observer);
+  }
+
+  @Override
+  public void onDisconnected() {
+    tccBlockingEventService.onDisconnected(serviceConfig);
+  }
+
+  @Override
+  public void close() {
+    // do nothing here
+  }
+
+  @Override
+  public String target() {
+    return target;
+  }
+
+  @Override
+  public AlphaResponse participate(ParticipatedEvent participateEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.participate(convertTo(participateEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+
+  @Override
+  public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+
+  }
+
+  @Override
+  public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) {
+    return GrpcTccCoordinatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(coordinatedEvent.getGlobalTxId())
+        .setLocalTxId(coordinatedEvent.getLocalTxId())
+        .setParentTxId(coordinatedEvent.getParentTxId())
+        .setMethodName(coordinatedEvent.getMethodName())
+        .setStatus(coordinatedEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcServiceConfig serviceConfig(String serviceName, String instanceId) {
+    return GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent tccStartEvent) {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccStartEvent.getGlobalTxId())
+        .setLocalTxId(tccStartEvent.getLocalTxId())
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccEndEvent.getGlobalTxId())
+        .setLocalTxId(tccEndEvent.getLocalTxId())
+        .setStatus(tccEndEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcTccParticipatedEvent convertTo(ParticipatedEvent participateEvent) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(participateEvent.getGlobalTxId())
+        .setLocalTxId(participateEvent.getLocalTxId())
+        .setParentTxId(participateEvent.getParentTxId())
+        .setCancelMethod(participateEvent.getCancelMethod())
+        .setConfirmMethod(participateEvent.getConfirmMethod())
+        .setStatus(participateEvent.getStatus().toString())
+        .build();
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
new file mode 100644
index 0000000..b4cfef2
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceImplBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+
+@RunWith(JUnit4.class)
+public class GrpcTccEventServiceTest {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  private final GrpcAck ack = GrpcAck.newBuilder().setAborted(false).build();
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName = uniquify("methodName");
+  private final String confirmMethod = uniquify("confirmMethod");
+  private final String cancelMethod = uniquify("cancleMethod");
+  private final String serviceName = uniquify("serviceName");
+
+  private final ServiceConfig serviceConfig = new ServiceConfig(uniquify("Service"));
+  private final String address = uniquify("Address");
+  private final MessageHandler handler = mock(MessageHandler.class);
+  private GrpcTccEventService service;
+
+  @Before
+  public void setUp() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start, and register for automatic graceful shutdown.
+    grpcCleanup.register(InProcessServerBuilder.forName(serverName).
+        fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+    // Create a client channel and register for automatic graceful shutdown.
+    ManagedChannel channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    // Create a TccEventServiceStub using the in-process channel;
+    service = new GrpcTccEventService(serviceConfig, channel, address, handler);
+  }
+
+  @Test
+  public void serviceOnDisconnectedTest() {
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onDisconnected();
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+  }
+
+
+  @Test
+  public void serviceOnConnectedTest() {
+    final GrpcTccCoordinateCommand coordinateCommand =
+        GrpcTccCoordinateCommand.newBuilder()
+            .setGlobalTxId(globalTxId)
+            .setLocalTxId(localTxId)
+            .setParentTxId(parentTxId)
+            .setMethod(methodName)
+            .setServiceName(serviceName)
+        .build();
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+      @Override
+      public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+        requestCaptor[0] = request;
+        // Just send the coordinateCommand back
+        responseObserver.onNext(coordinateCommand);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onConnected();
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+
+    verify(handler).onReceive(globalTxId, localTxId,parentTxId, methodName);
+
+  }
+
+  @Test
+  public void serviceOnTransactionStartTest() {
+
+    final GrpcTccTransactionStartedEvent[] requestCaptor = new GrpcTccTransactionStartedEvent[1];
+    TccStartedEvent event = new TccStartedEvent(globalTxId,localTxId);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStart(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnTransactionEndTest() {
+
+    final GrpcTccTransactionEndedEvent[] requestCaptor = new GrpcTccTransactionEndedEvent[1];
+    TccEndedEvent event = new TccEndedEvent(globalTxId,localTxId, TransactionStatus.Failed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStop(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Failed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnParticipateTest() {
+
+    final GrpcTccParticipatedEvent[] requestCaptor = new GrpcTccParticipatedEvent[1];
+    ParticipatedEvent event = new ParticipatedEvent(globalTxId,localTxId, parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void participate(GrpcTccParticipatedEvent request,
+          StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.participate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getParentTxId(), is(parentTxId));
+    assertThat(requestCaptor[0].getCancelMethod(), is(cancelMethod));
+    assertThat(requestCaptor[0].getConfirmMethod(), is(confirmMethod));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnCoordinateTest() {
+
+    final GrpcTccCoordinatedEvent[] requestCaptor = new GrpcTccCoordinatedEvent[1];
+    CoordinatedEvent event = new CoordinatedEvent(globalTxId,localTxId, parentTxId, methodName, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccCoordinated(GrpcTccCoordinatedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.coordinate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getMethodName(), is(methodName));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+
+
+}
diff --git a/pom.xml b/pom.xml
index 83e5366..57ab449 100644
--- a/pom.xml
+++ b/pom.xml
@@ -503,6 +503,11 @@
         <version>${grpc.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-testing</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.esotericsoftware</groupId>
         <artifactId>kryo</artifactId>
         <version>${kryo.version}</version>


[incubator-servicecomb-saga] 01/03: SCB-817 Updated the TCC protocol for CoordinatedEvent

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 236e0a82cf91c4e5ab8f4292365428a2f602ba69
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Mon Aug 27 15:46:49 2018 +0800

    SCB-817 Updated the TCC protocol for CoordinatedEvent
---
 .../{TccEventService.java => MessageHandler.java}  | 24 ++--------------------
 .../omega/transaction/tcc/TccEventService.java     |  8 +++++---
 .../transaction/tcc/events/CoordinatedEvent.java   |  4 ++++
 .../src/main/proto/GrpcTccEvent.proto              | 12 +++++++++++
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
similarity index 55%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
index 48ad743..8758954 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
@@ -17,26 +17,6 @@
 
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
-import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
-
-public interface TccEventService {
-
-  void onConnected();
-
-  void onDisconnected();
-
-  void close();
-
-  String target();
-
-  AlphaResponse participate(ParticipatedEvent participateEvent);
-
-  AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
-
-  AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
-  
+public interface MessageHandler {
+  void onReceive(String globalTxId, String localTxId, String parentTxId, String method);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
index 48ad743..f5bdcfd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
@@ -18,7 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -35,8 +35,10 @@ public interface TccEventService {
 
   AlphaResponse participate(ParticipatedEvent participateEvent);
 
-  AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
+  AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent);
 
-  AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
+  AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent);
+
+  AlphaResponse coordinate(CoordinatedEvent coordinatedEvent);
   
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
new file mode 100644
index 0000000..6d88924
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class CoordinatedEvent {
+}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
index 64731b0..7977b7a 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
@@ -29,6 +29,7 @@ service TccEventService {
   rpc participate(GrpcTccParticipatedEvent) returns (GrpcAck) {}
   rpc OnTccTransactionStarted (GrpcTccTransactionStartedEvent) returns (GrpcAck) {}
   rpc OnTccTransactionEnded (GrpcTccTransactionEndedEvent) returns (GrpcAck) {}
+  rpc OnTccCoordinated(GrpcTccCoordinatedEvent) returns(GrpcAck) {}
   rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
   }
 }
@@ -64,6 +65,17 @@ message GrpcTccTransactionEndedEvent {
   string status = 7;
 }
 
+message GrpcTccCoordinatedEvent {
+  int64 timestamp = 1;
+  string globalTxId = 2;
+  string localTxId = 3;
+  string parentTxId = 4;
+  string serviceName = 5;
+  string instanceId = 6;
+  string methodName = 7;
+  string status = 8;
+}
+
 message GrpcTccCoordinateCommand {
   string globalTxId = 1;
   string localTxId = 2;


[incubator-servicecomb-saga] 02/03: SCB-817 Update the omega transactions code

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 80be074881d218fe2a18e7c1ffb45d021a9e2893
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Mon Aug 27 16:13:31 2018 +0800

    SCB-817 Update the omega transactions code
---
 .../transaction/tcc/CoordinateMessageHandler.java  |  36 +++++++
 .../tcc/TccStartAnnotationProcessor.java           |   6 +-
 .../transaction/tcc/events/CoordinatedEvent.java   |  53 ++++++++++
 .../tcc/CoordinateMessageHandlerTest.java          | 109 +++++++++++++++++++++
 .../transaction/tcc/TccParticipatorAspectTest.java |  11 ++-
 .../tcc/TccStartAnnotationProcessorTest.java       |  10 +-
 .../omega/transaction/tcc/TccStartAspectTest.java  |   9 +-
 7 files changed, 222 insertions(+), 12 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
new file mode 100644
index 0000000..7663e72
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+
+public class CoordinateMessageHandler implements MessageHandler {
+
+  private final TccEventService tccEventService;
+
+  public CoordinateMessageHandler(TccEventService tccEventService) {
+    this.tccEventService = tccEventService;
+  }
+
+  @Override
+  public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) {
+    //TODO Omega Call the service
+    tccEventService.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed));
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
index 55198dd..137a397 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -37,21 +37,21 @@ public class TccStartAnnotationProcessor {
 
   public AlphaResponse preIntercept(String parentTxId, String methodName, int timeout) {
     try {
-      return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+      return eventService.tccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
   }
 
   public void postIntercept(String parentTxId, String methodName) {
-    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+    eventService.tccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
         TransactionStatus.Succeed));
   }
 
   public void onError(String parentTxId, String methodName, Throwable throwable) {
     // Send the cancel event
     // Do we need to wait for the alpha finish all the transaction
-    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+    eventService.tccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
         TransactionStatus.Failed));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
index 6d88924..b770ff3 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
@@ -1,4 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.servicecomb.saga.omega.transaction.tcc.events;
 
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
 public class CoordinatedEvent {
+  private final String globalTxId;
+  private final String localTxId;
+  private final String parentTxId;
+  private final String methodName;
+  private final TransactionStatus status;
+
+  public CoordinatedEvent(String globalTxId, String localTxId, String parentTxId, String methodName,
+      TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.methodName = methodName;
+    this.status = status;
+  }
+
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  public TransactionStatus getStatus() {
+    return status;
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
new file mode 100644
index 0000000..b260295
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CoordinateMessageHandlerTest {
+  private final List<CoordinatedEvent> coordinatedEvents = new ArrayList<>();
+  private final AlphaResponse response = new AlphaResponse(false);
+  private final TccEventService eventService = new TccEventService() {
+    @Override
+    public void onConnected() {
+
+    }
+
+    @Override
+    public void onDisconnected() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public String target() {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse participate(ParticipatedEvent participateEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      coordinatedEvents.add(coordinatedEvent);
+      return response;
+    }
+  };
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName= uniquify("Method");
+
+  private final CoordinateMessageHandler handler = new CoordinateMessageHandler(eventService);
+
+  @Before
+  public void setUp() {
+    coordinatedEvents.clear();
+  }
+
+  @Test
+  public void sendsCompensatedEventOnCompensationCompleted() {
+    handler.onReceive(globalTxId, localTxId, parentTxId, methodName);
+
+    assertThat(coordinatedEvents.size(), is(1));
+
+    CoordinatedEvent event = coordinatedEvents.get(0);
+    assertThat(event.getGlobalTxId(), is(globalTxId));
+    assertThat(event.getLocalTxId(), is(localTxId));
+    assertThat(event.getParentTxId(), is(parentTxId));
+    assertThat(event.getMethodName(), is(methodName));
+    assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+  }
+
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
index 8e6b36e..b3cd806 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -31,10 +31,9 @@ import java.util.UUID;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -81,15 +80,19 @@ public class TccParticipatorAspectTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       return null;
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       return null;
     }
 
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };
 
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
index 95f8137..fa34ad7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -79,7 +80,7 @@ public class TccStartAnnotationProcessorTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       if (throwException) {
         throw exception;
       }
@@ -88,12 +89,15 @@ public class TccStartAnnotationProcessorTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       endedEvents.add(tccEndEvent);
       return response;
     }
 
-
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };
   private final TccStartAnnotationProcessor tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context,
       eventService);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
index 005af67..80f14e4 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
@@ -32,6 +32,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -75,17 +76,21 @@ public class TccStartAspectTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       startedEvents.add(tccStartEvent);
       return response;
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       endedEvents.add(tccEndEvent);
       return response;
     }
 
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };