You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/28 06:40:14 UTC

[incubator-servicecomb-saga] branch master updated (8ff04bd -> 697dcc3)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from 8ff04bd  Update the k8s base/ folder description. Add the intro of jmeter-collector and the load test ref.
     new 819f19a  SCB-856 Implement alpha TCC workflow based on local memory.
     new 46f0f3b  SCB-856 Add junit test framework code for alpha TCC workflow.
     new 2064d86  SCB-856 Revise GrpcTccCordinateCommand => GrpcTccCoordinateCommand.
     new 4781bc1  SCB-856 Revise ParticipateEvent => ParticipatedEvent.
     new a7d2b08  SCB-856 Remove author info from class header.
     new e9455e0  SCB-856 Integrate alpha saga and tcc within same boot which are different with port.
     new b8a691b  SCB-856 Refactor alpha tcc test framework with integrated server.
     new 3f73776  SCB-856 Add junit test case & resolve related bugs.
     new 428f877  SCB-856 Refactor TCC alpha server using the same grpc port of saga.
     new 91ddb85  SCB-856 Refactor TCC alpha server with TccCallbackEngine.
     new 96d09dc  SCB-856 Refactor TCC alpha server package.
     new 3126512  SCB-856 Add exception junit test case for TCC alpha server.
     new 3875184  SCB-856 Add info log for alpha GrpcTccEventService.
     new 697dcc3  SCB-856 Revise alpha TCC server test case.

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  35 ++-
 .../saga/alpha/server/GrpcServerConfig.java        |   2 +-
 .../saga/alpha/server/GrpcStartable.java           |   4 +-
 .../saga/alpha/server/ServerStartable.java         |   2 +-
 .../saga/alpha/server/tcc/GrpcTccEventService.java |  92 +++++++
 .../alpha/server/tcc/callback/CallbackEngine.java  |  25 ++
 .../server/tcc/callback/GrpcOmegaTccCallback.java  |  53 +++++
 .../alpha/server/tcc/callback/OmegaCallback.java   |  29 +++
 .../server/tcc/callback/OmegaCallbackWrapper.java  |  36 +++
 .../server/tcc/callback/TccCallbackEngine.java     |  62 +++++
 .../server/tcc/event/ParticipateEventFactory.java  |  38 ++-
 .../alpha/server/tcc/event}/ParticipatedEvent.java |  31 ++-
 .../tcc/registry/OmegaCallbacksRegistry.java       |  96 ++++++++
 .../tcc/registry/TransactionEventRegistry.java     |  64 +++++
 .../saga/alpha/server/AlphaIntegrationTest.java    |  16 +-
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  | 263 +++++++++++++++++++++
 .../server/TccCoordinateCommandStreamObserver.java |  56 +++++
 17 files changed, 836 insertions(+), 68 deletions(-)
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
 copy omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java => alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java (59%)
 copy {omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events => alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event}/ParticipatedEvent.java (71%)
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java


[incubator-servicecomb-saga] 05/14: SCB-856 Remove author info from class header.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit a7d2b0880092970137294bc77fbb7de050311435
Author: cherrylzhao <zh...@126.com>
AuthorDate: Thu Aug 23 11:41:33 2018 +0800

    SCB-856 Remove author info from class header.
---
 .../servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java      | 2 --
 .../servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java       | 2 --
 .../servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java    | 4 +---
 .../servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java  | 2 --
 .../saga/alpha/tcc/server/event/ParticipateEventFactory.java         | 5 -----
 .../servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java   | 5 -----
 6 files changed, 1 insertion(+), 19 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
