You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/28 06:40:22 UTC
[incubator-servicecomb-saga] 08/14: SCB-856 Add junit test case & resolve related bugs.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 3f7377693c768f563cac5f1ff8211f5bc05057d3
Author: cherrylzhao <zh...@126.com>
AuthorDate: Fri Aug 24 17:53:26 2018 +0800
SCB-856 Add junit test case & resolve related bugs.
---
.../alpha/server/tcc/GrpcOmegaTccCallback.java | 10 +-
.../saga/alpha/server/tcc/GrpcTccEventService.java | 19 +++-
.../saga/alpha/server/tcc/OmegaCallback.java | 3 +-
.../alpha/server/tcc/OmegaCallbacksRegistry.java | 18 +++-
.../alpha/server/tcc/TransactionEventRegistry.java | 10 +-
.../server/tcc/event/ParticipateEventFactory.java | 4 +-
.../alpha/server/tcc/event/ParticipatedEvent.java | 44 ++-------
.../saga/alpha/server/AlphaIntegrationTest.java | 3 +-
.../alpha/server/AlphaIntegrationWithSSLTest.java | 3 +-
.../saga/alpha/tcc/server/AlphaTccServerTest.java | 110 +++++++++++++++++++--
.../TccCoordinateCommandStreamObserver.java | 9 +-
11 files changed, 164 insertions(+), 69 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
index e5364b0..8ea7cfb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java
@@ -34,13 +34,19 @@ public final class GrpcOmegaTccCallback implements OmegaCallback {
}
@Override
- public void compensate(ParticipatedEvent event, TransactionStatus status) {
+ public void invoke(ParticipatedEvent event, String status) {
GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
.setGlobalTxId(event.getGlobalTxId())
.setLocalTxId(event.getLocalTxId())
.setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId())
- .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
+ .setServiceName(event.getServiceName())
+ .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod())
.build();
responseObserver.onNext(command);
}
+
+ @Override
+ public void disconnect() {
+ responseObserver.onCompleted();
+ }
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
index 148a0e9..cd61162 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -18,8 +18,9 @@
package org.apache.servicecomb.saga.alpha.server.tcc;
import io.grpc.stub.StreamObserver;
-import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -43,6 +44,8 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
@Override
public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) {
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
}
@Override
@@ -54,8 +57,13 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
@Override
public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) {
- for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
- OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus());
+ try {
+ for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+ OmegaCallbacksRegistry.retrieve(event.getServiceName(),
+ event.getInstanceId()).invoke(event, request.getStatus());
+ }
+ } catch (AlphaException ex) {
+ responseObserver.onNext(REJECT);
}
responseObserver.onNext(ALLOW);
responseObserver.onCompleted();
@@ -63,7 +71,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl
@Override
public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
- OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect();
+ OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId());
+ if (null != omegaCallback) {
+ omegaCallback.disconnect();
+ }
responseObserver.onNext(ALLOW);
responseObserver.onCompleted();
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
index 3c19cbb..369472c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java
@@ -18,11 +18,10 @@
package org.apache.servicecomb.saga.alpha.server.tcc;
import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
-import org.apache.servicecomb.saga.common.TransactionStatus;
public interface OmegaCallback {
- void compensate(ParticipatedEvent event, TransactionStatus status);
+ void invoke(ParticipatedEvent event, String status);
default void disconnect() {
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
index ef075a5..834a5a2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java
@@ -22,6 +22,7 @@ import static java.util.Collections.emptyMap;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
@@ -32,6 +33,10 @@ public final class OmegaCallbacksRegistry {
private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>();
+ public static Map<String, Map<String, OmegaCallback>> getRegistry() {
+ return REGISTRY;
+ }
+
/**
* Register omega TCC callback.
*
@@ -50,9 +55,18 @@ public final class OmegaCallbacksRegistry {
* @param serviceName service name
* @param instanceId instance id
* @return Grpc omega TCC callback
+ * @throws AlphaException if no omega callback is registered for the given service name
*/
- public static OmegaCallback retrieve(String serviceName, String instanceId) {
- return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+ public static OmegaCallback retrieve(String serviceName, String instanceId) throws AlphaException {
+ Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap());
+ if (callbackMap.isEmpty()) {
+ throw new AlphaException("No such omega callback found for service " + serviceName);
+ }
+ OmegaCallback result = callbackMap.get(instanceId);
+ if (null == result) {
+ return callbackMap.values().iterator().next();
+ }
+ return result;
}
/**
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
index a2e3ddc..6218304 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java
@@ -17,9 +17,9 @@
package org.apache.servicecomb.saga.alpha.server.tcc;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
@@ -28,7 +28,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
*/
public final class TransactionEventRegistry {
- private final static Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
+ private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>();
/**
* Register participate event.
@@ -37,7 +37,7 @@ public final class TransactionEventRegistry {
*/
public static void register(ParticipatedEvent participateEvent) {
REGISTRY
- .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>())
+ .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedHashSet<>())
.add(participateEvent);
}
@@ -47,7 +47,7 @@ public final class TransactionEventRegistry {
* @param globalTxId global transaction id
* @return participate events
*/
- public static List<ParticipatedEvent> retrieve(String globalTxId) {
+ public static Set<ParticipatedEvent> retrieve(String globalTxId) {
return REGISTRY.get(globalTxId);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
index 3c7523f..7964be5 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -27,10 +27,10 @@ public class ParticipateEventFactory {
request.getGlobalTxId(),
request.getLocalTxId(),
request.getParentTxId(),
- request.getConfirmMethod(),
- request.getCancelMethod(),
request.getServiceName(),
request.getInstanceId(),
+ request.getConfirmMethod(),
+ request.getCancelMethod(),
TransactionStatus.valueOf(request.getStatus())
);
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
index 67c84ac..40270c2 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -46,63 +46,31 @@ public class ParticipatedEvent {
return globalTxId;
}
- public void setGlobalTxId(String globalTxId) {
- this.globalTxId = globalTxId;
- }
-
public String getLocalTxId() {
return localTxId;
}
- public void setLocalTxId(String localTxId) {
- this.localTxId = localTxId;
- }
-
public String getParentTxId() {
return parentTxId;
}
- public void setParentTxId(String parentTxId) {
- this.parentTxId = parentTxId;
- }
-
- public String getConfirmMethod() {
- return confirmMethod;
- }
-
- public void setConfirmMethod(String confirmMethod) {
- this.confirmMethod = confirmMethod;
- }
-
- public String getCancelMethod() {
- return cancelMethod;
- }
-
- public void setCancelMethod(String cancelMethod) {
- this.cancelMethod = cancelMethod;
- }
-
public String getServiceName() {
return serviceName;
}
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
public String getInstanceId() {
return instanceId;
}
- public void setInstanceId(String instanceId) {
- this.instanceId = instanceId;
+ public String getConfirmMethod() {
+ return confirmMethod;
}
- public TransactionStatus getStatus() {
- return status;
+ public String getCancelMethod() {
+ return cancelMethod;
}
- public void setStatus(TransactionStatus status) {
- this.status = status;
+ public TransactionStatus getStatus() {
+ return status;
}
}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 66e035b..1af6b05 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -84,7 +84,8 @@ import io.grpc.stub.StreamObserver;
properties = {
"alpha.server.host=0.0.0.0",
"alpha.server.port=8090",
- "alpha.event.pollingInterval=1"
+ "alpha.event.pollingInterval=1",
+ "alpha.mode=SAGA"
})
public class AlphaIntegrationTest {
private static final int port = 8090;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
index 8a2df82..e14775c 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java
@@ -39,7 +39,8 @@ import io.netty.handler.ssl.SslProvider;
properties = {
"alpha.server.host=0.0.0.0",
"alpha.server.port=8092",
- "alpha.event.pollingInterval=1"
+ "alpha.event.pollingInterval=1",
+ "alpha.mode=SAGA"
})
public class AlphaIntegrationWithSSLTest extends AlphaIntegrationTest {
private static final int port = 8092;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
index d421075..102141e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -18,6 +18,11 @@
package org.apache.servicecomb.saga.alpha.tcc.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
@@ -25,10 +30,17 @@ import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
-import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.TransactionEventRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
@@ -45,20 +57,19 @@ import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = {AlphaApplication.class},
properties = {
"alpha.server.host=0.0.0.0",
- "alpha.server.tcc-port=8090",
- "alpha.event.pollingInterval=1",
+ "alpha.server.tcc-port=8190",
"alpha.mode=TCC"
})
public class AlphaTccServerTest {
- private static final int port = 8090;
+ private static final int port = 8190;
protected static ManagedChannel clientChannel;
private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel);
private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel);
- private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
+ private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
private final TccCoordinateCommandStreamObserver commandStreamObserver =
new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands);
@@ -66,7 +77,9 @@ public class AlphaTccServerTest {
private final String globalTxId = UUID.randomUUID().toString();
private final String localTxId = UUID.randomUUID().toString();
private final String parentTxId = UUID.randomUUID().toString();
- private final String compensationMethod = getClass().getCanonicalName();
+ private final String confirmMethod = "confirm";
+ private final String cancelMethod = "cancel";
+
private final String serviceName = uniquify("serviceName");
private final String instanceId = uniquify("instanceId");
@@ -94,12 +107,95 @@ public class AlphaTccServerTest {
@After
public void after() {
-// blockingStub.onDisconnected(serviceConfig);
+ blockingStub.onDisconnected(serviceConfig);
}
@Test
public void assertOnConnect() {
asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ assertThat(
+ OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class))
+ );
+ }
+
+ private void awaitUntilConnected() {
+ await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName)));
+ }
+
+ @Test
+ public void assertOnParticipated() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ assertThat(TransactionEventRegistry.retrieve(globalTxId).size(), is(1));
+ ParticipatedEvent event = TransactionEventRegistry.retrieve(globalTxId).iterator().next();
+ assertThat(event.getGlobalTxId(), is(globalTxId));
+ assertThat(event.getLocalTxId(), is(localTxId));
+ assertThat(event.getInstanceId(), is(instanceId));
+ assertThat(event.getServiceName(), is(serviceName));
+ assertThat(event.getConfirmMethod(), is(confirmMethod));
+ assertThat(event.getCancelMethod(), is(cancelMethod));
+ assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+ }
+
+ @Test
+ public void assertOnTccTransactionSucceedEnded() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+ await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.size(), is(1));
+ GrpcTccCoordinateCommand command = receivedCommands.poll();
+ assertThat(command.getMethod(), is("confirm"));
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getServiceName(), is(serviceName));
+ }
+
+ @Test
+ public void assertOnTccTransactionFailedEnded() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ blockingStub.onTccTransactionEnded(newTxEnd("Failed"));
+
+ await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.size(), is(1));
+ GrpcTccCoordinateCommand command = receivedCommands.poll();
+ assertThat(command.getMethod(), is("cancel"));
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getServiceName(), is(serviceName));
+ }
+
+ private GrpcTccParticipatedEvent newParticipatedEvent(String status) {
+ return GrpcTccParticipatedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .setCancelMethod(cancelMethod)
+ .setConfirmMethod(confirmMethod)
+ .setStatus(status)
+ .build();
+ }
+
+ private GrpcTccTransactionStartedEvent newTxStart() {
+ return GrpcTccTransactionStartedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .build();
+ }
+
+ private GrpcTccTransactionEndedEvent newTxEnd(String status) {
+ return GrpcTccTransactionEndedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .setStatus(status)
+ .build();
}
private GrpcAck onCompensation(GrpcTccCoordinateCommand command) {
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
similarity index 85%
rename from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
rename to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
index cc39a8c..f97c5d8 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.saga.alpha.tcc.server.common;
+package org.apache.servicecomb.saga.alpha.tcc.server;
import io.grpc.stub.StreamObserver;
import java.util.Queue;
@@ -24,14 +24,14 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {
- private static Queue<GrpcTccCoordinateCommand> receivedCommands;
- private Consumer<GrpcTccCoordinateCommand> consumer;
+ private Queue<GrpcTccCoordinateCommand> receivedCommands;
+ private Consumer<GrpcTccCoordinateCommand> consumer;
private boolean completed = false;
public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
Queue<GrpcTccCoordinateCommand> receivedCommands) {
this.consumer = consumer;
- TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands;
+ this.receivedCommands = receivedCommands;
}
@Override
@@ -42,7 +42,6 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc
@Override
public void onError(Throwable t) {
-
}
@Override