You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:45 UTC

[incubator-servicecomb-saga] 03/07: SCB-138 add test for grpc omega callback

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 06a1d5d17c486acc21412520a835d0fd396d0f50
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 00:36:29 2018 +0800

    SCB-138 add test for grpc omega callback
---
 .../saga/alpha/server/GrpcOmegaCallback.java       |   9 +-
 .../saga/alpha/server/GrpcStartable.java           |   2 +-
 .../alpha/server/GrpcTxEventStreamObserver.java    |   7 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 172 ++++++++-------------
 .../grpc/GrpcCompensateStreamObserver.java         |   2 +-
 .../TransactionHandlerInterceptor.java             |   2 +-
 6 files changed, 80 insertions(+), 114 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 8577726..43d4ac4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -24,12 +24,19 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.protobuf.ByteString;
 
 import io.grpc.stub.StreamObserver;
 
 public class GrpcOmegaCallback implements OmegaCallback {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final StreamObserver<GrpcCompensateCommand> observer;
 
   public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -41,7 +48,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
-        .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
         .setCompensateMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 2eefeb7..869d593 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -33,7 +33,7 @@ import io.grpc.ServerBuilder;
 
 class GrpcStartable implements ServerStartable {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Server server;
 
   GrpcStartable(int port, BindableService... services) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 0bbb52f..16ee788 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -39,7 +39,7 @@ import io.grpc.stub.StreamObserver;
 
 class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
@@ -58,9 +58,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   public void onNext(GrpcTxEvent message) {
     // register a callback on started event
     if (message.getType().equals(TxStartedEvent.name())) {
-      Map<String, OmegaCallback> callbacks = new ConcurrentHashMap<>();
-      callbacks.put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
-      omegaCallbacks.put(message.getServiceName(), callbacks);
+      omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
+          .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
     }
 
     // store received event
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 863c8af..5eb1d1b 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,35 +20,29 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
-import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
-import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -58,7 +52,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -79,29 +73,23 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
-  @Autowired
-  private List<CompensationContext> compensationContexts;
-
   // use an empty response observer as we don't need the response in client side
-  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
-    @Override
-    public void onNext(GrpcCompensateCommand value) {
-    }
+  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
 
-    @Override
-    public void onError(Throwable t) {
-    }
-
-    @Override
-    public void onCompleted() {
-    }
-  };
+  private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
+  private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
 
   @AfterClass
   public static void tearDown() throws Exception {
     clientChannel.shutdown();
   }
 
+  @Before
+  public void before() throws Exception {
+    eventRepo.deleteAll();
+    receivedCommands.clear();
+  }
+
   @Test
   public void persistsEvent() throws Exception {
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
@@ -124,112 +112,84 @@ public class AlphaIntegrationTest {
   @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
+    String parentTxId1 = UUID.randomUUID().toString();
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
 
-    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, this.localTxId, "method a", "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId1, "method b", "service b".getBytes())
+    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+    System.out.println(receivedCommands.size());
+    receivedCommands.forEach((command) -> System.out.println(command.getCompensateMethod()));
+    assertThat(receivedCommands, containsInAnyOrder(
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
+            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
     ));
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type) {
-    return GrpcTxEvent.newBuilder()
-        .setServiceName(serviceName)
-        .setInstanceId(instanceId)
-        .setTimestamp(System.currentTimeMillis())
-        .setGlobalTxId(this.globalTxId)
-        .setLocalTxId(this.localTxId)
-        .setParentTxId(this.parentTxId)
-        .setType(type.name())
-        .setCompensationMethod(getClass().getCanonicalName())
-        .setPayloads(ByteString.copyFrom(payload.getBytes()))
-        .build();
+  @Test
+  public void getCompensateCommandOnFailure() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
+    assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
+    assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
+    assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
+    assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads, String compensationMethod) {
-    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+  private GrpcTxEvent someGrpcEvent(EventType type) {
+    return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
-    return new TxEventEnvelope(new TxEvent(
-        serviceName,
-        instanceId,
-        new Date(),
-        globalTxId,
-        localTxId,
-        parentTxId,
-        eventType.name(),
-        compensationMethod,
-        payloads));
+  private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
+    return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
   }
 
-  @Configuration
-  static class OmegaCallbackConfig {
-    private final List<CompensationContext> compensationContexts = new ArrayList<>();
-
-    @Bean
-    List<CompensationContext> compensationContexts() {
-      return compensationContexts;
-    }
-
-    @Bean
-    OmegaCallback omegaCallback() {
-      return event ->
-          compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
-    }
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setParentTxId(parentTxId == null ? "" : parentTxId)
+        .setType(eventType.name())
+        .setCompensationMethod(compensationMethod)
+        .setPayloads(ByteString.copyFrom(payloads))
+        .build();
   }
 
-  private static class CompensationContext {
-    private final String globalTxId;
-    private final String localTxId;
-    private final String compensationMethod;
-    private final byte[] message;
-
-    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
-      this.globalTxId = globalTxId;
-      this.localTxId = localTxId;
-      this.compensationMethod = compensationMethod;
-      this.message = message;
+  private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+    @Override
+    public void onNext(GrpcCompensateCommand command) {
     }
 
     @Override
-    public String toString() {
-      return "CompensationContext{" +
-          "globalTxId='" + globalTxId + '\'' +
-          ", localTxId='" + localTxId + '\'' +
-          ", compensationMethod='" + compensationMethod + '\'' +
-          ", message=" + Arrays.toString(message) +
-          '}';
+    public void onError(Throwable t) {
     }
 
     @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      CompensationContext that = (CompensationContext) o;
-      return Objects.equals(globalTxId, that.globalTxId) &&
-          Objects.equals(localTxId, that.localTxId) &&
-          Objects.equals(compensationMethod, that.compensationMethod) &&
-          Arrays.equals(message, that.message);
+    public void onCompleted() {
     }
+  }
 
+  private static class CompensateStreamObserver extends EmptyStreamObserver {
     @Override
-    public int hashCode() {
-      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+    public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      receivedCommands.add(command);
     }
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 01fdc35..d5c757d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -31,7 +31,7 @@ import io.grpc.stub.StreamObserver;
 
 public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MessageHandler messageHandler;
 
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index d83786e..3e5d620 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -33,7 +33,7 @@ import org.springframework.web.servlet.ModelAndView;
 
 class TransactionHandlerInterceptor implements HandlerInterceptor {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext omegaContext;
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.