index e59aaa3..205c6db 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -24,8 +24,6 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 /**
  * Grpc omega callback for TCC workflow.
- *
- * @author zhaojun
  */
 public final class GrpcOmegaTccCallback implements OmegaCallback {
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
index b82d002..aa71f79 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -30,8 +30,6 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 
 /**
  * Grpc TCC event service implement.
- *
- * @author zhaojun
  */
 public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
index 92733e8..fbac514 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -26,9 +26,7 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 /**
- * Manage Omega callbacks.
- *
- * @author zhaojun
+ * Manage omega callbacks.
  */
 public final class OmegaCallbacksRegistry {
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
index 511db74..3c74b10 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -25,8 +25,6 @@ import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
 
 /**
  * Manage TCC transaction event.
- *
- * @author zhaojun
  */
 public final class TransactionEventRegistry {
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
index 5a4f325..4403e5b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -20,11 +20,6 @@ package org.apache.servicecomb.saga.alpha.tcc.server.event;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
 
-/**
- * Participate event factory.
- *
- * @author zhaojun
- */
 public class ParticipateEventFactory {
 
   public static ParticipatedEvent create(GrpcTccParticipatedEvent request) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
index 2c5740b..70038c6 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
@@ -19,11 +19,6 @@ package org.apache.servicecomb.saga.alpha.tcc.server.event;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
-/**
- * Participate event.
- *
- * @author zhaojun
- */
 public class ParticipatedEvent {
 
   private String globalTxId;


[incubator-servicecomb-saga] 10/14: SCB-856 Refactor TCC alpha server with TccCallbackEngine.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 91ddb85d06acde1ddd91cb50c4fa47e98b31bc2a
Author: cherrylzhao <zh...@126.com>
AuthorDate: Mon Aug 27 14:31:42 2018 +0800

    SCB-856 Refactor TCC alpha server with TccCallbackEngine.
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 17 +++---
 .../{OmegaCallback.java => CallbackEngine.java}    |  9 ++--
 .../alpha/server/tcc/GrpcOmegaTccCallback.java     |  4 +-
 .../saga/alpha/server/tcc/GrpcTccEventService.java | 22 ++++----
 .../saga/alpha/server/tcc/OmegaCallback.java       |  3 +-
 ...megaCallback.java => OmegaCallbackWrapper.java} | 15 ++++--
 .../alpha/server/tcc/OmegaCallbacksRegistry.java   | 11 ++++
 .../saga/alpha/server/tcc/TccCallbackEngine.java   | 61 ++++++++++++++++++++++
 8 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 8d9af1e..91143f4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,6 +35,8 @@ import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
+import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbackWrapper;
+import org.apache.servicecomb.saga.alpha.server.tcc.TccCallbackEngine;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
@@ -90,14 +92,11 @@ class AlphaConfig {
       CommandRepository commandRepository,
       TxTimeoutRepository timeoutRepository,
       OmegaCallback omegaCallback) {
-
-    new EventScanner(scheduler,
-        eventRepository, commandRepository, timeoutRepository,
-        omegaCallback, eventPollingInterval).run();
-
-    TxConsistentService consistentService = new TxConsistentService(eventRepository);
-
-    return consistentService;
+        new EventScanner(scheduler,
+            eventRepository, commandRepository, timeoutRepository,
+            omegaCallback, eventPollingInterval).run();
+        TxConsistentService consistentService = new TxConsistentService(eventRepository);
+        return consistentService;
   }
 
   @Bean
@@ -105,7 +104,7 @@ class AlphaConfig {
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks),
-        new GrpcTccEventService());
+        new GrpcTccEventService(new TccCallbackEngine(new OmegaCallbackWrapper())));
     new Thread(bootstrap::start).start();
     return bootstrap;
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
similarity index 81%
copy from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
copy to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
index 369472c..e69eb6a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
@@ -17,12 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 
-public interface OmegaCallback {
+public interface CallbackEngine {
 
-  void invoke(ParticipatedEvent event, String status);
-
-  default void disconnect() {
-  }
+   boolean execute(GrpcTccTransactionEndedEvent request);
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index 8ea7cfb..e1fb68a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -34,13 +34,13 @@ public final class GrpcOmegaTccCallback implements OmegaCallback {
   }
 
   @Override
-  public void invoke(ParticipatedEvent event, String status) {
+  public void invoke(ParticipatedEvent event, TransactionStatus status) {
     GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
         .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
         .setServiceName(event.getServiceName())
-        .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+        .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
         .build();
     responseObserver.onNext(command);
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index ef9adbe..aa20f77 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,9 +18,7 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -35,8 +33,16 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
 
   private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
+  
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
+  private final TccCallbackEngine tccCallbackEngine;
+
+  public GrpcTccEventService(
+      TccCallbackEngine tccCallbackEngine) {
+    this.tccCallbackEngine = tccCallbackEngine;
+  }
+
   @Override
   public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     OmegaCallbacksRegistry.register(request, responseObserver);
@@ -57,18 +63,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
-    try {
-      for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
-        OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId())
-            .invoke(event, request.getStatus());
-      }
-    } catch (AlphaException ex) {
-      responseObserver.onNext(REJECT);
-    }
-    responseObserver.onNext(ALLOW);
+    responseObserver.onNext(tccCallbackEngine.execute(request) ? ALLOW : REJECT);
     responseObserver.onCompleted();
   }
-
+  
   @Override
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
     OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 369472c..41a6c06 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -18,10 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
 
-  void invoke(ParticipatedEvent event, String status);
+  void invoke(ParticipatedEvent event, TransactionStatus status);
 
   default void disconnect() {
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
similarity index 65%
copy from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
copy to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
index 369472c..3b55ffa 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
@@ -18,11 +18,18 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 
-public interface OmegaCallback {
+public class OmegaCallbackWrapper implements OmegaCallback {
 
-  void invoke(ParticipatedEvent event, String status);
-
-  default void disconnect() {
+  @Override
+  public void invoke(ParticipatedEvent event, TransactionStatus status) {
+    OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId());
+    try {
+      omegaCallback.invoke(event, status);
+    } catch (Exception ex) {
+      OmegaCallbacksRegistry.removeByValue(event.getServiceName(), omegaCallback);
+      throw ex;
+    }
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index 834a5a2..9e228da 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -20,17 +20,22 @@ package org.apache.servicecomb.saga.alpha.server.tcc;
 import static java.util.Collections.emptyMap;
 
 import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manage omega callbacks.
  */
 public final class OmegaCallbacksRegistry {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
 
   public static Map<String, Map<String, OmegaCallback>> getRegistry() {
@@ -64,11 +69,17 @@ public final class OmegaCallbacksRegistry {
     }
     OmegaCallback result = callbackMap.get(instanceId);
     if (null == result) {
+      LOG.info("Cannot find the service with the instanceId {}, call the other instance.", instanceId);
       return callbackMap.values().iterator().next();
     }
     return result;
   }
 
+  public static void removeByValue(String serviceName, OmegaCallback omegaCallback) {
+    Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap());
+    callbackMap.values().remove(omegaCallback);
+  }
+
   /**
    * Retrieve omega TCC callback by service name and instance id, then remove it from registry.
    *
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
new file mode 100644
index 0000000..f9aae88
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccCallbackEngine implements CallbackEngine {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OmegaCallbackWrapper omegaCallbackWrapper;
+
+  public TccCallbackEngine(OmegaCallbackWrapper omegaCallbackWrapper) {
+    this.omegaCallbackWrapper = omegaCallbackWrapper;
+  }
+
+  @Override
+  public boolean execute(GrpcTccTransactionEndedEvent request) {
+    boolean result = true;
+    for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+      try {
+        omegaCallbackWrapper.invoke(event, TransactionStatus.valueOf(request.getStatus()));
+      } catch (Exception ex) {
+        logError(event, ex);
+        result = false;
+      }
+    }
+    return result;
+  }
+
+  private void logError(ParticipatedEvent event, Exception ex) {
+    LOG.error(
+        "Failed to invoke service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
+        event.getServiceName(),
+        event.getInstanceId(),
+        TransactionStatus.Succeed.equals(event.getStatus()) ? event.getConfirmMethod() : event.getCancelMethod(),
+        event.getGlobalTxId(),
+        event.getLocalTxId(),
+        ex);
+  }
+}


[incubator-servicecomb-saga] 07/14: SCB-856 Refactor alpha tcc test framework with integrated server.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b8a691bb54d31124bdc555bc515f592603191a82
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Aug 24 10:44:49 2018 +0800

    SCB-856 Refactor alpha tcc test framework with integrated server.
---
 .../saga/alpha/server/tcc/AlphaTccConfig.java      |   2 -
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  |  29 ++----
 .../tcc/server/common/AlphaTccApplication.java     |  28 ------
 .../saga/alpha/tcc/server/common/Bootstrap.java    |  23 -----
 .../alpha/tcc/server/common/GrpcBootstrap.java     | 109 ---------------------
 .../tcc/server/common/GrpcTccServerConfig.java     |  75 --------------
 6 files changed, 6 insertions(+), 260 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
index 8dabf79..97b35ee 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
@@ -17,7 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
 import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
 import org.apache.servicecomb.saga.alpha.server.ServerStartable;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -27,7 +26,6 @@ import org.springframework.context.annotation.Configuration;
 
 @Configuration
 @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
-//@ConditionalOnProperty(value ="alpha.mode.TCC")
 @ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
 public class AlphaTccConfig {
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 517729a..d421075 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -24,12 +24,7 @@ import io.grpc.netty.NettyChannelBuilder;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import javax.annotation.PostConstruct;
-import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.AlphaTccApplication;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.Bootstrap;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcBootstrap;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcTccServerConfig;
+import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
 import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -43,32 +38,20 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaTccApplication.class},
+@SpringBootTest(classes = {AlphaApplication.class},
     properties = {
         "alpha.server.host=0.0.0.0",
-        "alpha.server.port=8098",
-        "alpha.event.pollingInterval=1"
+        "alpha.server.tcc-port=8090",
+        "alpha.event.pollingInterval=1",
+        "alpha.mode=TCC"
     })
 public class AlphaTccServerTest {
 
-  @Autowired
-  private GrpcTccServerConfig grpcTccServerConfig;
-
-  private static GrpcTccServerConfig serverConfig;
-  @PostConstruct
-  public void init() {
-    serverConfig = grpcTccServerConfig;
-    server = new GrpcBootstrap(serverConfig, new GrpcTccEventService());
-    new Thread(server::start).start();
-  }
-
   private static final int port = 8090;
-  private  static Bootstrap server;
   protected static ManagedChannel clientChannel;
 
   private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
@@ -116,7 +99,7 @@ public class AlphaTccServerTest {
 
   @Test
   public void assertOnConnect() {
-//    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
   }
 
   private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
deleted file mode 100644
index 464dffc..0000000
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class AlphaTccApplication {
-  public static void main(String[] args) {
-    SpringApplication.run(AlphaTccApplication.class, args);
-  }
-}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
deleted file mode 100644
index 6382378..0000000
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
-
-public interface Bootstrap {
-
-  void start();
-}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
deleted file mode 100644
index 8021161..0000000
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
-
-import io.grpc.BindableService;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NettyServerBuilder;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Properties;
-import javax.net.ssl.SSLException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GrpcBootstrap implements Bootstrap {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final Server server;
-
-  public GrpcBootstrap(GrpcTccServerConfig serverConfig, BindableService... services) {
-    ServerBuilder<?> serverBuilder;
-    if (serverConfig.isSslEnable()) {
-      serverBuilder = NettyServerBuilder.forAddress(
-          new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()));
-
-      try {
-        ((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
-      } catch (SSLException e) {
-        throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
-      }
-    } else {
-      serverBuilder = ServerBuilder.forPort(serverConfig.getPort());
-    }
-    Arrays.stream(services).forEach(serverBuilder::addService);
-    server = serverBuilder.build();
-  }
-
-  @Override
-  public void start() {
-    Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
-
-    try {
-      server.start();
-      server.awaitTermination();
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to start grpc server.", e);
-    } catch (InterruptedException e) {
-      LOG.error("grpc server was interrupted.", e);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  private SslContextBuilder getSslContextBuilder(GrpcTccServerConfig config) {
-
-    Properties prop = new Properties();
-    ClassLoader classLoader = getClass().getClassLoader();
-    try {
-      prop.load(classLoader.getResourceAsStream("ssl.properties"));
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to read ssl.properties.", e);
-    }
-
-    InputStream cert = getInputStream(classLoader, config.getCert(), "Server Cert");
-    InputStream key = getInputStream(classLoader, config.getKey(), "Server Key");
-
-    SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(cert, key)
-        .protocols(prop.getProperty("protocols"))
-        .ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
-    if (config.isMutualAuth()) {
-      InputStream clientCert = getInputStream(classLoader, config.getClientCert(), "Client Cert");
-      sslClientContextBuilder.trustManager(clientCert);
-      sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
-    }
-    return GrpcSslContexts.configure(sslClientContextBuilder,
-        SslProvider.OPENSSL);
-  }
-
-  private InputStream getInputStream(ClassLoader classLoader, String resource, String config) {
-    InputStream is = classLoader.getResourceAsStream(resource);
-    if (is == null) {
-      throw new IllegalStateException("Cannot load the " + config + " from " + resource);
-    }
-    return is;
-
-  }
-}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
deleted file mode 100644
index f40076c..0000000
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class GrpcTccServerConfig {
-  @Value("${alpha.server.host:0.0.0.0}")
-  private String host;
-
-  @Value("${alpha.server.port:8080}")
-  private int port;
-
-  @Value("${alpha.server.ssl.enable:false}")
-  private boolean sslEnable;
-
-  @Value("${alpha.server.ssl.cert:server.crt}")
-  private String cert;
-
-  @Value("${alpha.server.ssl.key:server.pem}")
-  private String key;
-
-  @Value("${alpha.server.ssl.mutualAuth:false}")
-  private boolean mutualAuth;
-
-  @Value("${alpha.server.ssl.clientCert:client.crt}")
-  private String clientCert;
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public boolean isSslEnable() {
-    return sslEnable;
-  }
-
-  public String getCert() {
-    return cert;
-  }
-
-  public String getKey() {
-    return key;
-  }
-
-  public boolean isMutualAuth() {
-    return mutualAuth;
-  }
-
-  public String getClientCert() {
-    return clientCert;
-  }
-}
-
-


[incubator-servicecomb-saga] 02/14: SCB-856 Add junit test framework code for alpha TCC workflow.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 46f0f3b950419439287f760a54d868660005435a
Author: cherrylzhao <zh...@126.com>
AuthorDate: Wed Aug 22 22:59:49 2018 +0800

    SCB-856 Add junit test framework code for alpha TCC workflow.
---
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  | 125 +++++++++++++++++++++
 .../tcc/server/common/AlphaTccApplication.java     |  28 +++++
 .../saga/alpha/tcc/server/common/Bootstrap.java    |  23 ++++
 .../alpha/tcc/server/common/GrpcBootstrap.java     | 109 ++++++++++++++++++
 .../tcc/server/common/GrpcTccServerConfig.java     |  75 +++++++++++++
 .../common/TccCoordinateCommandStreamObserver.java |  52 +++++++++
 6 files changed, 412 insertions(+)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
new file mode 100644
index 0000000..5e303e4
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.annotation.PostConstruct;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.AlphaTccApplication;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.Bootstrap;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcBootstrap;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcTccServerConfig;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaTccApplication.class},
+    properties = {
+        "alpha.server.host=0.0.0.0",
+        "alpha.server.port=8098",
+        "alpha.event.pollingInterval=1"
+    })
+public class AlphaTccServerTest {
+
+  @Autowired
+  private GrpcTccServerConfig grpcTccServerConfig;
+
+  private static GrpcTccServerConfig serverConfig;
+  @PostConstruct
+  public void init() {
+    serverConfig = grpcTccServerConfig;
+    server = new GrpcBootstrap(serverConfig, new GrpcTccEventService());
+    new Thread(server::start).start();
+  }
+
+  private static final int port = 8090;
+  private  static Bootstrap server;
+  protected static ManagedChannel clientChannel;
+
+  private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
+
+  private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
+
+  private static final Queue<GrpcTccCordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+
+  private final TccCoordinateCommandStreamObserver commandStreamObserver =
+      new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
+
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+  private final String compensationMethod = getClass().getCanonicalName();
+
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
+
+  private final GrpcServiceConfig serviceConfig = GrpcServiceConfig.newBuilder()
+      .setServiceName(serviceName)
+      .setInstanceId(instanceId)
+      .build();
+
+  @BeforeClass
+  public static void setupClientChannel() {
+    clientChannel = NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    clientChannel.shutdown();
+    clientChannel = null;
+  }
+
+  @Before
+  public void before() {
+    System.out.println(" globalTxId " + globalTxId);
+  }
+
+  @After
+  public void after() {
+//    blockingStub.onDisconnected(serviceConfig);
+  }
+
+  @Test
+  public void assertOnConnect() {
+//    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+  }
+
+  private GrpcAck onCompensation(GrpcTccCordinateCommand command) {
+    return GrpcAck.newBuilder().setAborted(false).build();
+  }
+
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
new file mode 100644
index 0000000..464dffc
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class AlphaTccApplication {
+  public static void main(String[] args) {
+    SpringApplication.run(AlphaTccApplication.class, args);
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
new file mode 100644
index 0000000..6382378
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+public interface Bootstrap {
+
+  void start();
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
new file mode 100644
index 0000000..8021161
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Properties;
+import javax.net.ssl.SSLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcBootstrap implements Bootstrap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Server server;
+
+  public GrpcBootstrap(GrpcTccServerConfig serverConfig, BindableService... services) {
+    ServerBuilder<?> serverBuilder;
+    if (serverConfig.isSslEnable()) {
+      serverBuilder = NettyServerBuilder.forAddress(
+          new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()));
+
+      try {
+        ((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
+      } catch (SSLException e) {
+        throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
+      }
+    } else {
+      serverBuilder = ServerBuilder.forPort(serverConfig.getPort());
+    }
+    Arrays.stream(services).forEach(serverBuilder::addService);
+    server = serverBuilder.build();
+  }
+
+  @Override
+  public void start() {
+    Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
+
+    try {
+      server.start();
+      server.awaitTermination();
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to start grpc server.", e);
+    } catch (InterruptedException e) {
+      LOG.error("grpc server was interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private SslContextBuilder getSslContextBuilder(GrpcTccServerConfig config) {
+
+    Properties prop = new Properties();
+    ClassLoader classLoader = getClass().getClassLoader();
+    try {
+      prop.load(classLoader.getResourceAsStream("ssl.properties"));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to read ssl.properties.", e);
+    }
+
+    InputStream cert = getInputStream(classLoader, config.getCert(), "Server Cert");
+    InputStream key = getInputStream(classLoader, config.getKey(), "Server Key");
+
+    SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(cert, key)
+        .protocols(prop.getProperty("protocols"))
+        .ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
+    if (config.isMutualAuth()) {
+      InputStream clientCert = getInputStream(classLoader, config.getClientCert(), "Client Cert");
+      sslClientContextBuilder.trustManager(clientCert);
+      sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
+    }
+    return GrpcSslContexts.configure(sslClientContextBuilder,
+        SslProvider.OPENSSL);
+  }
+
+  private InputStream getInputStream(ClassLoader classLoader, String resource, String config) {
+    InputStream is = classLoader.getResourceAsStream(resource);
+    if (is == null) {
+      throw new IllegalStateException("Cannot load the " + config + " from " + resource);
+    }
+    return is;
+
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
new file mode 100644
index 0000000..f40076c
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class GrpcTccServerConfig {
+  @Value("${alpha.server.host:0.0.0.0}")
+  private String host;
+
+  @Value("${alpha.server.port:8080}")
+  private int port;
+
+  @Value("${alpha.server.ssl.enable:false}")
+  private boolean sslEnable;
+
+  @Value("${alpha.server.ssl.cert:server.crt}")
+  private String cert;
+
+  @Value("${alpha.server.ssl.key:server.pem}")
+  private String key;
+
+  @Value("${alpha.server.ssl.mutualAuth:false}")
+  private boolean mutualAuth;
+
+  @Value("${alpha.server.ssl.clientCert:client.crt}")
+  private String clientCert;
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public boolean isSslEnable() {
+    return sslEnable;
+  }
+
+  public String getCert() {
+    return cert;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public boolean isMutualAuth() {
+    return mutualAuth;
+  }
+
+  public String getClientCert() {
+    return clientCert;
+  }
+}
+
+
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
new file mode 100644
index 0000000..7f1295b
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCordinateCommand> {
+
+  private static  Queue<GrpcTccCordinateCommand> receivedCommands;
+  private  Consumer<GrpcTccCordinateCommand> consumer;
+  private boolean completed = false;
+
+  public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCordinateCommand> consumer,
+      Queue<GrpcTccCordinateCommand> receivedCommands) {
+    this.consumer = consumer;
+    TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands;
+  }
+
+  @Override
+  public void onNext(GrpcTccCordinateCommand value) {
+    consumer.accept(value);
+    receivedCommands.add(value);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+
+  }
+
+  @Override
+  public void onCompleted() {
+    completed = true;
+  }
+}


[incubator-servicecomb-saga] 01/14: SCB-856 Implement alpha TCC workflow based on local memory.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 819f19a8adbffde7e3eb025ea8b8a359416077eb
Author: cherrylzhao <zh...@126.com>
AuthorDate: Wed Aug 22 21:52:13 2018 +0800

    SCB-856 Implement alpha TCC workflow based on local memory.
---
 .../alpha/tcc/server/GrpcOmegaTccCallback.java     |  48 +++++++++
 .../saga/alpha/tcc/server/GrpcTccEventService.java |  72 +++++++++++++
 .../saga/alpha/tcc/server/OmegaCallback.java       |  29 ++++++
 .../alpha/tcc/server/OmegaCallbacksRegistry.java   |  71 +++++++++++++
 .../alpha/tcc/server/TransactionEventRegistry.java |  55 ++++++++++
 .../alpha/tcc/server/event/ParticipateEvent.java   | 113 +++++++++++++++++++++
 .../tcc/server/event/ParticipateEventFactory.java  |  42 ++++++++
 7 files changed, 430 insertions(+)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
new file mode 100644
index 0000000..e05b024
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ *
+ * @author zhaojun
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+
+  private StreamObserver<GrpcTccCordinateCommand> responseObserver;
+
+  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    this.responseObserver = responseObserver;
+  }
+
+  @Override
+  public void compensate(ParticipateEvent event, TransactionStatus status) {
+    GrpcTccCordinateCommand command = GrpcTccCordinateCommand.newBuilder()
+        .setGlobalTxId(event.getGlobalTxId())
+        .setLocalTxId(event.getLocalTxId())
+        .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
+        .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+        .build();
+    responseObserver.onNext(command);
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
new file mode 100644
index 0000000..a76530b
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+
+/**
+ * Grpc TCC event service implementation, backed by the static OmegaCallbacksRegistry and TransactionEventRegistry.
+ * NOTE(review): REJECT is declared below but no handler in this class uses it.
+ * @author zhaojun
+ */
+public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
+
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
+
+  @Override
+  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    OmegaCallbacksRegistry.register(request, responseObserver);
+  }
+
+  @Override // NOTE(review): body is empty - responseObserver never receives onNext/onCompleted
+  public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+  }
+
+  @Override
+  public void participate(GrpcTccParticipateEvent request, StreamObserver<GrpcAck> responseObserver) {
+    TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override // hands each participate event of the global tx (with the event's own status) to its omega callback, then acks
+  public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    for (ParticipateEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+      OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
+    } // NOTE(review): OmegaCallbacksRegistry.retrieve returns null for an unregistered omega -> NullPointerException above
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override // removes the omega's callback and disconnects it; NOTE(review): NullPointerException if none was registered
+  public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
new file mode 100644
index 0000000..14f8842
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public interface OmegaCallback { // callback the TCC server invokes for a participate event; see GrpcOmegaTccCallback
+
+  void compensate(ParticipateEvent event, TransactionStatus status); // e.g. GrpcOmegaTccCallback sends confirm or cancel by status
+
+  default void disconnect() { // no-op unless an implementation overrides it
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
new file mode 100644
index 0000000..c505df1
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Manage Omega callbacks.
+ *
+ * @author zhaojun
+ */
+public final class OmegaCallbacksRegistry {
+
+  private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
+
+  /**
+   * Register omega TCC callback, keyed by service name and then instance id.
+   * A callback already stored for the same service name and instance id is replaced.
+   * @param request Grpc service config
+   * @param responseObserver stream observer
+   */
+  public static void register(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+    REGISTRY
+        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new GrpcOmegaTccCallback(responseObserver));
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback, or null when none is registered for that service name and instance id
+   */
+  public static OmegaCallback retrieve(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id, then remove it from registry.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return the removed callback, or null when none was registered for that service name and instance id
+   */
+  public static OmegaCallback retrieveThenRemove(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).remove(instanceId);
+  }
+
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
new file mode 100644
index 0000000..b89967a
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+
+/**
+ * Manage TCC transaction event.
+ *
+ * @author zhaojun
+ */
+public final class TransactionEventRegistry {
+
+  private final static Map<String, List<ParticipateEvent>> REGISTRY = new ConcurrentHashMap<>();
+
+  /**
+   * Register participate event under its global transaction id.
+   * NOTE(review): the per-transaction LinkedList is unsynchronized; confirm one tx is never registered concurrently.
+   * @param participateEvent participate event
+   */
+  public static void register(ParticipateEvent participateEvent) {
+    REGISTRY
+        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
+        .add(participateEvent);
+  }
+
+  /**
+   * Retrieve participate events from registry.
+   * Never returns null: an unknown global transaction id yields an empty, immutable list so callers can iterate safely.
+   * @param globalTxId global transaction id
+   * @return participate events registered for the id, possibly empty
+   */
+  public static List<ParticipateEvent> retrieve(String globalTxId) {
+    return REGISTRY.getOrDefault(globalTxId, java.util.Collections.emptyList());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
new file mode 100644
index 0000000..66182c6
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+/**
+ * Participate event: mutable value holder (transaction ids, service/instance, confirm/cancel methods, status).
+ * Constructor order: globalTxId, localTxId, parentTxId, serviceName, instanceId, confirmMethod, cancelMethod, status.
+ * @author zhaojun
+ */
+public class ParticipateEvent {
+
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+  private String serviceName;
+  private String instanceId;
+  private String confirmMethod;
+  private String cancelMethod;
+  private TransactionStatus status;
+
+  public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String serviceName,
+      String instanceId, String confirmMethod, String cancelMethod, TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.confirmMethod = confirmMethod;
+    this.cancelMethod = cancelMethod;
+    this.status = status;
+  }
+
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  public String getConfirmMethod() {
+    return confirmMethod;
+  }
+
+  public void setConfirmMethod(String confirmMethod) {
+    this.confirmMethod = confirmMethod;
+  }
+
+  public String getCancelMethod() {
+    return cancelMethod;
+  }
+
+  public void setCancelMethod(String cancelMethod) {
+    this.cancelMethod = cancelMethod;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public TransactionStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TransactionStatus status) {
+    this.status = status;
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
new file mode 100644
index 0000000..d876acf
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+
+/**
+ * Participate event factory.
+ * Maps the fields of a grpc participate event onto the ParticipateEvent constructor, in constructor order.
+ * @author zhaojun
+ */
+public class ParticipateEventFactory {
+  /** Creates a ParticipateEvent from the grpc request; the request status is converted with TransactionStatus.valueOf. */
+  public static ParticipateEvent create(GrpcTccParticipateEvent request) {
+    return new ParticipateEvent(
+        request.getGlobalTxId(),
+        request.getLocalTxId(),
+        request.getParentTxId(),
+        request.getServiceName(),   // the constructor takes serviceName and instanceId before the two methods
+        request.getInstanceId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
+        TransactionStatus.valueOf(request.getStatus())
+    );
+  }
+}


[incubator-servicecomb-saga] 03/14: SCB-856 Revise GrpcTccCordinateCommand => GrpcTccCoordinateCommand.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 2064d86e58c3971f38c4b352f5486a93a851ef6e
Author: cherrylzhao <zh...@126.com>
AuthorDate: Wed Aug 22 23:09:05 2018 +0800

    SCB-856 Revise GrpcTccCordinateCommand => GrpcTccCoordinateCommand.
---
 .../saga/alpha/tcc/server/GrpcOmegaTccCallback.java        |  8 ++++----
 .../saga/alpha/tcc/server/GrpcTccEventService.java         |  6 +++---
 .../saga/alpha/tcc/server/OmegaCallbacksRegistry.java      |  4 ++--
 .../saga/alpha/tcc/server/AlphaTccServerTest.java          |  6 +++---
 .../server/common/TccCoordinateCommandStreamObserver.java  | 14 +++++++-------
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
index e05b024..8e92a3f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -20,7 +20,7 @@ package org.apache.servicecomb.saga.alpha.tcc.server;
 import io.grpc.stub.StreamObserver;
 import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 /**
  * Grpc omega callback for TCC workflow.
@@ -29,15 +29,15 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
  */
 public final class GrpcOmegaTccCallback implements OmegaCallback {
 
-  private StreamObserver<GrpcTccCordinateCommand> responseObserver;
+  private StreamObserver<GrpcTccCoordinateCommand> responseObserver;
 
-  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     this.responseObserver = responseObserver;
   }
 
   @Override
   public void compensate(ParticipateEvent event, TransactionStatus status) {
-    GrpcTccCordinateCommand command = GrpcTccCordinateCommand.newBuilder()
+    GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
         .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
index a76530b..f24fefd 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -22,7 +22,7 @@ import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
 import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
@@ -39,7 +39,7 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   @Override
-  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     OmegaCallbacksRegistry.register(request, responseObserver);
   }
 
@@ -48,7 +48,7 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
   }
 
   @Override
-  public void participate(GrpcTccParticipateEvent request, StreamObserver<GrpcAck> responseObserver) {
+  public void participate(GrpcTccParticipatedEvent request, StreamObserver<GrpcAck> responseObserver) {
     TransactionEventRegistry.register(ParticipateEventFactory.create(request));
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
index c505df1..92733e8 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -23,7 +23,7 @@ import io.grpc.stub.StreamObserver;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 /**
  * Manage Omega callbacks.
@@ -40,7 +40,7 @@ public final class OmegaCallbacksRegistry {
    * @param request Grpc service config
    * @param responseObserver stream observer
    */
-  public static void register(GrpcServiceConfig request, StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+  public static void register(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     REGISTRY
         .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
         .put(request.getInstanceId(), new GrpcOmegaTccCallback(responseObserver));
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 5e303e4..bfef293 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -32,7 +32,7 @@ import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcTccServerConfig;
 import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
@@ -74,7 +74,7 @@ public class AlphaTccServerTest {
 
   private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
 
-  private static final Queue<GrpcTccCordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+  private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
   private final TccCoordinateCommandStreamObserver commandStreamObserver =
       new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
@@ -118,7 +118,7 @@ public class AlphaTccServerTest {
 //    asyncStub.onConnected(serviceConfig, commandStreamObserver);
   }
 
-  private GrpcAck onCompensation(GrpcTccCordinateCommand command) {
+  private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
     return GrpcAck.newBuilder().setAborted(false).build();
   }
 
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
index 7f1295b..cc39a8c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
@@ -20,22 +20,22 @@ package org.apache.servicecomb.saga.alpha.tcc.server.common;
 import io.grpc.stub.StreamObserver;
 import java.util.Queue;
 import java.util.function.Consumer;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
-public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCordinateCommand> {
+public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
 
-  private static  Queue<GrpcTccCordinateCommand> receivedCommands;
-  private  Consumer<GrpcTccCordinateCommand> consumer;
+  private static  Queue<GrpcTccCoordinateCommand> receivedCommands;
+  private  Consumer<GrpcTccCoordinateCommand> consumer;
   private boolean completed = false;
 
-  public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCordinateCommand> consumer,
-      Queue<GrpcTccCordinateCommand> receivedCommands) {
+  public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
+      Queue<GrpcTccCoordinateCommand> receivedCommands) {
     this.consumer = consumer;
     TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands;
   }
 
   @Override
-  public void onNext(GrpcTccCordinateCommand value) {
+  public void onNext(GrpcTccCoordinateCommand value) {
     consumer.accept(value);
     receivedCommands.add(value);
   }


[incubator-servicecomb-saga] 06/14: SCB-856 Integrate alpha saga and tcc within same boot which are different with port.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e9455e009bf49c8dac389656b3d5ee69f6a39659
Author: cherrylzhao <zh...@126.com>
AuthorDate: Thu Aug 23 17:32:55 2018 +0800

    SCB-856 Integrate alpha saga and tcc within same boot which are different with port.
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 18 +++++-----
 .../saga/alpha/server/GrpcServerConfig.java        |  2 --
 .../saga/alpha/server/GrpcStartable.java           |  4 +--
 .../saga/alpha/server/SagaGrpcServerConfig.java    | 28 +++++++++++++++
 .../saga/alpha/server/ServerStartable.java         |  2 +-
 .../saga/alpha/server/tcc/AlphaTccConfig.java      | 40 ++++++++++++++++++++++
 .../tcc}/GrpcOmegaTccCallback.java                 | 16 ++++-----
 .../server => server/tcc}/GrpcTccEventService.java | 18 +++++-----
 .../{tcc/server => server/tcc}/OmegaCallback.java  | 16 ++++-----
 .../tcc}/OmegaCallbacksRegistry.java               | 14 ++++----
 .../saga/alpha/server/tcc/TccGrpcServerConfig.java | 37 ++++++++++++++++++++
 .../tcc}/TransactionEventRegistry.java             | 16 ++++-----
 .../tcc}/event/ParticipateEventFactory.java        |  2 +-
 .../tcc}/event/ParticipatedEvent.java              |  2 +-
 .../src/main/resources/application.yaml            |  2 ++
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  |  1 +
 16 files changed, 161 insertions(+), 57 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e45acdd..6e31628 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,10 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.EventScanner;
@@ -37,12 +35,14 @@ import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
 @Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
   private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@@ -86,13 +86,11 @@ class AlphaConfig {
   @Bean
   TxConsistentService txConsistentService(
       @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
-      GrpcServerConfig serverConfig,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       TxTimeoutRepository timeoutRepository,
-      OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      OmegaCallback omegaCallback) {
 
     new EventScanner(scheduler,
         eventRepository, commandRepository, timeoutRepository,
@@ -100,16 +98,16 @@ class AlphaConfig {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository);
 
-    ServerStartable startable = buildGrpc(serverConfig, consistentService, omegaCallbacks);
-    new Thread(startable::start).start();
-
     return consistentService;
   }
 
-  private ServerStartable buildGrpc(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
+  @Bean
+  ServerStartable sagaServerBootstrap(SagaGrpcServerConfig serverConfig, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    return new GrpcStartable(serverConfig,
+    ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+    new Thread(bootstrap::start).start();
+    return bootstrap;
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
index 66dd992..bb4c880 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
@@ -17,10 +17,8 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import org.springframework.context.annotation.Configuration;
 import org.springframework.beans.factory.annotation.Value;
 
-@Configuration
 public class GrpcServerConfig {
   @Value("${alpha.server.host:0.0.0.0}")
   private String host;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 4d99374..a599967 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -41,12 +41,12 @@ import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 
-class GrpcStartable implements ServerStartable {
+public class GrpcStartable implements ServerStartable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Server server;
 
-  GrpcStartable(GrpcServerConfig serverConfig, BindableService... services) {
+  public GrpcStartable(GrpcServerConfig serverConfig, BindableService... services) {
     ServerBuilder<?> serverBuilder;
     if (serverConfig.isSslEnable()){
       serverBuilder = NettyServerBuilder.forAddress(
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
new file mode 100644
index 0000000..fd86a91
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
+public class SagaGrpcServerConfig extends GrpcServerConfig {
+}
+
+
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
index 33d39df..41dfdbd 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
@@ -20,6 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-interface ServerStartable {
+public interface ServerStartable {
   void start();
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
new file mode 100644
index 0000000..8dabf79
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
+import org.apache.servicecomb.saga.alpha.server.ServerStartable;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
+//@ConditionalOnProperty(value ="alpha.mode.TCC")
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
+public class AlphaTccConfig {
+
+  @Bean
+  ServerStartable tccServerBootstrap(TccGrpcServerConfig serverConfig) {
+    ServerStartable bootstrap = new GrpcStartable(serverConfig, new GrpcTccEventService());
+    new Thread(bootstrap::start).start();
+    return bootstrap;
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
similarity index 74%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index 205c6db..e5364b0 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -6,19 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server;
+package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
similarity index 82%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index aa71f79..148a0e9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -6,20 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server;
+package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
similarity index 56%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 31e38ed..3c19cbb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -6,18 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server;
+package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
similarity index 83%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index fbac514..ef075a5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -6,16 +6,16 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server;
+package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import static java.util.Collections.emptyMap;
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
new file mode 100644
index 0000000..20ab063
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
+public class TccGrpcServerConfig extends GrpcServerConfig {
+
+  @Value("${alpha.server.tcc-port:8080}")
+  private int port;
+
+  public int getPort() {
+    return port;
+  }
+}
+
+
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
similarity index 72%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
index 3c74b10..a2e3ddc 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
@@ -6,22 +6,22 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server;
+package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 
 /**
  * Manage TCC transaction event.
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
similarity index 95%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
index 4403e5b..3c7523f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server.event;
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
similarity index 97%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
index 70038c6..67c84ac 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server.event;
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index b8cd118..92009ac 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,8 @@ alpha:
   server:
     host: 0.0.0.0
     port: 8080
+    tcc-port: 8180
+  mode: SAGA,TCC
 
 ---
 spring:
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index bfef293..517729a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -25,6 +25,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.annotation.PostConstruct;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.saga.alpha.tcc.server.common.AlphaTccApplication;
 import org.apache.servicecomb.saga.alpha.tcc.server.common.Bootstrap;
 import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcBootstrap;


[incubator-servicecomb-saga] 13/14: SCB-856 Add info log for alpha GrpcTccEventService.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 38751840e52abd9fd39a7821cf4bd07f9096de25
Author: cherrylzhao <zh...@126.com>
AuthorDate: Tue Aug 28 11:42:45 2018 +0800

    SCB-856 Add info log for alpha GrpcTccEventService.
---
 .../saga/alpha/server/tcc/GrpcTccEventService.java            | 11 ++++++++++-
 .../alpha/server/tcc/registry/TransactionEventRegistry.java   | 11 +++++++++++
 .../servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java |  9 +--------
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index 27410e2..e3b89b9 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,10 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
-import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -30,12 +31,16 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Grpc TCC event service implement.
  */
 public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
   
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
@@ -50,10 +55,12 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
   @Override
   public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
     OmegaCallbacksRegistry.register(request, responseObserver);
+    LOG.info("Established connection service [{}] instanceId [{}].", request.getServiceName(), request.getInstanceId());
   }
 
   @Override
   public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received transaction start event, global tx id: {}", request.getGlobalTxId());
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
   }
@@ -67,6 +74,7 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    LOG.info("Received transaction end event, global tx id: {}", request.getGlobalTxId());
     responseObserver.onNext(tccCallbackEngine.execute(request) ? ALLOW : REJECT);
     responseObserver.onCompleted();
   }
@@ -75,6 +83,7 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
     OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
     if (null != omegaCallback) {
+      LOG.info("Disconnect from alpha, service [{}] instanceId [{}].", request.getServiceName(), request.getInstanceId());
       omegaCallback.disconnect();
     }
     responseObserver.onNext(ALLOW);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
index 2304f52..d135a8b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
@@ -17,17 +17,22 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc.registry;
 
+import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manage TCC transaction event.
  */
 public final class TransactionEventRegistry {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
 
   /**
@@ -39,6 +44,12 @@ public final class TransactionEventRegistry {
     REGISTRY
         .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedHashSet<>())
         .add(participateEvent);
+
+    LOG.info("Registered participated event, global tx: {}, local tx: {}, parent id: {}, "
+            + "confirm: {}, cancel: {}, status: {}, service [{}] instanceId [{}]",
+        participateEvent.getGlobalTxId(), participateEvent.getLocalTxId(), participateEvent.getParentTxId(),
+        participateEvent.getConfirmMethod(), participateEvent.getCancelMethod(), participateEvent.getStatus(),
+        participateEvent.getServiceName(), participateEvent.getInstanceId());
   }
 
   /**
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 60b6c57..4bbfa45 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -31,9 +31,9 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
 import org.apache.servicecomb.saga.alpha.server.tcc.callback.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
@@ -46,7 +46,6 @@ import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEve
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,11 +98,6 @@ public class AlphaTccServerTest {
     clientChannel = null;
   }
 
-  @Before
-  public void before() {
-    System.out.println(" globalTxId " + globalTxId);
-  }
-
   @After
   public void after() {
     blockingStub.onDisconnected(serviceConfig);
@@ -233,7 +227,6 @@ public class AlphaTccServerTest {
     assertThat(command.getServiceName(), is(serviceName));
 
     assertThat(result.getAborted(), is(false));
-
   }
 
   private GrpcTccParticipatedEvent newParticipatedEvent(String status) {


[incubator-servicecomb-saga] 09/14: SCB-856 Refactor TCC alpha server using the same grpc port of saga.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 428f87794595a64c270d9d75796db9a1254ee241
Author: cherrylzhao <zh...@126.com>
AuthorDate: Mon Aug 27 10:55:33 2018 +0800

    SCB-856 Refactor TCC alpha server using the same grpc port of saga.
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  8 ++---
 .../saga/alpha/server/GrpcServerConfig.java        |  2 ++
 .../saga/alpha/server/SagaGrpcServerConfig.java    | 28 ----------------
 .../saga/alpha/server/tcc/AlphaTccConfig.java      | 38 ----------------------
 .../saga/alpha/server/tcc/GrpcTccEventService.java |  4 +--
 .../saga/alpha/server/tcc/TccGrpcServerConfig.java | 37 ---------------------
 .../src/main/resources/application.yaml            |  2 --
 .../saga/alpha/server/AlphaIntegrationTest.java    | 19 +++--------
 .../alpha/server/AlphaIntegrationWithSSLTest.java  |  3 +-
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  |  5 ++-
 10 files changed, 16 insertions(+), 130 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 6e31628..8d9af1e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -34,15 +34,14 @@ import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
 @Configuration
-@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
   private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@@ -102,10 +101,11 @@ class AlphaConfig {
   }
 
   @Bean
-  ServerStartable sagaServerBootstrap(SagaGrpcServerConfig serverConfig, TxConsistentService txConsistentService,
+  ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     ServerStartable bootstrap = new GrpcStartable(serverConfig,
-        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks),
+        new GrpcTccEventService());
     new Thread(bootstrap::start).start();
     return bootstrap;
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
index bb4c880..e1368c4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
@@ -18,7 +18,9 @@
 package org.apache.servicecomb.saga.alpha.server;
 
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
 
+@Configuration
 public class GrpcServerConfig {
   @Value("${alpha.server.host:0.0.0.0}")
   private String host;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
deleted file mode 100644
index fd86a91..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server;
-
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
-public class SagaGrpcServerConfig extends GrpcServerConfig {
-}
-
-
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
deleted file mode 100644
index 97b35ee..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server.tcc;
-
-import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
-import org.apache.servicecomb.saga.alpha.server.ServerStartable;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
-import org.springframework.boot.autoconfigure.domain.EntityScan;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-@EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
-@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
-public class AlphaTccConfig {
-
-  @Bean
-  ServerStartable tccServerBootstrap(TccGrpcServerConfig serverConfig) {
-    ServerStartable bootstrap = new GrpcStartable(serverConfig, new GrpcTccEventService());
-    new Thread(bootstrap::start).start();
-    return bootstrap;
-  }
-}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index cd61162..ef9adbe 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -59,8 +59,8 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
     try {
       for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
-        OmegaCallbacksRegistry.retrieve(event.getServiceName(),
-            event.getInstanceId()).invoke(event, request.getStatus());
+        OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId())
+            .invoke(event, request.getStatus());
       }
     } catch (AlphaException ex) {
       responseObserver.onNext(REJECT);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
deleted file mode 100644
index 20ab063..0000000
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.servicecomb.saga.alpha.server.tcc;
-
-import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
-public class TccGrpcServerConfig extends GrpcServerConfig {
-
-  @Value("${alpha.server.tcc-port:8080}")
-  private int port;
-
-  public int getPort() {
-    return port;
-  }
-}
-
-
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 92009ac..b8cd118 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,8 +21,6 @@ alpha:
   server:
     host: 0.0.0.0
     port: 8080
-    tcc-port: 8180
-  mode: SAGA,TCC
 
 ---
 spring:
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 1af6b05..8f5122b 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -32,8 +32,10 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.io.File;
-import java.util.Arrays;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -41,10 +43,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
-
 import javax.annotation.PostConstruct;
-import javax.net.ssl.SSLException;
-
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.EventScanner;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -70,22 +69,14 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
-import io.grpc.netty.NettyChannelBuilder;
-import io.grpc.stub.StreamObserver;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8090",
-        "alpha.event.pollingInterval=1",
-        "alpha.mode=SAGA"
+        "alpha.event.pollingInterval=1"
        })
 public class AlphaIntegrationTest {
   private static final int port = 8090;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
index e14775c..8a2df82 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
@@ -39,8 +39,7 @@ import io.netty.handler.ssl.SslProvider;
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8092",
-        "alpha.event.pollingInterval=1",
-        "alpha.mode=SAGA"
+        "alpha.event.pollingInterval=1"
     })
 public class AlphaIntegrationWithSSLTest extends AlphaIntegrationTest {
   private static final int port = 8092;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 102141e..50b992f 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -57,12 +57,11 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {AlphaApplication.class},
     properties = {
         "alpha.server.host=0.0.0.0",
-        "alpha.server.tcc-port=8190",
-        "alpha.mode=TCC"
+        "alpha.server.port=8090"
     })
 public class AlphaTccServerTest {
 
-  private static final int port = 8190;
+  private static final int port = 8090;
   protected static ManagedChannel clientChannel;
 
   private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);


[incubator-servicecomb-saga] 11/14: SCB-856 Refactor TCC alpha server package.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 96d09dcec42cbde956809bab73e6ca6efff1dc40
Author: cherrylzhao <zh...@126.com>
AuthorDate: Mon Aug 27 14:39:40 2018 +0800

    SCB-856 Refactor TCC alpha server package.
---
 .../java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java  | 4 ++--
 .../servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java      | 4 ++++
 .../saga/alpha/server/tcc/{ => callback}/CallbackEngine.java        | 2 +-
 .../saga/alpha/server/tcc/{ => callback}/GrpcOmegaTccCallback.java  | 3 ++-
 .../saga/alpha/server/tcc/{ => callback}/OmegaCallback.java         | 2 +-
 .../saga/alpha/server/tcc/{ => callback}/OmegaCallbackWrapper.java  | 3 ++-
 .../saga/alpha/server/tcc/{ => callback}/TccCallbackEngine.java     | 3 ++-
 .../alpha/server/tcc/{ => registry}/OmegaCallbacksRegistry.java     | 4 +++-
 .../alpha/server/tcc/{ => registry}/TransactionEventRegistry.java   | 2 +-
 .../servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java       | 6 +++---
 10 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 91143f4..d332be2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -35,8 +35,8 @@ import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
-import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbackWrapper;
-import org.apache.servicecomb.saga.alpha.server.tcc.TccCallbackEngine;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbackWrapper;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index aa20f77..27410e2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,7 +18,11 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
similarity index 94%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
index e69eb6a..a8521e6 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/CallbackEngine.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
similarity index 93%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
index e1fb68a..cf8eb8d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
@@ -15,9 +15,10 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
similarity index 94%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
index 41a6c06..02ae7a6 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
similarity index 90%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
index 3b55ffa..62c183a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbackWrapper.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
@@ -15,9 +15,10 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public class OmegaCallbackWrapper implements OmegaCallback {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
similarity index 94%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
index f9aae88..bcdca6d 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccCallbackEngine.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -15,9 +15,10 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
 
 import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
similarity index 94%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
index 9e228da..02340b2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.registry;
 
 import static java.util.Collections.emptyMap;
 
@@ -24,6 +24,8 @@ import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.core.AlphaException;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 import org.slf4j.Logger;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
similarity index 96%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
index 6218304..2304f52 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.server.tcc;
+package org.apache.servicecomb.saga.alpha.server.tcc.registry;
 
 import java.util.LinkedHashSet;
 import java.util.Map;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 50b992f..503d80a 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -30,9 +30,9 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
-import org.apache.servicecomb.saga.alpha.server.tcc.GrpcOmegaTccCallback;
-import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbacksRegistry;
-import org.apache.servicecomb.saga.alpha.server.tcc.TransactionEventRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;


[incubator-servicecomb-saga] 12/14: SCB-856 Add exception junit test case for TCC alpha server.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 31265123edcaada8304bd7e662b9b118aad1bb3a
Author: cherrylzhao <zh...@126.com>
AuthorDate: Mon Aug 27 15:51:57 2018 +0800

    SCB-856 Add exception junit test case for TCC alpha server.
---
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  | 66 ++++++++++++++++++++++
 .../server/TccCoordinateCommandStreamObserver.java |  5 ++
 2 files changed, 71 insertions(+)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 503d80a..60b6c57 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -118,6 +118,17 @@ public class AlphaTccServerTest {
     );
   }
 
+  @Test
+  public void assertOnDisConnect() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    assertThat(
+        OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class))
+    );
+    blockingStub.onDisconnected(serviceConfig);
+    assertThat(commandStreamObserver.isCompleted(), is(true));
+  }
+
   private void awaitUntilConnected() {
     await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName)));
   }
@@ -168,6 +179,61 @@ public class AlphaTccServerTest {
     assertThat(command.getMethod(), is("cancel"));
     assertThat(command.getGlobalTxId(), is(globalTxId));
     assertThat(command.getServiceName(), is(serviceName));
+    assertThat(commandStreamObserver.isCompleted(), is(false));
+  }
+
+  @Test
+  public void assertOnCallbackNotExist() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+
+    OmegaCallbacksRegistry.getRegistry().remove(serviceName);
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+    assertThat(result.getAborted(), is(true));
+  }
+
+  @Test
+  public void assertOnCallbacksExecuteError() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+
+    OmegaCallbacksRegistry.getRegistry().get(serviceName).put(instanceId, new GrpcOmegaTccCallback(null));
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+    assertThat(result.getAborted(), is(true));
+    assertThat(OmegaCallbacksRegistry.getRegistry().get(serviceName).size(), is(0));
+  }
+
+  @Test
+  public void assertOnSwitchOtherCallbackInstance() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    GrpcServiceConfig config = GrpcServiceConfig.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(uniquify("instanceId"))
+        .build();
+    asyncStub.onConnected(config, commandStreamObserver);
+
+    await().atMost(1, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName) != null));
+    await().atMost(1, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName).size() == 2));
+
+    OmegaCallbacksRegistry.getRegistry().get(serviceName).remove(instanceId);
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.size(), is(1));
+    GrpcTccCoordinateCommand command = receivedCommands.poll();
+    assertThat(command.getMethod(), is("confirm"));
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getServiceName(), is(serviceName));
+
+    assertThat(result.getAborted(), is(false));
+
   }
 
   private GrpcTccParticipatedEvent newParticipatedEvent(String status) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
index f97c5d8..a520d89 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
@@ -26,6 +26,11 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc
 
   private Queue<GrpcTccCoordinateCommand> receivedCommands;
   private Consumer<GrpcTccCoordinateCommand> consumer;
+
+  public boolean isCompleted() {
+    return completed;
+  }
+
   private boolean completed = false;
 
   public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,


[incubator-servicecomb-saga] 14/14: SCB-856 Revise alpha TCC server test case.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 697dcc30906f675d1b669e1b2af07cb6dedc8d74
Author: cherrylzhao <zh...@126.com>
AuthorDate: Tue Aug 28 11:54:14 2018 +0800

    SCB-856 Revise alpha TCC server test case.
---
 .../apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index 4bbfa45..a5030b1 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -70,7 +70,7 @@ public class AlphaTccServerTest {
   private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
   private final TccCoordinateCommandStreamObserver commandStreamObserver =
-      new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
+      new TccCoordinateCommandStreamObserver(this::onReceivedCoordinateCommand, receivedCommands);
 
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
@@ -256,7 +256,7 @@ public class AlphaTccServerTest {
         .build();
   }
 
-  private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
+  private GrpcAck onReceivedCoordinateCommand(GrpcTccCoordinateCommand command) {
     return GrpcAck.newBuilder().setAborted(false).build();
   }
 


[incubator-servicecomb-saga] 04/14: SCB-856 Revise ParticipateEvent => ParticipatedEvent.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 4781bc1d2600defb0e31b9573002dd7cf3f9eafe
Author: cherrylzhao <zh...@126.com>
AuthorDate: Wed Aug 22 23:11:59 2018 +0800

    SCB-856 Revise ParticipateEvent => ParticipatedEvent.
---
 .../servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java   | 4 ++--
 .../servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java    | 6 +++---
 .../apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java   | 4 ++--
 .../saga/alpha/tcc/server/TransactionEventRegistry.java           | 8 ++++----
 .../saga/alpha/tcc/server/event/ParticipateEventFactory.java      | 6 +++---
 .../event/{ParticipateEvent.java => ParticipatedEvent.java}       | 4 ++--
 6 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
index 8e92a3f..e59aaa3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -18,7 +18,7 @@
 package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
@@ -36,7 +36,7 @@ public final class GrpcOmegaTccCallback implements OmegaCallback {
   }
 
   @Override
-  public void compensate(ParticipateEvent event, TransactionStatus status) {
+  public void compensate(ParticipatedEvent event, TransactionStatus status) {
     GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
index f24fefd..b82d002 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -18,12 +18,12 @@
 package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
@@ -56,7 +56,7 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
-    for (ParticipateEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+    for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
       OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
     }
     responseObserver.onNext(ALLOW);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
index 14f8842..31e38ed 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
@@ -17,12 +17,12 @@
 
 package org.apache.servicecomb.saga.alpha.tcc.server;
 
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
 
-  void compensate(ParticipateEvent event, TransactionStatus status);
+  void compensate(ParticipatedEvent event, TransactionStatus status);
 
   default void disconnect() {
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
index b89967a..511db74 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -21,7 +21,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipatedEvent;
 
 /**
  * Manage TCC transaction event.
@@ -30,14 +30,14 @@ import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
  */
 public final class TransactionEventRegistry {
 
-  private final static Map<String, List<ParticipateEvent>> REGISTRY = new ConcurrentHashMap<>();
+  private final static Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
 
   /**
    * Register participate event.
    *
    * @param participateEvent participate event
    */
-  public static void register(ParticipateEvent participateEvent) {
+  public static void register(ParticipatedEvent participateEvent) {
     REGISTRY
         .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
         .add(participateEvent);
@@ -49,7 +49,7 @@ public final class TransactionEventRegistry {
    * @param globalTxId global transaction id
    * @return participate events
    */
-  public static List<ParticipateEvent> retrieve(String globalTxId) {
+  public static List<ParticipatedEvent> retrieve(String globalTxId) {
     return REGISTRY.get(globalTxId);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
index d876acf..5a4f325 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -18,7 +18,7 @@
 package org.apache.servicecomb.saga.alpha.tcc.server.event;
 
 import org.apache.servicecomb.saga.common.TransactionStatus;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
 
 /**
  * Participate event factory.
@@ -27,8 +27,8 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
  */
 public class ParticipateEventFactory {
 
-  public static ParticipateEvent create(GrpcTccParticipateEvent request) {
-    return new ParticipateEvent(
+  public static ParticipatedEvent create(GrpcTccParticipatedEvent request) {
+    return new ParticipatedEvent(
         request.getGlobalTxId(),
         request.getLocalTxId(),
         request.getParentTxId(),
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
similarity index 95%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
index 66182c6..2c5740b 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipatedEvent.java
@@ -24,7 +24,7 @@ import org.apache.servicecomb.saga.common.TransactionStatus;
  *
  * @author zhaojun
  */
-public class ParticipateEvent {
+public class ParticipatedEvent {
 
   private String globalTxId;
   private String localTxId;
@@ -35,7 +35,7 @@ public class ParticipateEvent {
   private String cancelMethod;
   private TransactionStatus status;
 
-  public ParticipateEvent(String globalTxId, String localTxId, String parentTxId, String serviceName,
+  public ParticipatedEvent(String globalTxId, String localTxId, String parentTxId, String serviceName,
       String instanceId, String confirmMethod, String cancelMethod, TransactionStatus status) {
     this.globalTxId = globalTxId;
     this.localTxId = localTxId;


[incubator-servicecomb-saga] 08/14: SCB-856 Add junit test case & resolve related bugs.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 3f7377693c768f563cac5f1ff8211f5bc05057d3
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Aug 24 17:53:26 2018 +0800

    SCB-856 Add junit test case & resolve related bugs.
---
 .../alpha/server/tcc/GrpcOmegaTccCallback.java     |  10 +-
 .../saga/alpha/server/tcc/GrpcTccEventService.java |  19 +++-
 .../saga/alpha/server/tcc/OmegaCallback.java       |   3 +-
 .../alpha/server/tcc/OmegaCallbacksRegistry.java   |  18 +++-
 .../alpha/server/tcc/TransactionEventRegistry.java |  10 +-
 .../server/tcc/event/ParticipateEventFactory.java  |   4 +-
 .../alpha/server/tcc/event/ParticipatedEvent.java  |  44 ++-------
 .../saga/alpha/server/AlphaIntegrationTest.java    |   3 +-
 .../alpha/server/AlphaIntegrationWithSSLTest.java  |   3 +-
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  | 110 +++++++++++++++++++--
 .../TccCoordinateCommandStreamObserver.java        |   9 +-
 11 files changed, 164 insertions(+), 69 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index e5364b0..8ea7cfb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -34,13 +34,19 @@ public final class GrpcOmegaTccCallback implements OmegaCallback {
   }
 
   @Override
-  public void compensate(ParticipatedEvent event, TransactionStatus status) {
+  public void invoke(ParticipatedEvent event, String status) {
     GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
         .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
-        .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+        .setServiceName(event.getServiceName())
+        .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
         .build();
     responseObserver.onNext(command);
   }
+
+  @Override
+  public void disconnect() {
+    responseObserver.onCompleted();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index 148a0e9..cd61162 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,8 +18,9 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -43,6 +44,8 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
   }
 
   @Override
@@ -54,8 +57,13 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
-    for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
-      OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
+    try {
+      for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+        OmegaCallbacksRegistry.retrieve(event.getServiceName(),
+            event.getInstanceId()).invoke(event, request.getStatus());
+      }
+    } catch (AlphaException ex) {
+      responseObserver.onNext(REJECT);
     }
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
@@ -63,7 +71,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
-    OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect();
+    OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
+    if (null != omegaCallback) {
+      omegaCallback.disconnect();
+    }
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 3c19cbb..369472c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -18,11 +18,10 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
-import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
 
-  void compensate(ParticipatedEvent event, TransactionStatus status);
+  void invoke(ParticipatedEvent event, String status);
 
   default void disconnect() {
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index ef075a5..834a5a2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -22,6 +22,7 @@ import static java.util.Collections.emptyMap;
 import io.grpc.stub.StreamObserver;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
@@ -32,6 +33,10 @@ public final class OmegaCallbacksRegistry {
 
   private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
 
+  public static Map<String, Map<String, OmegaCallback>> getRegistry() {
+    return REGISTRY;
+  }
+
   /**
    * Register omega TCC callback.
    *
@@ -50,9 +55,18 @@ public final class OmegaCallbacksRegistry {
    * @param serviceName service name
    * @param instanceId instance id
    * @return Grpc omega TCC callback
+   * @throws AlphaException if no omega callback is registered for the given service name
    */
-  public static OmegaCallback retrieve(String serviceName, String instanceId) {
-    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  public static OmegaCallback retrieve(String serviceName, String instanceId) throws AlphaException {
+    Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap());
+    if (callbackMap.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + serviceName);
+    }
+    OmegaCallback result = callbackMap.get(instanceId);
+    if (null == result) {
+      return callbackMap.values().iterator().next();
+    }
+    return result;
   }
 
   /**
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
index a2e3ddc..6218304 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 
@@ -28,7 +28,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
  */
 public final class TransactionEventRegistry {
 
-  private final static Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
+  private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
 
   /**
    * Register participate event.
@@ -37,7 +37,7 @@ public final class TransactionEventRegistry {
    */
   public static void register(ParticipatedEvent participateEvent) {
     REGISTRY
-        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
+        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedHashSet<>())
         .add(participateEvent);
   }
 
@@ -47,7 +47,7 @@ public final class TransactionEventRegistry {
    * @param globalTxId global transaction id
    * @return participate events
    */
-  public static List<ParticipatedEvent> retrieve(String globalTxId) {
+  public static Set<ParticipatedEvent> retrieve(String globalTxId) {
     return REGISTRY.get(globalTxId);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
index 3c7523f..7964be5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -27,10 +27,10 @@ public class ParticipateEventFactory {
         request.getGlobalTxId(),
         request.getLocalTxId(),
         request.getParentTxId(),
-        request.getConfirmMethod(),
-        request.getCancelMethod(),
         request.getServiceName(),
         request.getInstanceId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
         TransactionStatus.valueOf(request.getStatus())
     );
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
index 67c84ac..40270c2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -46,63 +46,31 @@ public class ParticipatedEvent {
     return globalTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
-  }
-
   public String getLocalTxId() {
     return localTxId;
   }
 
-  public void setLocalTxId(String localTxId) {
-    this.localTxId = localTxId;
-  }
-
   public String getParentTxId() {
     return parentTxId;
   }
 
-  public void setParentTxId(String parentTxId) {
-    this.parentTxId = parentTxId;
-  }
-
-  public String getConfirmMethod() {
-    return confirmMethod;
-  }
-
-  public void setConfirmMethod(String confirmMethod) {
-    this.confirmMethod = confirmMethod;
-  }
-
-  public String getCancelMethod() {
-    return cancelMethod;
-  }
-
-  public void setCancelMethod(String cancelMethod) {
-    this.cancelMethod = cancelMethod;
-  }
-
   public String getServiceName() {
     return serviceName;
   }
 
-  public void setServiceName(String serviceName) {
-    this.serviceName = serviceName;
-  }
-
   public String getInstanceId() {
     return instanceId;
   }
 
-  public void setInstanceId(String instanceId) {
-    this.instanceId = instanceId;
+  public String getConfirmMethod() {
+    return confirmMethod;
   }
 
-  public TransactionStatus getStatus() {
-    return status;
+  public String getCancelMethod() {
+    return cancelMethod;
   }
 
-  public void setStatus(TransactionStatus status) {
-    this.status = status;
+  public TransactionStatus getStatus() {
+    return status;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 66e035b..1af6b05 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -84,7 +84,8 @@ import io.grpc.stub.StreamObserver;
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8090",
-        "alpha.event.pollingInterval=1"
+        "alpha.event.pollingInterval=1",
+        "alpha.mode=SAGA"
        })
 public class AlphaIntegrationTest {
   private static final int port = 8090;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
index 8a2df82..e14775c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
@@ -39,7 +39,8 @@ import io.netty.handler.ssl.SslProvider;
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8092",
-        "alpha.event.pollingInterval=1"
+        "alpha.event.pollingInterval=1",
+        "alpha.mode=SAGA"
     })
 public class AlphaIntegrationWithSSLTest extends AlphaIntegrationTest {
   private static final int port = 8092;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index d421075..102141e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -18,6 +18,11 @@
 package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
@@ -25,10 +30,17 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.TransactionEventRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
@@ -45,20 +57,19 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {AlphaApplication.class},
     properties = {
         "alpha.server.host=0.0.0.0",
-        "alpha.server.tcc-port=8090",
-        "alpha.event.pollingInterval=1",
+        "alpha.server.tcc-port=8190",
         "alpha.mode=TCC"
     })
 public class AlphaTccServerTest {
 
-  private static final int port = 8090;
+  private static final int port = 8190;
   protected static ManagedChannel clientChannel;
 
   private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
 
   private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
 
-  private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+  private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
   private final TccCoordinateCommandStreamObserver commandStreamObserver =
       new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
@@ -66,7 +77,9 @@ public class AlphaTccServerTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
-  private final String compensationMethod = getClass().getCanonicalName();
+  private final String confirmMethod = "confirm";
+  private final String cancelMethod = "cancel";
+
 
   private final String serviceName = uniquify("serviceName");
   private final String instanceId = uniquify("instanceId");
@@ -94,12 +107,95 @@ public class AlphaTccServerTest {
 
   @After
   public void after() {
-//    blockingStub.onDisconnected(serviceConfig);
+    blockingStub.onDisconnected(serviceConfig);
   }
 
   @Test
   public void assertOnConnect() {
     asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    assertThat(
+        OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class))
+    );
+  }
+
+  private void awaitUntilConnected() {
+    await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName)));
+  }
+
+  @Test
+  public void assertOnParticipated() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    assertThat(TransactionEventRegistry.retrieve(globalTxId).size(),  is(1));
+    ParticipatedEvent event = TransactionEventRegistry.retrieve(globalTxId).iterator().next();
+    assertThat(event.getGlobalTxId(), is(globalTxId));
+    assertThat(event.getLocalTxId(), is(localTxId));
+    assertThat(event.getInstanceId(), is(instanceId));
+    assertThat(event.getServiceName(), is(serviceName));
+    assertThat(event.getConfirmMethod(), is(confirmMethod));
+    assertThat(event.getCancelMethod(), is(cancelMethod));
+    assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+  }
+
+  @Test
+  public void assertOnTccTransactionSucceedEnded() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.size(), is(1));
+    GrpcTccCoordinateCommand command = receivedCommands.poll();
+    assertThat(command.getMethod(), is("confirm"));
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getServiceName(), is(serviceName));
+  }
+
+  @Test
+  public void assertOnTccTransactionFailedEnded() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    blockingStub.onTccTransactionEnded(newTxEnd("Failed"));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.size(), is(1));
+    GrpcTccCoordinateCommand command = receivedCommands.poll();
+    assertThat(command.getMethod(), is("cancel"));
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getServiceName(), is(serviceName));
+  }
+
+  private GrpcTccParticipatedEvent newParticipatedEvent(String status) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setCancelMethod(cancelMethod)
+        .setConfirmMethod(confirmMethod)
+        .setStatus(status)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent newTxStart() {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent newTxEnd(String status) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setStatus(status)
+        .build();
   }
 
   private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
similarity index 85%
rename from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
rename to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
index cc39a8c..f97c5d8 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
+package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import io.grpc.stub.StreamObserver;
 import java.util.Queue;
@@ -24,14 +24,14 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
 
-  private static  Queue<GrpcTccCoordinateCommand> receivedCommands;
-  private  Consumer<GrpcTccCoordinateCommand> consumer;
+  private Queue<GrpcTccCoordinateCommand> receivedCommands;
+  private Consumer<GrpcTccCoordinateCommand> consumer;
   private boolean completed = false;
 
   public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
       Queue<GrpcTccCoordinateCommand> receivedCommands) {
     this.consumer = consumer;
-    TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands;
+    this.receivedCommands = receivedCommands;
   }
 
   @Override
@@ -42,7 +42,6 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc
 
   @Override
   public void onError(Throwable t) {
-
   }
 
   @Override