You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/08/23 09:52:50 UTC

[GitHub] cherrylzhao closed pull request #265: SCB-865 Implement reaction of the event in Alpha Server

cherrylzhao closed pull request #265: SCB-865 Implement reaction of the event in Alpha Server
URL: https://github.com/apache/incubator-servicecomb-saga/pull/265
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e45acdd4..6e31628f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,10 +23,8 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-
 import org.apache.servicecomb.saga.alpha.core.CommandRepository;
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.EventScanner;
@@ -37,12 +35,14 @@
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
 import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
 @Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
   private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@@ -86,13 +86,11 @@ ScheduledExecutorService compensationScheduler() {
   @Bean
   TxConsistentService txConsistentService(
       @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
-      GrpcServerConfig serverConfig,
       ScheduledExecutorService scheduler,
       TxEventRepository eventRepository,
       CommandRepository commandRepository,
       TxTimeoutRepository timeoutRepository,
-      OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      OmegaCallback omegaCallback) {
 
     new EventScanner(scheduler,
         eventRepository, commandRepository, timeoutRepository,
@@ -100,16 +98,16 @@ TxConsistentService txConsistentService(
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository);
 
-    ServerStartable startable = buildGrpc(serverConfig, consistentService, omegaCallbacks);
-    new Thread(startable::start).start();
-
     return consistentService;
   }
 
-  private ServerStartable buildGrpc(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
+  @Bean
+  ServerStartable sagaServerBootstrap(SagaGrpcServerConfig serverConfig, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    return new GrpcStartable(serverConfig,
+    ServerStartable bootstrap = new GrpcStartable(serverConfig,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+    new Thread(bootstrap::start).start();
+    return bootstrap;
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
index 66dd9925..bb4c880f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
@@ -17,10 +17,8 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import org.springframework.context.annotation.Configuration;
 import org.springframework.beans.factory.annotation.Value;
 
-@Configuration
 public class GrpcServerConfig {
   @Value("${alpha.server.host:0.0.0.0}")
   private String host;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 4d993748..a5999672 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -41,12 +41,12 @@
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 
-class GrpcStartable implements ServerStartable {
+public class GrpcStartable implements ServerStartable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Server server;
 
-  GrpcStartable(GrpcServerConfig serverConfig, BindableService... services) {
+  public GrpcStartable(GrpcServerConfig serverConfig, BindableService... services) {
     ServerBuilder<?> serverBuilder;
     if (serverConfig.isSslEnable()){
       serverBuilder = NettyServerBuilder.forAddress(
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
new file mode 100644
index 00000000..fd86a91d
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SagaGrpcServerConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.context.annotation.Configuration;
+/** gRPC server settings for saga mode: adds nothing to GrpcServerConfig and is registered only when alpha.mode (default SAGA) contains SAGA. */
+@Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('SAGA')")
+public class SagaGrpcServerConfig extends GrpcServerConfig {
+}
+
+
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
index 33d39df9..41dfdbdd 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
@@ -20,6 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-interface ServerStartable {
+public interface ServerStartable {
   void start();
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
new file mode 100644
index 00000000..8dabf79c
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/AlphaTccConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.apache.servicecomb.saga.alpha.server.GrpcStartable;
+import org.apache.servicecomb.saga.alpha.server.ServerStartable;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+/** Spring configuration that wires the TCC gRPC server; registered only when alpha.mode (default SAGA) contains TCC. */
+@Configuration
+@EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
+// The condition below matches any alpha.mode value that contains TCC, e.g. a combined "SAGA,TCC".
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
+public class AlphaTccConfig {
+  // Builds the TCC gRPC server around a new GrpcTccEventService and starts it on a new thread.
+  @Bean
+  ServerStartable tccServerBootstrap(TccGrpcServerConfig serverConfig) {
+    ServerStartable bootstrap = new GrpcStartable(serverConfig, new GrpcTccEventService());
+    new Thread(bootstrap::start).start();
+    return bootstrap;
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
new file mode 100644
index 00000000..e5364b03
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+  // Outbound stream on which coordinate commands are emitted.
+  private StreamObserver<GrpcTccCoordinateCommand> responseObserver;
+
+  public GrpcOmegaTccCallback(StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+    this.responseObserver = responseObserver;
+  }
+  // Emits one command naming the confirm method when status is Succeed, otherwise the cancel method.
+  @Override
+  public void compensate(ParticipatedEvent event, TransactionStatus status) {
+    String parentTxId = event.getParentTxId();
+    responseObserver.onNext(GrpcTccCoordinateCommand.newBuilder()
+        .setGlobalTxId(event.getGlobalTxId())
+        .setLocalTxId(event.getLocalTxId())
+        .setParentTxId(parentTxId == null ? "" : parentTxId)
+        .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+        .build());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
new file mode 100644
index 00000000..148a0e93
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+
+/**
+ * Grpc TCC event service implement.
+ */
+public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImplBase {
+
+  private static final GrpcAck ALLOW = GrpcAck.newBuilder().setAborted(false).build();
+  private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
+
+  @Override
+  public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+    OmegaCallbacksRegistry.register(request, responseObserver);
+  }
+
+  @Override
+  public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+  }
+
+  @Override
+  public void participate(GrpcTccParticipatedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+      OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
+    }
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+    OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect();
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
new file mode 100644
index 00000000..3c19cbba
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+/** Callback through which the TCC event service reaches one connected omega instance. */
+public interface OmegaCallback {
+  /** Handles the given participated event for the supplied transaction status. */
+  void compensate(ParticipatedEvent event, TransactionStatus status);
+  /** Hook for when the omega goes away; does nothing unless an implementation overrides it. */
+  default void disconnect() {
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
new file mode 100644
index 00000000..ef075a57
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+
+/**
+ * Manage omega callbacks.
+ */
+public final class OmegaCallbacksRegistry {
+
+  private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
+
+  /**
+   * Register omega TCC callback.
+   *
+   * @param request Grpc service config
+   * @param responseObserver stream observer
+   */
+  public static void register(GrpcServiceConfig request, StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+    REGISTRY
+        .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>())
+        .put(request.getInstanceId(), new GrpcOmegaTccCallback(responseObserver));
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback
+   */
+  public static OmegaCallback retrieve(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  }
+
+  /**
+   * Retrieve omega TCC callback by service name and instance id, then remove it from registry.
+   *
+   * @param serviceName service name
+   * @param instanceId instance id
+   * @return Grpc omega TCC callback
+   */
+  public static OmegaCallback retrieveThenRemove(String serviceName, String instanceId) {
+    return REGISTRY.getOrDefault(serviceName, emptyMap()).remove(instanceId);
+  }
+
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
new file mode 100644
index 00000000..20ab0636
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TccGrpcServerConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import org.apache.servicecomb.saga.alpha.server.GrpcServerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.context.annotation.Configuration;
+/** gRPC server settings for TCC mode; registered only when alpha.mode (default SAGA) contains TCC. */
+@Configuration
+@ConditionalOnExpression("'${alpha.mode:SAGA}'.contains('TCC')")
+public class TccGrpcServerConfig extends GrpcServerConfig {
+  // Read from alpha.server.tcc-port, else 8080. NOTE(review): 8080 is also alpha.server.port in application.yaml — confirm no clash.
+  @Value("${alpha.server.tcc-port:8080}")
+  private int port;
+  // NOTE(review): presumably overrides a getPort() in GrpcServerConfig so the TCC server uses its own port — confirm.
+  public int getPort() {
+    return port;
+  }
+}
+
+
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
new file mode 100644
index 00000000..a2e3ddc0
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+
+/**
+ * Manage TCC transaction event.
+ */
+public final class TransactionEventRegistry {
+
+  private static final Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
+
+  /**
+   * Appends the event to the list kept for its global transaction id, creating the list on first use.
+   * NOTE(review): the per-transaction LinkedList is not thread-safe; confirm registrations cannot race.
+   * @param participateEvent participate event
+   */
+  public static void register(ParticipatedEvent participateEvent) {
+    REGISTRY
+        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
+        .add(participateEvent);
+  }
+
+  /**
+   * Returns the events registered for a global transaction; never {@code null}, so callers can iterate directly.
+   *
+   * @param globalTxId global transaction id
+   * @return the registered events, or a new empty list (not stored in the registry) when the id is unknown
+   */
+  public static List<ParticipatedEvent> retrieve(String globalTxId) {
+    return REGISTRY.getOrDefault(globalTxId, new LinkedList<>());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
new file mode 100644
index 00000000..3c7523f9
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+/** Builds ParticipatedEvent instances from gRPC participated-event messages. */
+public class ParticipateEventFactory {
+  // Argument order must match the ParticipatedEvent constructor: ids, service name, instance id, confirm, cancel, status.
+  public static ParticipatedEvent create(GrpcTccParticipatedEvent request) {
+    return new ParticipatedEvent(
+        request.getGlobalTxId(),
+        request.getLocalTxId(),
+        request.getParentTxId(),
+        request.getServiceName(),
+        request.getInstanceId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
+        TransactionStatus.valueOf(request.getStatus())
+    );
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
new file mode 100644
index 00000000..67c84ac2
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+/** Mutable holder for the data of one TCC participation event. */
+public class ParticipatedEvent {
+
+  private String globalTxId;
+  private String localTxId;
+  private String parentTxId;
+  private String serviceName;
+  private String instanceId;
+  private String confirmMethod;
+  private String cancelMethod;
+  private TransactionStatus status;
+  // Parameter order: the three tx ids, then service name and instance id, then confirm and cancel methods, then status.
+  public ParticipatedEvent(String globalTxId, String localTxId, String parentTxId, String serviceName,
+      String instanceId, String confirmMethod, String cancelMethod, TransactionStatus status) {
+    this.globalTxId = globalTxId;
+    this.localTxId = localTxId;
+    this.parentTxId = parentTxId;
+    this.serviceName = serviceName;
+    this.instanceId = instanceId;
+    this.confirmMethod = confirmMethod;
+    this.cancelMethod = cancelMethod;
+    this.status = status;
+  }
+
+  public String getGlobalTxId() {
+    return globalTxId;
+  }
+
+  public void setGlobalTxId(String globalTxId) {
+    this.globalTxId = globalTxId;
+  }
+
+  public String getLocalTxId() {
+    return localTxId;
+  }
+
+  public void setLocalTxId(String localTxId) {
+    this.localTxId = localTxId;
+  }
+
+  public String getParentTxId() {
+    return parentTxId;
+  }
+
+  public void setParentTxId(String parentTxId) {
+    this.parentTxId = parentTxId;
+  }
+
+  public String getConfirmMethod() {
+    return confirmMethod;
+  }
+
+  public void setConfirmMethod(String confirmMethod) {
+    this.confirmMethod = confirmMethod;
+  }
+
+  public String getCancelMethod() {
+    return cancelMethod;
+  }
+
+  public void setCancelMethod(String cancelMethod) {
+    this.cancelMethod = cancelMethod;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public TransactionStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TransactionStatus status) {
+    this.status = status;
+  }
+}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index b8cd1189..92009ace 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,8 @@ alpha:
   server:
     host: 0.0.0.0
     port: 8080
+    tcc-port: 8180
+  mode: SAGA,TCC
 
 ---
 spring:
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
new file mode 100644
index 00000000..517729a0
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.annotation.PostConstruct;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.AlphaTccApplication;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.Bootstrap;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcBootstrap;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcTccServerConfig;
+import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaTccApplication.class},
+    properties = {
+        "alpha.server.host=0.0.0.0",
+        "alpha.server.port=8098",
+        "alpha.event.pollingInterval=1"
+    })
+public class AlphaTccServerTest {
+  // Skeleton test: boots a GrpcBootstrap server from the injected config and prepares client stubs; the gRPC calls are still commented out.
+  @Autowired
+  private GrpcTccServerConfig grpcTccServerConfig;
+
+  private static GrpcTccServerConfig serverConfig;
+  @PostConstruct
+  public void init() {
+    serverConfig = grpcTccServerConfig;
+    server = new GrpcBootstrap(serverConfig, new GrpcTccEventService());
+    new Thread(server::start).start(); // start() blocks in awaitTermination, so it runs on its own thread
+  }
+  // Client port must equal the alpha.server.port test property (8098) that GrpcBootstrap binds to; it was 8090.
+  private static final int port = 8098;
+  private  static Bootstrap server;
+  protected static ManagedChannel clientChannel;
+
+  private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
+
+  private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
+
+  private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+
+  private final TccCoordinateCommandStreamObserver commandStreamObserver =
+      new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
+
+  private final String globalTxId = UUID.randomUUID().toString();
+  private final String localTxId = UUID.randomUUID().toString();
+  private final String parentTxId = UUID.randomUUID().toString();
+  private final String compensationMethod = getClass().getCanonicalName();
+
+  private final String serviceName = uniquify("serviceName");
+  private final String instanceId = uniquify("instanceId");
+
+  private final GrpcServiceConfig serviceConfig = GrpcServiceConfig.newBuilder()
+      .setServiceName(serviceName)
+      .setInstanceId(instanceId)
+      .build();
+
+  @BeforeClass
+  public static void setupClientChannel() {
+    clientChannel = NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    clientChannel.shutdown();
+    clientChannel = null;
+  }
+
+  @Before
+  public void before() {
+    System.out.println(" globalTxId " + globalTxId);
+  }
+
+  @After
+  public void after() {
+//    blockingStub.onDisconnected(serviceConfig);
+  }
+
+  @Test
+  public void assertOnConnect() {
+//    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+  }
+
+  private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
+    return GrpcAck.newBuilder().setAborted(false).build(); // always acknowledges without aborting
+  }
+
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
new file mode 100644
index 00000000..464dffc4
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class AlphaTccApplication { // Spring Boot application class; AlphaTccServerTest passes it to @SpringBootTest(classes = ...)
+  public static void main(String[] args) {
+    SpringApplication.run(AlphaTccApplication.class, args);
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
new file mode 100644
index 00000000..6382378c
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+public interface Bootstrap {
+  // Starts the server. GrpcBootstrap's implementation blocks in this call until the server terminates.
+  void start();
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
new file mode 100644
index 00000000..8021161b
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Properties;
+import javax.net.ssl.SSLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcBootstrap implements Bootstrap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Server server;
+
+  public GrpcBootstrap(GrpcTccServerConfig serverConfig, BindableService... services) {
+    ServerBuilder<?> serverBuilder;
+    if (serverConfig.isSslEnable()) {
+      serverBuilder = NettyServerBuilder.forAddress(
+          new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()));
+
+      try {
+        ((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
+      } catch (SSLException e) {
+        throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
+      }
+    } else {
+      serverBuilder = ServerBuilder.forPort(serverConfig.getPort());
+    }
+    Arrays.stream(services).forEach(serverBuilder::addService);
+    server = serverBuilder.build();
+  }
+
+  @Override
+  public void start() {
+    Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
+
+    try {
+      server.start();
+      server.awaitTermination();
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to start grpc server.", e);
+    } catch (InterruptedException e) {
+      LOG.error("grpc server was interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private SslContextBuilder getSslContextBuilder(GrpcTccServerConfig config) {
+
+    Properties prop = new Properties();
+    ClassLoader classLoader = getClass().getClassLoader();
+    try {
+      prop.load(classLoader.getResourceAsStream("ssl.properties"));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to read ssl.properties.", e);
+    }
+
+    InputStream cert = getInputStream(classLoader, config.getCert(), "Server Cert");
+    InputStream key = getInputStream(classLoader, config.getKey(), "Server Key");
+
+    SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(cert, key)
+        .protocols(prop.getProperty("protocols"))
+        .ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
+    if (config.isMutualAuth()) {
+      InputStream clientCert = getInputStream(classLoader, config.getClientCert(), "Client Cert");
+      sslClientContextBuilder.trustManager(clientCert);
+      sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
+    }
+    return GrpcSslContexts.configure(sslClientContextBuilder,
+        SslProvider.OPENSSL);
+  }
+
+  private InputStream getInputStream(ClassLoader classLoader, String resource, String config) {
+    InputStream is = classLoader.getResourceAsStream(resource);
+    if (is == null) {
+      throw new IllegalStateException("Cannot load the " + config + " from " + resource);
+    }
+    return is;
+
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
new file mode 100644
index 00000000..f40076cc
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class GrpcTccServerConfig { // read-only holder for the alpha.server.* properties consumed by GrpcBootstrap
+  @Value("${alpha.server.host:0.0.0.0}")
+  private String host; // defaults to 0.0.0.0 when the property is unset
+
+  @Value("${alpha.server.port:8080}")
+  private int port; // defaults to 8080 when the property is unset
+
+  @Value("${alpha.server.ssl.enable:false}")
+  private boolean sslEnable; // off by default; when true GrpcBootstrap configures an SSL context
+
+  @Value("${alpha.server.ssl.cert:server.crt}")
+  private String cert; // resource name GrpcBootstrap loads through the class loader as the server certificate
+
+  @Value("${alpha.server.ssl.key:server.pem}")
+  private String key; // resource name GrpcBootstrap loads through the class loader as the server key
+
+  @Value("${alpha.server.ssl.mutualAuth:false}")
+  private boolean mutualAuth; // off by default; when true GrpcBootstrap requires client authentication
+
+  @Value("${alpha.server.ssl.clientCert:client.crt}")
+  private String clientCert; // resource name GrpcBootstrap passes to trustManager when mutualAuth is on
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public boolean isSslEnable() {
+    return sslEnable;
+  }
+
+  public String getCert() {
+    return cert;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public boolean isMutualAuth() {
+    return mutualAuth;
+  }
+
+  public String getClientCert() {
+    return clientCert;
+  }
+}
+
+
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
new file mode 100644
index 00000000..cc39a8c6
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.common;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+
+public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
+  // Test observer: hands every received command to a callback, then records it in the caller-supplied queue.
+  private final Queue<GrpcTccCoordinateCommand> receivedCommands; // per instance; was static and overwritten by each constructor call
+  private final Consumer<GrpcTccCoordinateCommand> consumer;
+  private boolean completed = false; // set by onCompleted(); not read inside this class
+
+  public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
+      Queue<GrpcTccCoordinateCommand> receivedCommands) {
+    this.consumer = consumer;
+    this.receivedCommands = receivedCommands;
+  }
+
+  @Override
+  public void onNext(GrpcTccCoordinateCommand value) {
+    consumer.accept(value);
+    receivedCommands.add(value);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    // Stream errors are dropped here. NOTE(review): consider recording them so a test can assert on failures.
+  }
+
+  @Override
+  public void onCompleted() {
+    completed = true;
+  }
+}
diff --git a/saga-core/src/main/java/org/apache/servicecomb/saga/transports/TransportFactory.java b/saga-core/src/main/java/org/apache/servicecomb/saga/transports/TransportFactory.java
index b34b5bdf..053149fa 100644
--- a/saga-core/src/main/java/org/apache/servicecomb/saga/transports/TransportFactory.java
+++ b/saga-core/src/main/java/org/apache/servicecomb/saga/transports/TransportFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.servicecomb.saga.transports;
 
-public interface TransportFactory {
+import org.apache.servicecomb.saga.core.Transport;
 
-  RestTransport restTransport();
+public interface TransportFactory<T extends Transport> {
+
+  T getTransport();
 }
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
index a70aafbb..c6cbd476 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonFallback.java
@@ -20,6 +20,7 @@
 import org.apache.servicecomb.saga.core.Fallback;
 import org.apache.servicecomb.saga.core.Operation;
 import org.apache.servicecomb.saga.core.SagaResponse;
+import org.apache.servicecomb.saga.core.Transport;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -39,7 +40,7 @@
     @Type(value = JacksonRestFallback.class, name = Operation.TYPE_REST),
     @Type(value = NopJacksonFallback.class, name = Operation.TYPE_NOP)
 })
-public interface JacksonFallback extends Fallback, TransportAware {
+public interface JacksonFallback<T extends Transport> extends Fallback, TransportAware<T> {
 
   JacksonFallback NOP_TRANSPORT_AWARE_FALLBACK = new NopJacksonFallback(TYPE_NOP);
 
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestFallback.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestFallback.java
index 35c54fe3..92db35af 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestFallback.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestFallback.java
@@ -19,10 +19,12 @@
 
 import java.util.Map;
 
+import org.apache.servicecomb.saga.transports.RestTransport;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-class JacksonRestFallback extends JacksonRestOperation implements JacksonFallback {
+class JacksonRestFallback extends JacksonRestOperation implements JacksonFallback<RestTransport> {
 
   private final String type;
 
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestOperation.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestOperation.java
index c2921689..e9dd762a 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestOperation.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JacksonRestOperation.java
@@ -28,7 +28,7 @@
 import org.apache.servicecomb.saga.core.SagaResponse;
 import org.apache.servicecomb.saga.transports.RestTransport;
 
-class JacksonRestOperation extends RestOperation implements TransportAware {
+class JacksonRestOperation extends RestOperation implements TransportAware<RestTransport> {
 
   @JsonIgnore
   private RestTransport transport;
@@ -38,8 +38,8 @@
   }
 
   @Override
-  public JacksonRestOperation with(TransportFactory transport) {
-    this.transport = transport.restTransport();
+  public JacksonRestOperation with(TransportFactory<RestTransport> transport) {
+    this.transport = transport.getTransport();
     return this;
   }
 
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonRestSagaRequest.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonRestSagaRequest.java
index 9660eb4e..f91d29b3 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonRestSagaRequest.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonRestSagaRequest.java
@@ -20,6 +20,7 @@
 import static org.apache.servicecomb.saga.format.JacksonFallback.NOP_TRANSPORT_AWARE_FALLBACK;
 
 import org.apache.servicecomb.saga.core.SagaRequestImpl;
+import org.apache.servicecomb.saga.transports.RestTransport;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -27,7 +28,7 @@
 
 import org.apache.servicecomb.saga.core.Operation;
 
-public class JsonRestSagaRequest extends SagaRequestImpl implements JsonSagaRequest {
+public class JsonRestSagaRequest extends SagaRequestImpl implements JsonSagaRequest<RestTransport> {
 
   private final JacksonRestTransaction transaction;
   private final JacksonRestCompensation compensation;
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonSagaRequest.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonSagaRequest.java
index 45d39810..a25542f3 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonSagaRequest.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/JsonSagaRequest.java
@@ -19,6 +19,7 @@
 
 import org.apache.servicecomb.saga.core.Operation;
 import org.apache.servicecomb.saga.core.SagaRequest;
+import org.apache.servicecomb.saga.core.Transport;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -33,7 +34,7 @@
 @JsonSubTypes({
     @Type(value = JsonRestSagaRequest.class, name = Operation.TYPE_REST)
 })
-public interface JsonSagaRequest extends SagaRequest {
+public interface JsonSagaRequest<T extends Transport> extends SagaRequest {
 
   JsonSagaRequest with(TransportFactory transportFactory);
 }
diff --git a/saga-format/src/main/java/org/apache/servicecomb/saga/format/TransportAware.java b/saga-format/src/main/java/org/apache/servicecomb/saga/format/TransportAware.java
index edcabf7d..10b353ab 100644
--- a/saga-format/src/main/java/org/apache/servicecomb/saga/format/TransportAware.java
+++ b/saga-format/src/main/java/org/apache/servicecomb/saga/format/TransportAware.java
@@ -17,10 +17,11 @@
 
 package org.apache.servicecomb.saga.format;
 
+import org.apache.servicecomb.saga.core.Transport;
 import org.apache.servicecomb.saga.transports.TransportFactory;
 import org.apache.servicecomb.saga.core.Operation;
 
-interface TransportAware {
+interface TransportAware<T extends Transport> {
 
-  Operation with(TransportFactory transport);
+  Operation with(TransportFactory<T> transport);
 }
diff --git a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonFromJsonFormatTest.java b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonFromJsonFormatTest.java
index f2ae7242..aafe1051 100644
--- a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonFromJsonFormatTest.java
+++ b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonFromJsonFormatTest.java
@@ -191,7 +191,7 @@ public String apply(SagaRequest sagaRequest) {
 
   @Before
   public void setUp() throws Exception {
-    when(transportFactory.restTransport()).thenReturn(restTransport);
+    when(transportFactory.getTransport()).thenReturn(restTransport);
 
     when(restTransport.with("aaa", "/rest/as", "post", singletonMap("form", singletonMap("foo", "as"))))
         .thenReturn(response11);
diff --git a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonRestOperationTest.java b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonRestOperationTest.java
index 9241289e..91f380d7 100644
--- a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonRestOperationTest.java
+++ b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JacksonRestOperationTest.java
@@ -50,7 +50,7 @@
 
   @Before
   public void setUp() throws Exception {
-    when(transportFactory.restTransport()).thenReturn(transport);
+    when(transportFactory.getTransport()).thenReturn(transport);
     restOperation.with(transportFactory);
   }
 
diff --git a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JsonRestSagaRequestTest.java b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JsonRestSagaRequestTest.java
index 4ef870ce..fe105555 100644
--- a/saga-format/src/test/java/org/apache/servicecomb/saga/format/JsonRestSagaRequestTest.java
+++ b/saga-format/src/test/java/org/apache/servicecomb/saga/format/JsonRestSagaRequestTest.java
@@ -69,7 +69,7 @@ public void blowsUpIfCompensationIsNotSpecified() {
   @SuppressWarnings("unchecked")
   @Test
   public void defaultToNopFallbackIfNotSpecified() {
-    when(transportFactory.restTransport()).thenReturn(restTransport);
+    when(transportFactory.getTransport()).thenReturn(restTransport);
     JsonRestSagaRequest request = newSagaRequest(transaction, compensation, null);
 
     request.with(transportFactory);
diff --git a/saga-format/src/test/java/org/apache/servicecomb/saga/format/SagaEventFormatTest.java b/saga-format/src/test/java/org/apache/servicecomb/saga/format/SagaEventFormatTest.java
index fc509e46..ad6fb625 100644
--- a/saga-format/src/test/java/org/apache/servicecomb/saga/format/SagaEventFormatTest.java
+++ b/saga-format/src/test/java/org/apache/servicecomb/saga/format/SagaEventFormatTest.java
@@ -80,12 +80,12 @@
 
   @Before
   public void setUp() throws Exception {
-    when(transportFactory.restTransport()).thenReturn(restTransport);
+    when(transportFactory.getTransport()).thenReturn(restTransport);
   }
 
   @After
   public void tearDown() throws Exception {
-    verify(transportFactory, times(3)).restTransport();
+    verify(transportFactory, times(3)).getTransport();
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services