You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/28 06:40:22 UTC

[incubator-servicecomb-saga] 08/14: SCB-856 Add junit test case & resolve related bugs.

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 3f7377693c768f563cac5f1ff8211f5bc05057d3
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Aug 24 17:53:26 2018 +0800

    SCB-856 Add junit test case & resolve related bugs.
---
 .../alpha/server/tcc/GrpcOmegaTccCallback.java     |  10 +-
 .../saga/alpha/server/tcc/GrpcTccEventService.java |  19 +++-
 .../saga/alpha/server/tcc/OmegaCallback.java       |   3 +-
 .../alpha/server/tcc/OmegaCallbacksRegistry.java   |  18 +++-
 .../alpha/server/tcc/TransactionEventRegistry.java |  10 +-
 .../server/tcc/event/ParticipateEventFactory.java  |   4 +-
 .../alpha/server/tcc/event/ParticipatedEvent.java  |  44 ++-------
 .../saga/alpha/server/AlphaIntegrationTest.java    |   3 +-
 .../alpha/server/AlphaIntegrationWithSSLTest.java  |   3 +-
 .../saga/alpha/tcc/server/AlphaTccServerTest.java  | 110 +++++++++++++++++++--
 .../TccCoordinateCommandStreamObserver.java        |   9 +-
 11 files changed, 164 insertions(+), 69 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index e5364b0..8ea7cfb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -34,13 +34,19 @@ public final class GrpcOmegaTccCallback implements OmegaCallback {
   }
 
   @Override
-  public void compensate(ParticipatedEvent event, TransactionStatus status) {
+  public void invoke(ParticipatedEvent event, String status) {
     GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
         .setGlobalTxId(event.getGlobalTxId())
         .setLocalTxId(event.getLocalTxId())
         .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
-        .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+        .setServiceName(event.getServiceName())
+        .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
         .build();
     responseObserver.onNext(command);
   }
+
+  @Override
+  public void disconnect() {
+    responseObserver.onCompleted();
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index 148a0e9..cd61162 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,8 +18,9 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -43,6 +44,8 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+    responseObserver.onNext(ALLOW);
+    responseObserver.onCompleted();
   }
 
   @Override
@@ -54,8 +57,13 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
-    for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
-      OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
+    try {
+      for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+        OmegaCallbacksRegistry.retrieve(event.getServiceName(),
+            event.getInstanceId()).invoke(event, request.getStatus());
+      }
+    } catch (AlphaException ex) {
+      responseObserver.onNext(REJECT);
     }
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
@@ -63,7 +71,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
 
   @Override
   public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
