You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:45 UTC
[incubator-servicecomb-saga] 03/07: SCB-138 add test for grpc omega callback
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 06a1d5d17c486acc21412520a835d0fd396d0f50
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 00:36:29 2018 +0800
SCB-138 add test for grpc omega callback
---
.../saga/alpha/server/GrpcOmegaCallback.java | 9 +-
.../saga/alpha/server/GrpcStartable.java | 2 +-
.../alpha/server/GrpcTxEventStreamObserver.java | 7 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 172 ++++++++-------------
.../grpc/GrpcCompensateStreamObserver.java | 2 +-
.../TransactionHandlerInterceptor.java | 2 +-
6 files changed, 80 insertions(+), 114 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 8577726..43d4ac4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -24,12 +24,19 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
public class GrpcOmegaCallback implements OmegaCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private final StreamObserver<GrpcCompensateCommand> observer;
public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -41,7 +48,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
- .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
+ .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
.setCompensateMethod(event.compensationMethod())
.setPayloads(ByteString.copyFrom(event.payloads()))
.build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 2eefeb7..869d593 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -33,7 +33,7 @@ import io.grpc.ServerBuilder;
class GrpcStartable implements ServerStartable {
- private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Server server;
GrpcStartable(int port, BindableService... services) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 0bbb52f..16ee788 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -39,7 +39,7 @@ import io.grpc.stub.StreamObserver;
class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
- private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
@@ -58,9 +58,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
public void onNext(GrpcTxEvent message) {
// register a callback on started event
if (message.getType().equals(TxStartedEvent.name())) {
- Map<String, OmegaCallback> callbacks = new ConcurrentHashMap<>();
- callbacks.put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
- omegaCallbacks.put(message.getServiceName(), callbacks);
+ omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
+ .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
}
// store received event
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 863c8af..5eb1d1b 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,35 +20,29 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
import java.util.List;
-import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.servicecomb.saga.alpha.core.EventType;
-import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import com.google.protobuf.ByteString;
@@ -58,7 +52,7 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
public class AlphaIntegrationTest {
private static final int port = 8090;
@@ -79,29 +73,23 @@ public class AlphaIntegrationTest {
@Autowired
private TxEventEnvelopeRepository eventRepo;
- @Autowired
- private List<CompensationContext> compensationContexts;
-
// use an empty response observer as we don't need the response in client side
- private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
- @Override
- public void onNext(GrpcCompensateCommand value) {
- }
+ private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
- @Override
- public void onError(Throwable t) {
- }
-
- @Override
- public void onCompleted() {
- }
- };
+ private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
+ private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
@AfterClass
public static void tearDown() throws Exception {
clientChannel.shutdown();
}
+ @Before
+ public void before() throws Exception {
+ eventRepo.deleteAll();
+ receivedCommands.clear();
+ }
+
@Test
public void persistsEvent() throws Exception {
StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
@@ -124,112 +112,84 @@ public class AlphaIntegrationTest {
@Test
public void doNotCompensateDuplicateTxOnFailure() throws Exception {
// duplicate events with same content but different timestamp
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
- eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
+ StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+ requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+ requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+ requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method a"));
String localTxId1 = UUID.randomUUID().toString();
- eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
- eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
+ String parentTxId1 = UUID.randomUUID().toString();
+ requestObserver.onNext(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+ requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
- StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
- await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
- assertThat(compensationContexts, containsInAnyOrder(
- new CompensationContext(globalTxId, this.localTxId, "method a", "service a".getBytes()),
- new CompensationContext(globalTxId, localTxId1, "method b", "service b".getBytes())
+ await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+ System.out.println(receivedCommands.size());
+ receivedCommands.forEach((command) -> System.out.println(command.getCompensateMethod()));
+ assertThat(receivedCommands, containsInAnyOrder(
+ GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+ .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+ GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
+ .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
));
}
- private GrpcTxEvent someGrpcEvent(EventType type) {
- return GrpcTxEvent.newBuilder()
- .setServiceName(serviceName)
- .setInstanceId(instanceId)
- .setTimestamp(System.currentTimeMillis())
- .setGlobalTxId(this.globalTxId)
- .setLocalTxId(this.localTxId)
- .setParentTxId(this.parentTxId)
- .setType(type.name())
- .setCompensationMethod(getClass().getCanonicalName())
- .setPayloads(ByteString.copyFrom(payload.getBytes()))
- .build();
+ @Test
+ public void getCompensateCommandOnFailure() {
+ StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+ requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+ await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+ requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+ await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
+ assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
+ assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
+ assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
+ assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
}
- private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads, String compensationMethod) {
- return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+ private GrpcTxEvent someGrpcEvent(EventType type) {
+ return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
}
- private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
- return new TxEventEnvelope(new TxEvent(
- serviceName,
- instanceId,
- new Date(),
- globalTxId,
- localTxId,
- parentTxId,
- eventType.name(),
- compensationMethod,
- payloads));
+ private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
+ return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
}
- @Configuration
- static class OmegaCallbackConfig {
- private final List<CompensationContext> compensationContexts = new ArrayList<>();
-
- @Bean
- List<CompensationContext> compensationContexts() {
- return compensationContexts;
- }
-
- @Bean
- OmegaCallback omegaCallback() {
- return event ->
- compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
- }
+ private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+ return GrpcTxEvent.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .setTimestamp(System.currentTimeMillis())
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .setParentTxId(parentTxId == null ? "" : parentTxId)
+ .setType(eventType.name())
+ .setCompensationMethod(compensationMethod)
+ .setPayloads(ByteString.copyFrom(payloads))
+ .build();
}
- private static class CompensationContext {
- private final String globalTxId;
- private final String localTxId;
- private final String compensationMethod;
- private final byte[] message;
-
- private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
- this.globalTxId = globalTxId;
- this.localTxId = localTxId;
- this.compensationMethod = compensationMethod;
- this.message = message;
+ private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+ @Override
+ public void onNext(GrpcCompensateCommand command) {
}
@Override
- public String toString() {
- return "CompensationContext{" +
- "globalTxId='" + globalTxId + '\'' +
- ", localTxId='" + localTxId + '\'' +
- ", compensationMethod='" + compensationMethod + '\'' +
- ", message=" + Arrays.toString(message) +
- '}';
+ public void onError(Throwable t) {
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CompensationContext that = (CompensationContext) o;
- return Objects.equals(globalTxId, that.globalTxId) &&
- Objects.equals(localTxId, that.localTxId) &&
- Objects.equals(compensationMethod, that.compensationMethod) &&
- Arrays.equals(message, that.message);
+ public void onCompleted() {
}
+ }
+ private static class CompensateStreamObserver extends EmptyStreamObserver {
@Override
- public int hashCode() {
- return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+ public void onNext(GrpcCompensateCommand command) {
+ // intercept received command
+ receivedCommands.add(command);
}
}
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 01fdc35..d5c757d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -31,7 +31,7 @@ import io.grpc.stub.StreamObserver;
public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
- private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MessageHandler messageHandler;
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index d83786e..3e5d620 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -33,7 +33,7 @@ import org.springframework.web.servlet.ModelAndView;
class TransactionHandlerInterceptor implements HandlerInterceptor {
- private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext omegaContext;
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.