You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/28 01:13:29 UTC

[incubator-servicecomb-saga] branch SCB-817 updated (5e5aade -> 20e9afd)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch SCB-817
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    omit 5e5aade  SCB-817 Added the unit tests of TCC related Aspect
     add 6dee598  SCB-817 Added the unit tests of TCC related Aspect
     add 22082fe  SCB-866 extend SQLTransport type
     add e38d1b7  SCB-867 Add SQL type Jackson format
     add 95be096  SCB-867 Add test for SQL Jackson format
     add 9487ac2  [SCB-865] Refactoring the Omega Interceptors 1.Remove the inheritance hierarchy between SagaStartAnnotationProcessor and EventAwareInterceptor. 2.Remove the NO_OP_INTERCEPTOR from the interface EventAwareInterceptor. Turning the NoOp Implementation into a class of static singleton pattern.
     add 0f3ab0a  [SCB-865] Refactoring the Omega Interceptors 1. Removing parentId when dealing with error.
     add 314e0cc  SCB-865 Added the license header to NoOpEventAwareInterceptor
     add 4e3bb69  Expose alpha's gRPC port(8090)
     add 5f55399  Add the load test guide.
     add 8ff04bd  Update the k8s base/ folder description. Add the intro of jmeter-collector and the load test ref.
     new 588059d  SCB-817 Updated the TCC protocol for CoordinatedEvent
     new dbb7441  SCB-817 Update the omega transactions code
     new 20e9afd  SCB-818 Implements the TccEventServie of grpc

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5e5aade)
            \
             N -- N -- N   refs/heads/SCB-817 (20e9afd)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/load_test.md                                  |  49 ++++
 omega/omega-connector/omega-connector-grpc/pom.xml |   6 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   3 +-
 ...rver.java => GrpcCoordinateStreamObserver.java} |  37 +--
 .../omega/connector/grpc/GrpcTccEventService.java  | 156 +++++++++++++
 .../connector/grpc/GrpcTccEventServiceTest.java    | 260 +++++++++++++++++++++
 .../omega/transaction/EventAwareInterceptor.java   |  18 +-
 ...Factory.java => NoOpEventAwareInterceptor.java} |  27 ++-
 .../transaction/SagaStartAnnotationProcessor.java  |  23 +-
 .../saga/omega/transaction/SagaStartAspect.java    |   7 +-
 ...tService.java => CoordinateMessageHandler.java} |  30 +--
 .../transaction/{ => tcc}/MessageHandler.java      |   4 +-
 .../omega/transaction/tcc/TccEventService.java     |   8 +-
 .../tcc/TccStartAnnotationProcessor.java           |   6 +-
 .../{TccEndedEvent.java => CoordinatedEvent.java}  |  28 ++-
 .../SagaStartAnnotationProcessorTest.java          |  17 +-
 .../tcc/CoordinateMessageHandlerTest.java          | 109 +++++++++
 .../transaction/tcc/TccParticipatorAspectTest.java |  11 +-
 .../tcc/TccStartAnnotationProcessorTest.java       |  10 +-
 .../omega/transaction/tcc/TccStartAspectTest.java  |   9 +-
 .../src/main/proto/GrpcTccEvent.proto              |  12 +
 pom.xml                                            |   5 +
 .../apache/servicecomb/saga/core/Operation.java    |   1 +
 ...ccessfulSagaResponse.java => SQLOperation.java} |  39 +++-
 .../{RestTransport.java => SQLTransport.java}      |   8 +-
 saga-demo/saga-k8s-resources/README.md             |   2 +-
 saga-demo/saga-k8s-resources/base/alpha.yaml       |   4 +-
 ...mpensation.java => JacksonSQLCompensation.java} |  28 +--
 ...onSagaRequest.java => JacksonSQLOperation.java} |  43 ++--
 ...Transaction.java => JacksonSQLTransaction.java} |  17 +-
 ...estSagaRequest.java => JsonSQLSagaRequest.java} |  28 +--
 .../servicecomb/saga/format/JsonSagaRequest.java   |   3 +-
 .../format/JacksonFromJsonFormatForSQLTest.java    | 212 +++++++++++++++++
 ...equestTest.java => JsonSQLSagaRequestTest.java} |  43 +---
 34 files changed, 1031 insertions(+), 232 deletions(-)
 create mode 100644 docs/load_test.md
 copy omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/{GrpcCompensateStreamObserver.java => GrpcCoordinateStreamObserver.java} (54%)
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{RecoveryPolicyFactory.java => NoOpEventAwareInterceptor.java} (58%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/{TccEventService.java => CoordinateMessageHandler.java} (59%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/{ => tcc}/MessageHandler.java (88%)
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/{TccEndedEvent.java => CoordinatedEvent.java} (77%)
 create mode 100644 omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
 copy saga-core/src/main/java/org/apache/servicecomb/saga/core/{SuccessfulSagaResponse.java => SQLOperation.java} (57%)
 copy saga-core/src/main/java/org/apache/servicecomb/saga/transports/{RestTransport.java => SQLTransport.java} (84%)
 copy saga-format/src/main/java/org/apache/servicecomb/saga/format/{JacksonRestCompensation.java => JacksonSQLCompensation.java} (67%)
 copy saga-format/src/main/java/org/apache/servicecomb/saga/format/{JsonSagaRequest.java => JacksonSQLOperation.java} (56%)
 copy saga-format/src/main/java/org/apache/servicecomb/saga/format/{JacksonRestTransaction.java => JacksonSQLTransaction.java} (75%)
 copy saga-format/src/main/java/org/apache/servicecomb/saga/format/{JsonRestSagaRequest.java => JsonSQLSagaRequest.java} (73%)
 create mode 100644 saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonFromJsonFormatForSQLTest.java
 copy saga-format/src/test/java/org/apache/servicecomb/saga/format/{JsonRestSagaRequestTest.java => JsonSQLSagaRequestTest.java} (59%)


[incubator-servicecomb-saga] 01/03: SCB-817 Updated the TCC protocol for CoordinatedEvent

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-817
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 588059d6ae6bc0c2106034bb49ea0d3b5a6a24e3
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Mon Aug 27 15:46:49 2018 +0800

    SCB-817 Updated the TCC protocol for CoordinatedEvent
---
 .../{TccEventService.java => MessageHandler.java}  | 24 ++--------------------
 .../omega/transaction/tcc/TccEventService.java     |  8 +++++---
 .../transaction/tcc/events/CoordinatedEvent.java   |  4 ++++
 .../src/main/proto/GrpcTccEvent.proto              | 12 +++++++++++
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
similarity index 55%
copy from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
copy to omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
index 48ad743..8758954 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/MessageHandler.java
@@ -17,26 +17,6 @@
 
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
-import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
-import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
-
-public interface TccEventService {
-
-  void onConnected();
-
-  void onDisconnected();
-
-  void close();
-
-  String target();
-
-  AlphaResponse participate(ParticipatedEvent participateEvent);
-
-  AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
-
-  AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
-  
+public interface MessageHandler {
+  void onReceive(String globalTxId, String localTxId, String parentTxId, String method);
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
index 48ad743..f5bdcfd 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
@@ -18,7 +18,7 @@
 package org.apache.servicecomb.saga.omega.transaction.tcc;
 
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -35,8 +35,10 @@ public interface TccEventService {
 
   AlphaResponse participate(ParticipatedEvent participateEvent);
 
-  AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
+  AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent);
 
-  AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
+  AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent);
+
+  AlphaResponse coordinate(CoordinatedEvent coordinatedEvent);
   
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
new file mode 100644
index 0000000..6d88924
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
@@ -0,0 +1,4 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class CoordinatedEvent {
+}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
index 64731b0..7977b7a 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
@@ -29,6 +29,7 @@ service TccEventService {
   rpc participate(GrpcTccParticipatedEvent) returns (GrpcAck) {}
   rpc OnTccTransactionStarted (GrpcTccTransactionStartedEvent) returns (GrpcAck) {}
   rpc OnTccTransactionEnded (GrpcTccTransactionEndedEvent) returns (GrpcAck) {}
+  rpc OnTccCoordinated(GrpcTccCoordinatedEvent) returns(GrpcAck) {}
   rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
   }
 }
@@ -64,6 +65,17 @@ message GrpcTccTransactionEndedEvent {
   string status = 7;
 }
 
+message GrpcTccCoordinatedEvent {
+  int64 timestamp = 1;
+  string globalTxId = 2;
+  string localTxId = 3;
+  string parentTxId = 4;
+  string serviceName = 5;
+  string instanceId = 6;
+  string methodName = 7;
+  string status = 8;
+}
+
 message GrpcTccCoordinateCommand {
   string globalTxId = 1;
   string localTxId = 2;


[incubator-servicecomb-saga] 03/03: SCB-818 Implements the TccEventServie of grpc

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-817
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 20e9afd0440d346d79d9c7a64595c193f40044ba
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Tue Aug 28 09:11:41 2018 +0800

    SCB-818 Implements the TccEventServie of grpc
---
 omega/omega-connector/omega-connector-grpc/pom.xml |   6 +-
 .../grpc/GrpcCompensateStreamObserver.java         |   3 +-
 ...rver.java => GrpcCoordinateStreamObserver.java} |  37 +--
 .../omega/connector/grpc/GrpcTccEventService.java  | 156 +++++++++++++
 .../connector/grpc/GrpcTccEventServiceTest.java    | 260 +++++++++++++++++++++
 pom.xml                                            |   5 +
 6 files changed, 440 insertions(+), 27 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml
index 7a14097..f4d7bf1 100644
--- a/omega/omega-connector/omega-connector-grpc/pom.xml
+++ b/omega/omega-connector/omega-connector-grpc/pom.xml
@@ -18,7 +18,6 @@
   ~
   ~
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -59,6 +58,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 9d9c312..6aae96a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -59,11 +59,12 @@ class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateComma
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
+    LOG.error("Failed to process grpc compensate command.", t);
     errorHandler.run();
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
similarity index 54%
copy from omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
copy to omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
index 9d9c312..20f5974 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCoordinateStreamObserver.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,56 +13,44 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
- *
  */
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
 import java.lang.invoke.MethodHandles;
 
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
 
-class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+public class GrpcCoordinateStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final MessageHandler messageHandler;
-  private final Runnable errorHandler;
-  private final MessageDeserializer deserializer;
+  
 
-  GrpcCompensateStreamObserver(MessageHandler messageHandler, Runnable errorHandler, MessageDeserializer deserializer) {
+  public GrpcCoordinateStreamObserver(MessageHandler messageHandler) {
     this.messageHandler = messageHandler;
-    this.errorHandler = errorHandler;
-    this.deserializer = deserializer;
   }
 
   @Override
-  public void onNext(GrpcCompensateCommand command) {
-    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensation method: {}",
-        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensationMethod());
-
-    messageHandler.onReceive(
-        command.getGlobalTxId(),
-        command.getLocalTxId(),
-        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
-        command.getCompensationMethod(),
-        deserializer.deserialize(command.getPayloads().toByteArray()));
+  public void onNext(GrpcTccCoordinateCommand command) {
+    LOG.info("Received coordinate command, global tx id: {}, local tx id: {}, call method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getMethod());
+    messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), command.getParentTxId(), command.getMethod());
   }
 
   @Override
   public void onError(Throwable t) {
-    LOG.error("failed to process grpc compensate command.", t);
-    errorHandler.run();
+    //TODO need to find a way to handle the error
+    LOG.error("Failed to process grpc coordinate command.", t);
   }
 
   @Override
   public void onCompleted() {
+    // Do nothing here
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
new file mode 100644
index 0000000..cfcb945
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventService.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+
+import io.grpc.ManagedChannel;
+
+public class GrpcTccEventService implements TccEventService {
+  private final GrpcServiceConfig serviceConfig;
+  private final String target;
+  private final TccEventServiceBlockingStub tccBlockingEventService;
+  private final TccEventServiceStub tccAsyncEventService;
+  private final GrpcCoordinateStreamObserver observer;
+
+  public GrpcTccEventService(ServiceConfig serviceConfig,
+      ManagedChannel channel,
+      String address,
+      MessageHandler handler
+      ) {
+    this.target = address;
+    tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
+    tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
+    this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
+    observer = new GrpcCoordinateStreamObserver(handler);
+  }
+
+  @Override
+  public void onConnected() {
+    tccAsyncEventService.onConnected(serviceConfig, observer);
+  }
+
+  @Override
+  public void onDisconnected() {
+    tccBlockingEventService.onDisconnected(serviceConfig);
+  }
+
+  @Override
+  public void close() {
+    // do nothing here
+  }
+
+  @Override
+  public String target() {
+    return target;
+  }
+
+  @Override
+  public AlphaResponse participate(ParticipatedEvent participateEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.participate(convertTo(participateEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  @Override
+  public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+
+  @Override
+  public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+
+  }
+
+  @Override
+  public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+    GrpcAck grpcAck = tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
+    return new AlphaResponse(grpcAck.getAborted());
+  }
+
+  private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) {
+    return GrpcTccCoordinatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(coordinatedEvent.getGlobalTxId())
+        .setLocalTxId(coordinatedEvent.getLocalTxId())
+        .setParentTxId(coordinatedEvent.getParentTxId())
+        .setMethodName(coordinatedEvent.getMethodName())
+        .setStatus(coordinatedEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcServiceConfig serviceConfig(String serviceName, String instanceId) {
+    return GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent tccStartEvent) {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccStartEvent.getGlobalTxId())
+        .setLocalTxId(tccStartEvent.getLocalTxId())
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(tccEndEvent.getGlobalTxId())
+        .setLocalTxId(tccEndEvent.getLocalTxId())
+        .setStatus(tccEndEvent.getStatus().toString())
+        .build();
+  }
+
+  private GrpcTccParticipatedEvent convertTo(ParticipatedEvent participateEvent) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setServiceName(serviceConfig.getServiceName())
+        .setInstanceId(serviceConfig.getInstanceId())
+        .setGlobalTxId(participateEvent.getGlobalTxId())
+        .setLocalTxId(participateEvent.getLocalTxId())
+        .setParentTxId(participateEvent.getParentTxId())
+        .setCancelMethod(participateEvent.getCancelMethod())
+        .setConfirmMethod(participateEvent.getConfirmMethod())
+        .setStatus(participateEvent.getStatus().toString())
+        .build();
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
new file mode 100644
index 0000000..b4cfef2
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcTccEventServiceTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceImplBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+
+@RunWith(JUnit4.class)
+public class GrpcTccEventServiceTest {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  private final GrpcAck ack = GrpcAck.newBuilder().setAborted(false).build();
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName = uniquify("methodName");
+  private final String confirmMethod = uniquify("confirmMethod");
+  private final String cancelMethod = uniquify("cancleMethod");
+  private final String serviceName = uniquify("serviceName");
+
+  private final ServiceConfig serviceConfig = new ServiceConfig(uniquify("Service"));
+  private final String address = uniquify("Address");
+  private final MessageHandler handler = mock(MessageHandler.class);
+  private GrpcTccEventService service;
+
+  @Before
+  public void setUp() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start, and register for automatic graceful shutdown.
+    grpcCleanup.register(InProcessServerBuilder.forName(serverName).
+        fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+    // Create a client channel and register for automatic graceful shutdown.
+    ManagedChannel channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    // Create a TccEventServiceStub using the in-process channel;
+    service = new GrpcTccEventService(serviceConfig, channel, address, handler);
+  }
+
+  @Test
+  public void serviceOnDisconnectedTest() {
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onDisconnected();
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+  }
+
+
+  @Test
+  public void serviceOnConnectedTest() {
+    final GrpcTccCoordinateCommand coordinateCommand =
+        GrpcTccCoordinateCommand.newBuilder()
+            .setGlobalTxId(globalTxId)
+            .setLocalTxId(localTxId)
+            .setParentTxId(parentTxId)
+            .setMethod(methodName)
+            .setServiceName(serviceName)
+        .build();
+
+    final GrpcServiceConfig[] requestCaptor = new GrpcServiceConfig[1];
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+      @Override
+      public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+        requestCaptor[0] = request;
+        // Just send the coordinateCommand back
+        responseObserver.onNext(coordinateCommand);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    service.onConnected();
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+
+    verify(handler).onReceive(globalTxId, localTxId,parentTxId, methodName);
+
+  }
+
+  @Test
+  public void serviceOnTransactionStartTest() {
+
+    final GrpcTccTransactionStartedEvent[] requestCaptor = new GrpcTccTransactionStartedEvent[1];
+    TccStartedEvent event = new TccStartedEvent(globalTxId,localTxId);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStart(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnTransactionEndTest() {
+
+    final GrpcTccTransactionEndedEvent[] requestCaptor = new GrpcTccTransactionEndedEvent[1];
+    TccEndedEvent event = new TccEndedEvent(globalTxId,localTxId, TransactionStatus.Failed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.tccTransactionStop(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Failed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnParticipateTest() {
+
+    final GrpcTccParticipatedEvent[] requestCaptor = new GrpcTccParticipatedEvent[1];
+    ParticipatedEvent event = new ParticipatedEvent(globalTxId,localTxId, parentTxId, confirmMethod, cancelMethod, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void participate(GrpcTccParticipatedEvent request,
+          StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.participate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getParentTxId(), is(parentTxId));
+    assertThat(requestCaptor[0].getCancelMethod(), is(cancelMethod));
+    assertThat(requestCaptor[0].getConfirmMethod(), is(confirmMethod));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+  @Test
+  public void serviceOnCoordinateTest() {
+
+    final GrpcTccCoordinatedEvent[] requestCaptor = new GrpcTccCoordinatedEvent[1];
+    CoordinatedEvent event = new CoordinatedEvent(globalTxId,localTxId, parentTxId, methodName, TransactionStatus.Succeed);
+
+    TccEventServiceImplBase serviceImpl = new TccEventServiceImplBase() {
+
+      public void onTccCoordinated(GrpcTccCoordinatedEvent request,
+          io.grpc.stub.StreamObserver<GrpcAck> responseObserver) {
+        requestCaptor[0] = request;
+        responseObserver.onNext(ack);
+        responseObserver.onCompleted();
+      }
+    };
+
+    serviceRegistry.addService(serviceImpl);
+    AlphaResponse response =service.coordinate(event);
+
+    assertThat(requestCaptor[0].getServiceName(), is(serviceConfig.serviceName()));
+    assertThat(requestCaptor[0].getInstanceId(), is(serviceConfig.instanceId()));
+    assertThat(requestCaptor[0].getGlobalTxId(), is(globalTxId));
+    assertThat(requestCaptor[0].getLocalTxId(), is(localTxId));
+    assertThat(requestCaptor[0].getMethodName(), is(methodName));
+    assertThat(requestCaptor[0].getStatus(), is(TransactionStatus.Succeed.toString()));
+    assertThat(response.aborted(), is(false));
+  }
+
+
+
+}
diff --git a/pom.xml b/pom.xml
index 83e5366..57ab449 100644
--- a/pom.xml
+++ b/pom.xml
@@ -503,6 +503,11 @@
         <version>${grpc.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-testing</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+      <dependency>
         <groupId>com.esotericsoftware</groupId>
         <artifactId>kryo</artifactId>
         <version>${kryo.version}</version>


[incubator-servicecomb-saga] 02/03: SCB-817 Update the omega transactions code

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch SCB-817
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit dbb74417364e10e9d08d53d68279a61a163a4fa1
Author: Willem Jiang <ji...@huawei.com>
AuthorDate: Mon Aug 27 16:13:31 2018 +0800

    SCB-817 Update the omega transactions code
---
 .../transaction/tcc/CoordinateMessageHandler.java  |  36 +++++++
 .../tcc/TccStartAnnotationProcessor.java           |   6 +-
 .../transaction/tcc/events/CoordinatedEvent.java   |  53 ++++++++++
 .../tcc/CoordinateMessageHandlerTest.java          | 109 +++++++++++++++++++++
 .../transaction/tcc/TccParticipatorAspectTest.java |  11 ++-
 .../tcc/TccStartAnnotationProcessorTest.java       |  10 +-
 .../omega/transaction/tcc/TccStartAspectTest.java  |   9 +-
 7 files changed, 222 insertions(+), 12 deletions(-)

diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
new file mode 100644
index 0000000..7663e72
--- /dev/null
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+
+public class CoordinateMessageHandler implements MessageHandler {
+
+  private final TccEventService tccEventService;
+
+  public CoordinateMessageHandler(TccEventService tccEventService) {
+    this.tccEventService = tccEventService;
+  }
+
+  @Override
+  public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) {
+    //TODO Omega Call the service
+    tccEventService.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed));
+  }
+}
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
index 55198dd..137a397 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -37,21 +37,21 @@ public class TccStartAnnotationProcessor {
 
   public AlphaResponse preIntercept(String parentTxId, String methodName, int timeout) {
     try {
-      return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+      return eventService.tccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
     } catch (OmegaException e) {
       throw new TransactionalException(e.getMessage(), e.getCause());
     }
   }
 
   public void postIntercept(String parentTxId, String methodName) {
-    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+    eventService.tccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
         TransactionStatus.Succeed));
   }
 
   public void onError(String parentTxId, String methodName, Throwable throwable) {
     // Send the cancel event
     // Do we need to wait for the alpha finish all the transaction
-    eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+    eventService.tccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
         TransactionStatus.Failed));
   }
 }
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
index 6d88924..b770ff3 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java
@@ -1,4 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.servicecomb.saga.omega.transaction.tcc.events;
 
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
 public class CoordinatedEvent {
+  private final String globalTxId;
+  private final String localTxId;
+  private final String parentTxId;
+  private final String methodName;
+  private final TransactionStatus status;
+
+  public CoordinatedEvent(String globalTxId, String localTxId, String parentTxId, String methodName,
+      TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.methodName = methodName;
+    this.status = status;
+  }
+
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  public TransactionStatus getStatus() {
+    return status;
+  }
 }
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
new file mode 100644
index 0000000..b260295
--- /dev/null
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CoordinateMessageHandlerTest {
+  private final List<CoordinatedEvent> coordinatedEvents = new ArrayList<>();
+  private final AlphaResponse response = new AlphaResponse(false);
+  private final TccEventService eventService = new TccEventService() {
+    @Override
+    public void onConnected() {
+
+    }
+
+    @Override
+    public void onDisconnected() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public String target() {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse participate(ParticipatedEvent participateEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
+      return null;
+    }
+
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      coordinatedEvents.add(coordinatedEvent);
+      return response;
+    }
+  };
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String methodName= uniquify("Method");
+
+  private final CoordinateMessageHandler handler = new CoordinateMessageHandler(eventService);
+
+  @Before
+  public void setUp() {
+    coordinatedEvents.clear();
+  }
+
+  @Test
+  public void sendsCompensatedEventOnCompensationCompleted() {
+    handler.onReceive(globalTxId, localTxId, parentTxId, methodName);
+
+    assertThat(coordinatedEvents.size(), is(1));
+
+    CoordinatedEvent event = coordinatedEvents.get(0);
+    assertThat(event.getGlobalTxId(), is(globalTxId));
+    assertThat(event.getLocalTxId(), is(localTxId));
+    assertThat(event.getParentTxId(), is(parentTxId));
+    assertThat(event.getMethodName(), is(methodName));
+    assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+  }
+
+}
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
index 8e6b36e..b3cd806 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java
@@ -31,10 +31,9 @@ import java.util.UUID;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
-import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
 import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -81,15 +80,19 @@ public class TccParticipatorAspectTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       return null;
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       return null;
     }
 
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };
 
 
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
index 95f8137..fa34ad7 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -79,7 +80,7 @@ public class TccStartAnnotationProcessorTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       if (throwException) {
         throw exception;
       }
@@ -88,12 +89,15 @@ public class TccStartAnnotationProcessorTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       endedEvents.add(tccEndEvent);
       return response;
     }
 
-
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };
   private final TccStartAnnotationProcessor tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context,
       eventService);
diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
index 005af67..80f14e4 100644
--- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
+++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java
@@ -32,6 +32,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
 import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
@@ -75,17 +76,21 @@ public class TccStartAspectTest {
     }
 
     @Override
-    public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) {
+    public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
       startedEvents.add(tccStartEvent);
       return response;
     }
 
     @Override
-    public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) {
+    public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
       endedEvents.add(tccEndEvent);
       return response;
     }
 
+    @Override
+    public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
+      return null;
+    }
   };