-    OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect();
+    OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
+    if (null != omegaCallback) {
+      omegaCallback.disconnect();
+    }
     responseObserver.onNext(ALLOW);
     responseObserver.onCompleted();
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 3c19cbb..369472c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -18,11 +18,10 @@
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
-import org.apache.servicecomb.saga.common.TransactionStatus;
 
 public interface OmegaCallback {
 
-  void compensate(ParticipatedEvent event, TransactionStatus status);
+  void invoke(ParticipatedEvent event, String status);
 
   default void disconnect() {
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index ef075a5..834a5a2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -22,6 +22,7 @@ import static java.util.Collections.emptyMap;
 import io.grpc.stub.StreamObserver;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
@@ -32,6 +33,10 @@ public final class OmegaCallbacksRegistry {
 
   private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
 
+  public static Map<String, Map<String, OmegaCallback>> getRegistry() {
+    return REGISTRY;
+  }
+
   /**
    * Register omega TCC callback.
    *
@@ -50,9 +55,18 @@ public final class OmegaCallbacksRegistry {
    * @param serviceName service name
    * @param instanceId instance id
    * @return Grpc omega TCC callback
+   * @throws AlphaException trigger this exception while missing omega callback by service name
    */
-  public static OmegaCallback retrieve(String serviceName, String instanceId) {
-    return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+  public static OmegaCallback retrieve(String serviceName, String instanceId) throws AlphaException {
+    Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap());
+    if (callbackMap.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + serviceName);
+    }
+    OmegaCallback result = callbackMap.get(instanceId);
+    if (null == result) {
+      return callbackMap.values().iterator().next();
+    }
+    return result;
   }
 
   /**
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
index a2e3ddc..6218304 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
@@ -17,9 +17,9 @@
 
 package org.apache.servicecomb.saga.alpha.server.tcc;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
 
@@ -28,7 +28,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
  */
 public final class TransactionEventRegistry {
 
-  private final static Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
+  private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
 
   /**
    * Register participate event.
@@ -37,7 +37,7 @@ public final class TransactionEventRegistry {
    */
   public static void register(ParticipatedEvent participateEvent) {
     REGISTRY
-        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
+        .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedHashSet<>())
         .add(participateEvent);
   }
 
@@ -47,7 +47,7 @@ public final class TransactionEventRegistry {
    * @param globalTxId global transaction id
    * @return participate events
    */
-  public static List<ParticipatedEvent> retrieve(String globalTxId) {
+  public static Set<ParticipatedEvent> retrieve(String globalTxId) {
     return REGISTRY.get(globalTxId);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
index 3c7523f..7964be5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -27,10 +27,10 @@ public class ParticipateEventFactory {
         request.getGlobalTxId(),
         request.getLocalTxId(),
         request.getParentTxId(),
-        request.getConfirmMethod(),
-        request.getCancelMethod(),
         request.getServiceName(),
         request.getInstanceId(),
+        request.getConfirmMethod(),
+        request.getCancelMethod(),
         TransactionStatus.valueOf(request.getStatus())
     );
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
index 67c84ac..40270c2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -46,63 +46,31 @@ public class ParticipatedEvent {
     return globalTxId;
   }
 
-  public void setGlobalTxId(String globalTxId) {
-    this.globalTxId = globalTxId;
-  }
-
   public String getLocalTxId() {
     return localTxId;
   }
 
-  public void setLocalTxId(String localTxId) {
-    this.localTxId = localTxId;
-  }
-
   public String getParentTxId() {
     return parentTxId;
   }
 
-  public void setParentTxId(String parentTxId) {
-    this.parentTxId = parentTxId;
-  }
-
-  public String getConfirmMethod() {
-    return confirmMethod;
-  }
-
-  public void setConfirmMethod(String confirmMethod) {
-    this.confirmMethod = confirmMethod;
-  }
-
-  public String getCancelMethod() {
-    return cancelMethod;
-  }
-
-  public void setCancelMethod(String cancelMethod) {
-    this.cancelMethod = cancelMethod;
-  }
-
   public String getServiceName() {
     return serviceName;
   }
 
-  public void setServiceName(String serviceName) {
-    this.serviceName = serviceName;
-  }
-
   public String getInstanceId() {
     return instanceId;
   }
 
-  public void setInstanceId(String instanceId) {
-    this.instanceId = instanceId;
+  public String getConfirmMethod() {
+    return confirmMethod;
   }
 
-  public TransactionStatus getStatus() {
-    return status;
+  public String getCancelMethod() {
+    return cancelMethod;
   }
 
-  public void setStatus(TransactionStatus status) {
-    this.status = status;
+  public TransactionStatus getStatus() {
+    return status;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 66e035b..1af6b05 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -84,7 +84,8 @@ import io.grpc.stub.StreamObserver;
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8090",
-        "alpha.event.pollingInterval=1"
+        "alpha.event.pollingInterval=1",
+        "alpha.mode=SAGA"
        })
 public class AlphaIntegrationTest {
   private static final int port = 8090;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
index 8a2df82..e14775c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
@@ -39,7 +39,8 @@ import io.netty.handler.ssl.SslProvider;
     properties = {
         "alpha.server.host=0.0.0.0",
         "alpha.server.port=8092",
-        "alpha.event.pollingInterval=1"
+        "alpha.event.pollingInterval=1",
+        "alpha.mode=SAGA"
     })
 public class AlphaIntegrationWithSSLTest extends AlphaIntegrationTest {
   private static final int port = 8092;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index d421075..102141e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -18,6 +18,11 @@
 package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
@@ -25,10 +30,17 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.TransactionEventRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
 import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
@@ -45,20 +57,19 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = {AlphaApplication.class},
     properties = {
         "alpha.server.host=0.0.0.0",
-        "alpha.server.tcc-port=8090",
-        "alpha.event.pollingInterval=1",
+        "alpha.server.tcc-port=8190",
         "alpha.mode=TCC"
     })
 public class AlphaTccServerTest {
 
-  private static final int port = 8090;
+  private static final int port = 8190;
   protected static ManagedChannel clientChannel;
 
   private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
 
   private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
 
-  private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+  private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
   private final TccCoordinateCommandStreamObserver commandStreamObserver =
       new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
@@ -66,7 +77,9 @@ public class AlphaTccServerTest {
   private final String globalTxId = UUID.randomUUID().toString();
   private final String localTxId = UUID.randomUUID().toString();
   private final String parentTxId = UUID.randomUUID().toString();
-  private final String compensationMethod = getClass().getCanonicalName();
+  private final String confirmMethod = "confirm";
+  private final String cancelMethod = "cancel";
+
 
   private final String serviceName = uniquify("serviceName");
   private final String instanceId = uniquify("instanceId");
@@ -94,12 +107,95 @@ public class AlphaTccServerTest {
 
   @After
   public void after() {
-//    blockingStub.onDisconnected(serviceConfig);
+    blockingStub.onDisconnected(serviceConfig);
   }
 
   @Test
   public void assertOnConnect() {
     asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    assertThat(
+        OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class))
+    );
+  }
+
+  private void awaitUntilConnected() {
+    await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName)));
+  }
+
+  @Test
+  public void assertOnParticipated() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    assertThat(TransactionEventRegistry.retrieve(globalTxId).size(),  is(1));
+    ParticipatedEvent event = TransactionEventRegistry.retrieve(globalTxId).iterator().next();
+    assertThat(event.getGlobalTxId(), is(globalTxId));
+    assertThat(event.getLocalTxId(), is(localTxId));
+    assertThat(event.getInstanceId(), is(instanceId));
+    assertThat(event.getServiceName(), is(serviceName));
+    assertThat(event.getConfirmMethod(), is(confirmMethod));
+    assertThat(event.getCancelMethod(), is(cancelMethod));
+    assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+  }
+
+  @Test
+  public void assertOnTccTransactionSucceedEnded() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.size(), is(1));
+    GrpcTccCoordinateCommand command = receivedCommands.poll();
+    assertThat(command.getMethod(), is("confirm"));
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getServiceName(), is(serviceName));
+  }
+
+  @Test
+  public void assertOnTccTransactionFailedEnded() {
+    asyncStub.onConnected(serviceConfig, commandStreamObserver);
+    awaitUntilConnected();
+    blockingStub.onTccTransactionStarted(newTxStart());
+    blockingStub.participate(newParticipatedEvent("Succeed"));
+    blockingStub.onTccTransactionEnded(newTxEnd("Failed"));
+
+    await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.size(), is(1));
+    GrpcTccCoordinateCommand command = receivedCommands.poll();
+    assertThat(command.getMethod(), is("cancel"));
+    assertThat(command.getGlobalTxId(), is(globalTxId));
+    assertThat(command.getServiceName(), is(serviceName));
+  }
+
+  private GrpcTccParticipatedEvent newParticipatedEvent(String status) {
+    return GrpcTccParticipatedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setCancelMethod(cancelMethod)
+        .setConfirmMethod(confirmMethod)
+        .setStatus(status)
+        .build();
+  }
+
+  private GrpcTccTransactionStartedEvent newTxStart() {
+    return GrpcTccTransactionStartedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .build();
+  }
+
+  private GrpcTccTransactionEndedEvent newTxEnd(String status) {
+    return GrpcTccTransactionEndedEvent.newBuilder()
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setStatus(status)
+        .build();
   }
 
   private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
similarity index 85%
rename from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
rename to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
index cc39a8c..f97c5d8 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
+package org.apache.servicecomb.saga.alpha.tcc.server;
 
 import io.grpc.stub.StreamObserver;
 import java.util.Queue;
@@ -24,14 +24,14 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
 
 public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
 
-  private static  Queue<GrpcTccCoordinateCommand> receivedCommands;
-  private  Consumer<GrpcTccCoordinateCommand> consumer;
+  private Queue<GrpcTccCoordinateCommand> receivedCommands;
+  private Consumer<GrpcTccCoordinateCommand> consumer;
   private boolean completed = false;
 
   public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
       Queue<GrpcTccCoordinateCommand> receivedCommands) {
     this.consumer = consumer;
-    TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands;
+    this.receivedCommands = receivedCommands;
   }
 
   @Override
@@ -42,7 +42,6 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc
 
   @Override
   public void onError(Throwable t) {
-
   }
 
   @Override