You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:42 UTC

[incubator-servicecomb-saga] branch master updated (9e9fb00 -> b862131)

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git.


    from 9e9fb00  SCB-158 update issue address
     new 31e91ba  SCB-138 set up bidirectional streaming for alpha
     new 9b21624  SCB-138 set up bidirectional streaming for omega
     new 06a1d5d  SCB-138 add test for grpc omega callback
     new 6be4265  SCB-138 remove omegaCallback when grpc client disconnects
     new 26ef883  SCB-138 remove useless code
     new 78b1dd7  SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service
     new b862131  SCB-138 observer maintains its own services instead of using an extra map

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-server/pom.xml                         |  12 ++
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  37 ++--
 .../saga/alpha/server/GrpcOmegaCallback.java       |  54 ++++++
 .../saga/alpha/server/GrpcStartable.java           |   2 +-
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  33 ++--
 .../alpha/server/GrpcTxEventStreamObserver.java    | 116 +++++++++++++
 .../saga/alpha/server/AlphaIntegrationTest.java    | 187 +++++++++------------
 .../server/GrpcTxEventStreamObserverTest.java      | 120 +++++++++++++
 .../connector/grpc/GrpcClientMessageSender.java    |  27 +--
 .../grpc/GrpcCompensateStreamObserver.java         |  62 +++++++
 .../saga/omega/spring/OmegaSpringConfig.java       |  10 +-
 .../TransactionHandlerInterceptor.java             |   2 +-
 .../src/main/proto/GrpcTxEvent.proto               |  10 +-
 13 files changed, 508 insertions(+), 164 deletions(-)
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
 create mode 100644 alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
 create mode 100644 alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
 create mode 100644 omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 02/07: SCB-138 set up bidirectional streaming for omega

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9b21624c2358e7767a64cf9e60e15b4cfd1ebb72
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 2 21:17:05 2018 +0800

    SCB-138 set up bidirectional streaming for omega
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/server/GrpcOmegaCallback.java       |  1 +
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  2 +-
 ...bserver.java => GrpcTxEventStreamObserver.java} |  4 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 49 +++++------------
 .../connector/grpc/GrpcClientMessageSender.java    | 27 ++++++----
 .../grpc/GrpcCompensateStreamObserver.java         | 62 ++++++++++++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 ++--
 .../src/main/proto/GrpcTxEvent.proto               |  8 +--
 8 files changed, 104 insertions(+), 59 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index db04253..8577726 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -41,6 +41,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
         .setCompensateMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index b68972e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -44,6 +44,6 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcOmegaStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
similarity index 95%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index d8d10f5..0bbb52f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
 
-class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
+class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -47,7 +47,7 @@ class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
-  GrpcOmegaStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+  GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
     this.txConsistentService = txConsistentService;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 20921a8..863c8af 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,8 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
@@ -28,13 +30,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -49,24 +55,7 @@ import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-=======
 import io.grpc.stub.StreamObserver;
-import io.servicecomb.saga.alpha.core.EventType;
-import io.servicecomb.saga.alpha.core.OmegaCallback;
-import io.servicecomb.saga.alpha.core.TxConsistentService;
-import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import io.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
@@ -93,13 +82,7 @@ public class AlphaIntegrationTest {
   @Autowired
   private List<CompensationContext> compensationContexts;
 
-  @Autowired
-  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-
-  @Autowired
-  private TxConsistentService txConsistentService;
-
-  // use an empty response observer as we don't need the response
+  // use an empty response observer as we don't need the response in client side
   private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
     @Override
     public void onNext(GrpcCompensateCommand value) {
@@ -121,14 +104,10 @@ public class AlphaIntegrationTest {
 
   @Test
   public void persistsEvent() throws Exception {
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-    stub.reportEvent(someGrpcEvent(EventType.TxStartedEvent));
-=======
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -137,7 +116,7 @@ public class AlphaIntegrationTest {
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
-    assertThat(envelope.type(), Is.is(EventType.TxStartedEvent.name()));
+    assertThat(envelope.type(), Is.is(TxStartedEvent.name()));
     assertThat(envelope.compensationMethod(), is(compensationMethod));
     assertThat(envelope.payloads(), is(payload.getBytes()));
   }
@@ -145,20 +124,16 @@ public class AlphaIntegrationTest {
   @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
     eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
 
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-    stub.reportEvent(someGrpcEvent(EventType.TxAbortedEvent));
-=======
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
     assertThat(compensationContexts, containsInAnyOrder(
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 09cbaad..482e03a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,36 +20,41 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
-
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
 
 public class GrpcClientMessageSender implements MessageSender {
 
-  private final TxEventServiceBlockingStub eventService;
+  private final TxEventServiceStub eventService;
 
   private final MessageSerializer serializer;
   private final ServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
-    this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
+  private StreamObserver<GrpcTxEvent> requestObserver;
+
+  public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+      MessageHandler handler) {
+    this.eventService = TxEventServiceGrpc.newStub(channel);
     this.serializer = serializer;
     this.serviceConfig = serviceConfig;
+    this.requestObserver = this.eventService.callbackCommand(new GrpcCompensateStreamObserver(handler));
   }
 
   @Override
   public void send(TxEvent event) {
-    eventService.reportEvent(convertEvent(event));
+    requestObserver.onNext(convertEvent(event));
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
new file mode 100644
index 0000000..01fdc35
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+
+  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final MessageHandler messageHandler;
+
+  public GrpcCompensateStreamObserver(MessageHandler messageHandler) {
+    this.messageHandler = messageHandler;
+  }
+
+  @Override
+  public void onNext(GrpcCompensateCommand command) {
+    LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+    messageHandler.onReceive(
+        command.getGlobalTxId(),
+        command.getLocalTxId(),
+        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+        command.getCompensateMethod(),
+        command.getPayloads());
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("failed to process grpc compensate command.", t);
+  }
+
+  @Override
+  public void onCompleted() {
+  }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 4a454fd..d00f1cc 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -27,20 +27,21 @@ import javax.annotation.PreDestroy;
 import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,11 +69,12 @@ class OmegaSpringConfig {
   }
 
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
+  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig,
+      @Lazy MessageHandler handler) {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 75a5664..5bc5836 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,8 +22,7 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {
-  }
+  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {}
 }
 
 message GrpcTxEvent {
@@ -41,6 +40,7 @@ message GrpcTxEvent {
 message GrpcCompensateCommand {
   string globalTxId = 1;
   string localTxId = 2;
-  string compensateMethod = 3;
-  bytes payloads = 4;
+  string parentTxId = 3;
+  string compensateMethod = 4;
+  bytes payloads = 5;
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 03/07: SCB-138 add test for grpc omega callback

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 06a1d5d17c486acc21412520a835d0fd396d0f50
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 00:36:29 2018 +0800

    SCB-138 add test for grpc omega callback
---
 .../saga/alpha/server/GrpcOmegaCallback.java       |   9 +-
 .../saga/alpha/server/GrpcStartable.java           |   2 +-
 .../alpha/server/GrpcTxEventStreamObserver.java    |   7 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 172 ++++++++-------------
 .../grpc/GrpcCompensateStreamObserver.java         |   2 +-
 .../TransactionHandlerInterceptor.java             |   2 +-
 6 files changed, 80 insertions(+), 114 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 8577726..43d4ac4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -24,12 +24,19 @@ import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.protobuf.ByteString;
 
 import io.grpc.stub.StreamObserver;
 
 public class GrpcOmegaCallback implements OmegaCallback {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final StreamObserver<GrpcCompensateCommand> observer;
 
   public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -41,7 +48,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
-        .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
         .setCompensateMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 2eefeb7..869d593 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -33,7 +33,7 @@ import io.grpc.ServerBuilder;
 
 class GrpcStartable implements ServerStartable {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Server server;
 
   GrpcStartable(int port, BindableService... services) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 0bbb52f..16ee788 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -39,7 +39,7 @@ import io.grpc.stub.StreamObserver;
 
 class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
@@ -58,9 +58,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   public void onNext(GrpcTxEvent message) {
     // register a callback on started event
     if (message.getType().equals(TxStartedEvent.name())) {
-      Map<String, OmegaCallback> callbacks = new ConcurrentHashMap<>();
-      callbacks.put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
-      omegaCallbacks.put(message.getServiceName(), callbacks);
+      omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
+          .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
     }
 
     // store received event
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 863c8af..5eb1d1b 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -20,35 +20,29 @@ package org.apache.servicecomb.saga.alpha.server;
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
-import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
-import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
@@ -58,7 +52,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
@@ -79,29 +73,23 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
-  @Autowired
-  private List<CompensationContext> compensationContexts;
-
   // use an empty response observer as we don't need the response in client side
-  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
-    @Override
-    public void onNext(GrpcCompensateCommand value) {
-    }
+  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
 
-    @Override
-    public void onError(Throwable t) {
-    }
-
-    @Override
-    public void onCompleted() {
-    }
-  };
+  private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
+  private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
 
   @AfterClass
   public static void tearDown() throws Exception {
     clientChannel.shutdown();
   }
 
+  @Before
+  public void before() throws Exception {
+    eventRepo.deleteAll();
+    receivedCommands.clear();
+  }
+
   @Test
   public void persistsEvent() throws Exception {
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
@@ -124,112 +112,84 @@ public class AlphaIntegrationTest {
   @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
+    String parentTxId1 = UUID.randomUUID().toString();
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
 
-    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, this.localTxId, "method a", "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId1, "method b", "service b".getBytes())
+    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+    System.out.println(receivedCommands.size());
+    receivedCommands.forEach((command) -> System.out.println(command.getCompensateMethod()));
+    assertThat(receivedCommands, containsInAnyOrder(
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
+            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
     ));
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type) {
-    return GrpcTxEvent.newBuilder()
-        .setServiceName(serviceName)
-        .setInstanceId(instanceId)
-        .setTimestamp(System.currentTimeMillis())
-        .setGlobalTxId(this.globalTxId)
-        .setLocalTxId(this.localTxId)
-        .setParentTxId(this.parentTxId)
-        .setType(type.name())
-        .setCompensationMethod(getClass().getCanonicalName())
-        .setPayloads(ByteString.copyFrom(payload.getBytes()))
-        .build();
+  @Test
+  public void getCompensateCommandOnFailure() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+    requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+    assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
+    assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
+    assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
+    assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
+    assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads, String compensationMethod) {
-    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+  private GrpcTxEvent someGrpcEvent(EventType type) {
+    return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
-    return new TxEventEnvelope(new TxEvent(
-        serviceName,
-        instanceId,
-        new Date(),
-        globalTxId,
-        localTxId,
-        parentTxId,
-        eventType.name(),
-        compensationMethod,
-        payloads));
+  private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
+    return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
   }
 
-  @Configuration
-  static class OmegaCallbackConfig {
-    private final List<CompensationContext> compensationContexts = new ArrayList<>();
-
-    @Bean
-    List<CompensationContext> compensationContexts() {
-      return compensationContexts;
-    }
-
-    @Bean
-    OmegaCallback omegaCallback() {
-      return event ->
-          compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
-    }
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setParentTxId(parentTxId == null ? "" : parentTxId)
+        .setType(eventType.name())
+        .setCompensationMethod(compensationMethod)
+        .setPayloads(ByteString.copyFrom(payloads))
+        .build();
   }
 
-  private static class CompensationContext {
-    private final String globalTxId;
-    private final String localTxId;
-    private final String compensationMethod;
-    private final byte[] message;
-
-    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
-      this.globalTxId = globalTxId;
-      this.localTxId = localTxId;
-      this.compensationMethod = compensationMethod;
-      this.message = message;
+  private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+    @Override
+    public void onNext(GrpcCompensateCommand command) {
     }
 
     @Override
-    public String toString() {
-      return "CompensationContext{" +
-          "globalTxId='" + globalTxId + '\'' +
-          ", localTxId='" + localTxId + '\'' +
-          ", compensationMethod='" + compensationMethod + '\'' +
-          ", message=" + Arrays.toString(message) +
-          '}';
+    public void onError(Throwable t) {
     }
 
     @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      CompensationContext that = (CompensationContext) o;
-      return Objects.equals(globalTxId, that.globalTxId) &&
-          Objects.equals(localTxId, that.localTxId) &&
-          Objects.equals(compensationMethod, that.compensationMethod) &&
-          Arrays.equals(message, that.message);
+    public void onCompleted() {
     }
+  }
 
+  private static class CompensateStreamObserver extends EmptyStreamObserver {
     @Override
-    public int hashCode() {
-      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+    public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      receivedCommands.add(command);
     }
   }
 }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index 01fdc35..d5c757d 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -31,7 +31,7 @@ import io.grpc.stub.StreamObserver;
 
 public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MessageHandler messageHandler;
 
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index d83786e..3e5d620 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -33,7 +33,7 @@ import org.springframework.web.servlet.ModelAndView;
 
 class TransactionHandlerInterceptor implements HandlerInterceptor {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext omegaContext;
 

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 05/07: SCB-138 remove useless code

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 26ef883b552102d5a8545079929c2affe4d6b8db
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 22:10:08 2018 +0800

    SCB-138 remove useless code
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java     | 5 ++++-
 .../apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java   | 2 --
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 6a2ae8e..623355c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -93,11 +93,14 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   @Override
   public void onError(Throwable t) {
     LOG.error("failed to process grpc message.", t);
-    onCompleted();
+    responseObserver.onCompleted();
+    removeInvalidCallback();
   }
 
+  // unless we shutdown the alpha server gracefully, this method should never be called
   @Override
   public void onCompleted() {
+    LOG.info("disconnect the grpc client");
     responseObserver.onCompleted();
     removeInvalidCallback();
   }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index ca4b7bd..55c095e 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -124,8 +124,6 @@ public class AlphaIntegrationTest {
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
 
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
-    System.out.println(receivedCommands.size());
-    receivedCommands.forEach((command) -> System.out.println(command.getCompensateMethod()));
     assertThat(receivedCommands, containsInAnyOrder(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
             .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 04/07: SCB-138 remove omegaCallback when grpc client disconnects

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 6be42657232e0b58c7325b392323b7ebd8487b33
Author: Eric Lee <da...@huawei.com>
AuthorDate: Wed Jan 3 18:59:13 2018 +0800

    SCB-138 remove omegaCallback when grpc client disconnects
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 alpha/alpha-server/pom.xml                         |  12 ++
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  28 +++--
 .../saga/alpha/server/GrpcOmegaCallback.java       |  26 +++--
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |   8 +-
 .../alpha/server/GrpcTxEventStreamObserver.java    |  45 +++++++-
 .../saga/alpha/server/AlphaIntegrationTest.java    |  25 ++---
 .../server/GrpcTxEventStreamObserverTest.java      | 122 +++++++++++++++++++++
 7 files changed, 227 insertions(+), 39 deletions(-)

diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index cc4115d..7ecd396 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -88,6 +88,18 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e33ba99..7353fa4 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,16 +24,18 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.PostConstruct;
 
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import io.grpc.stub.StreamObserver;
 
 @Configuration
 class AlphaConfig {
@@ -42,13 +44,18 @@ class AlphaConfig {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
-  // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
+  // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
   }
 
   @Bean
+  Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+    return new ConcurrentHashMap<>();
+  }
+
+  @Bean
   OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
     return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
   }
@@ -62,19 +69,22 @@ class AlphaConfig {
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
     new Thread(startable::start).start();
 
     return consistentService;
   }
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    return new GrpcStartable(port, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+    return new GrpcStartable(port,
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index 43d4ac4..c9cda3a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,23 +20,18 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.Objects;
+
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 
-import java.lang.invoke.MethodHandles;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.protobuf.ByteString;
 
 import io.grpc.stub.StreamObserver;
 
 public class GrpcOmegaCallback implements OmegaCallback {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final StreamObserver<GrpcCompensateCommand> observer;
 
   public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
@@ -54,4 +49,21 @@ public class GrpcOmegaCallback implements OmegaCallback {
         .build();
     observer.onNext(command);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GrpcOmegaCallback that = (GrpcOmegaCallback) o;
+    return Objects.equals(observer, that.observer);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(observer);
+  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 0b84217..d8d1cea 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -36,14 +36,18 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
+    this.omegaCallbacksReverse = omegaCallbacksReverse;
   }
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 16ee788..6a2ae8e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,8 +23,12 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -43,13 +47,17 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
+    this.omegaCallbacksReverse = omegaCallbacksReverse;
     this.txConsistentService = txConsistentService;
     this.responseObserver = responseObserver;
   }
@@ -57,15 +65,21 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   @Override
   public void onNext(GrpcTxEvent message) {
     // register a callback on started event
+    String serviceName = message.getServiceName();
+    String instanceId = message.getInstanceId();
     if (message.getType().equals(TxStartedEvent.name())) {
-      omegaCallbacks.computeIfAbsent(message.getServiceName(), (key) -> new ConcurrentHashMap<>())
-          .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks
+          .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
+      instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
+      Map<String, String> serviceInstanceId = omegaCallbacksReverse
+          .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
+      serviceInstanceId.putIfAbsent(serviceName, instanceId);
     }
 
     // store received event
     txConsistentService.handle(new TxEvent(
-        message.getServiceName(),
-        message.getInstanceId(),
+        serviceName,
+        instanceId,
         new Date(message.getTimestamp()),
         message.getGlobalTxId(),
         message.getLocalTxId(),
@@ -79,11 +93,32 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   @Override
   public void onError(Throwable t) {
     LOG.error("failed to process grpc message.", t);
-    responseObserver.onCompleted();
+    onCompleted();
   }
 
   @Override
   public void onCompleted() {
     responseObserver.onCompleted();
+    removeInvalidCallback();
+  }
+
+  private void removeInvalidCallback() {
+    Collection<Map<String, String>> services = omegaCallbacksReverse.values();
+    for (Map<String, String> service : services) {
+      Set<String> removedServices = new HashSet<>();
+      for (Entry<String, String> entry : service.entrySet()) {
+        String serviceName = entry.getKey();
+        String instanceId = entry.getValue();
+        Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
+        if (instanceCallback != null) {
+          instanceCallback.remove(instanceId);
+          removedServices.add(serviceName);
+        }
+      }
+      for (String removedService : removedServices) {
+        service.remove(removedService);
+      }
+    }
+    omegaCallbacksReverse.remove(responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 5eb1d1b..ca4b7bd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -73,9 +73,6 @@ public class AlphaIntegrationTest {
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
-  // use an empty response observer as we don't need the response in client side
-  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new EmptyStreamObserver();
-
   private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
   private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
 
@@ -85,18 +82,20 @@ public class AlphaIntegrationTest {
   }
 
   @Before
-  public void before() throws Exception {
+  public void before() {
     eventRepo.deleteAll();
     receivedCommands.clear();
   }
 
   @Test
-  public void persistsEvent() throws Exception {
-    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
+  public void persistsEvent() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
 
+    assertThat(receivedCommands.size(), is(0));
+
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
     assertThat(envelope.serviceName(), is(serviceName));
@@ -110,7 +109,7 @@ public class AlphaIntegrationTest {
   }
 
   @Test
-  public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+  public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
@@ -171,9 +170,11 @@ public class AlphaIntegrationTest {
         .build();
   }
 
-  private static class EmptyStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+  private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
     @Override
     public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      receivedCommands.add(command);
     }
 
     @Override
@@ -184,12 +185,4 @@ public class AlphaIntegrationTest {
     public void onCompleted() {
     }
   }
-
-  private static class CompensateStreamObserver extends EmptyStreamObserver {
-    @Override
-    public void onNext(GrpcCompensateCommand command) {
-      // intercept received command
-      receivedCommands.add(command);
-    }
-  }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
new file mode 100644
index 0000000..a40498e
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcTxEventStreamObserverTest {
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
+
+  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+
+  private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
+
+  private final StreamObserver responseObserver = mock(StreamObserver.class);
+
+  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
+      omegaCallbacksReverse, txConsistentService, responseObserver);
+
+  private final String serviceName = "service a";
+
+  private final String instanceId = "instance a";
+
+  private final GrpcTxEvent startedEvent = eventOf(serviceName, instanceId, TxStartedEvent);
+
+  private final GrpcTxEvent abortedEvent = eventOf(serviceName, instanceId, TxAbortedEvent);
+
+  private final GrpcTxEvent endedEvent = eventOf(serviceName, instanceId, TxEndedEvent);
+
+  @Before
+  public void setUp() throws Exception {
+    omegaCallbacks.clear();
+    omegaCallbacksReverse.clear();
+  }
+
+  @Test
+  public void updateOmegaCallbacksOnStartedEvent() {
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
+    assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
+        is(new GrpcOmegaCallback(responseObserver)));
+
+    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+  }
+
+  @Test
+  public void duplicateEventsOnlyHoldsOneOmegaCallback() {
+    observer.onNext(startedEvent);
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(omegaCallbacksReverse.size(), is(1));
+  }
+
+  @Test
+  public void omegaCallbacksNotChangeOnOtherEvents() {
+    observer.onNext(abortedEvent);
+    observer.onNext(endedEvent);
+
+    assertThat(omegaCallbacks.isEmpty(), is(true));
+  }
+
+  @Test
+  public void removeOmegaCallbacksOnComplete() {
+    observer.onNext(startedEvent);
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
+    assertThat(omegaCallbacksReverse.size(), is(1));
+
+    observer.onCompleted();
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
+    assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+  }
+
+  private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setType(type.name())
+        .build();
+  }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 07/07: SCB-138 observer maintains its own services instead of using an extra map

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit b862131fee700572daa8a5c99af9dafe2ba42749
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 15:37:30 2018 +0800

    SCB-138 observer maintains its own services instead of using an extra map
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 20 ++++----------------
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  9 ++-------
 .../alpha/server/GrpcTxEventStreamObserver.java    | 22 ++++++++++++----------
 .../server/GrpcTxEventStreamObserverTest.java      | 21 ++++++++-------------
 4 files changed, 26 insertions(+), 46 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index c8eabdc..f970ecb 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,7 +17,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,13 +30,10 @@ import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import io.grpc.stub.StreamObserver;
-
 @Configuration
 class AlphaConfig {
   private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();
@@ -45,18 +41,12 @@ class AlphaConfig {
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
-  // TODO: 2018/01/03 optimize reverse visit of the map instead of using another map, namely omegaCallbacksReverse
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
   }
 
   @Bean
-  Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
-    return new ConcurrentHashMap<>();
-  }
-
-  @Bean
   OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
     return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks));
   }
@@ -70,22 +60,20 @@ class AlphaConfig {
   TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks, omegaCallbacksReverse);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
 
     return consistentService;
   }
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     return new GrpcStartable(port,
-        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 27c524e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,7 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -37,18 +36,14 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
-
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
-      Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
-    this.omegaCallbacksReverse = omegaCallbacksReverse;
   }
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcTxEventStreamObserver(omegaCallbacks, omegaCallbacksReverse, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 07fe093..108df14 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -24,9 +24,9 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
 
 class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
@@ -45,17 +46,15 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
+  private final Set<SimpleImmutableEntry<String, String>> serviceEntries = new ConcurrentSet<>();
 
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
-    this.omegaCallbacksReverse = omegaCallbacksReverse;
     this.txConsistentService = txConsistentService;
     this.responseObserver = responseObserver;
   }
@@ -69,7 +68,7 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
       Map<String, OmegaCallback> instanceCallback = omegaCallbacks
           .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
       instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
-      omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
+      serviceEntries.add(new SimpleImmutableEntry<>(serviceName, instanceId));
     }
 
     // store received event
@@ -102,13 +101,16 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   }
 
   private void removeInvalidCallback() {
-    Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
-    for (SimpleImmutableEntry<String, String> pair : services) {
-      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+    for (SimpleImmutableEntry<String, String> entry : serviceEntries) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(entry.getKey());
       if (instanceCallback != null) {
-        instanceCallback.remove(pair.getValue());
+        instanceCallback.remove(entry.getValue());
       }
     }
-    omegaCallbacksReverse.remove(responseObserver);
+    serviceEntries.clear();
+  }
+
+  Set<SimpleImmutableEntry<String, String>> serviceEntries() {
+    return serviceEntries;
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index 2fb3593..6270f42 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -25,6 +25,7 @@ import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,14 +46,12 @@ import io.grpc.stub.StreamObserver;
 public class GrpcTxEventStreamObserverTest {
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
-
   private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
 
   private final StreamObserver responseObserver = mock(StreamObserver.class);
 
-  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks,
-      omegaCallbacksReverse, txConsistentService, responseObserver);
+  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService,
+      responseObserver);
 
   private final String serviceName = "service a";
 
@@ -68,7 +66,6 @@ public class GrpcTxEventStreamObserverTest {
   @Before
   public void setUp() throws Exception {
     omegaCallbacks.clear();
-    omegaCallbacksReverse.clear();
   }
 
   @Test
@@ -81,10 +78,8 @@ public class GrpcTxEventStreamObserverTest {
     assertThat(callback, is(notNullValue()));
     assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
 
-    assertThat(omegaCallbacksReverse.size(), is(1));
-    assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
+    assertThat(observer.serviceEntries().size(), is(1));
+    assertThat(observer.serviceEntries(), hasItem(new SimpleImmutableEntry<>(serviceName, instanceId)));
   }
 
   @Test
@@ -93,7 +88,7 @@ public class GrpcTxEventStreamObserverTest {
     observer.onNext(startedEvent);
 
     assertThat(omegaCallbacks.size(), is(1));
-    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(observer.serviceEntries().size(), is(1));
   }
 
   @Test
@@ -108,11 +103,11 @@ public class GrpcTxEventStreamObserverTest {
   public void removeOmegaCallbacksOnComplete() {
     observer.onNext(startedEvent);
     assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
-    assertThat(omegaCallbacksReverse.size(), is(1));
+    assertThat(observer.serviceEntries().size(), is(1));
 
     observer.onCompleted();
     assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
-    assertThat(omegaCallbacksReverse.isEmpty(), is(true));
+    assertThat(observer.serviceEntries().isEmpty(), is(true));
   }
 
   private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 06/07: SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 78b1dd73cf5be288b74cf492a37e3fd3e9b658a0
Author: Eric Lee <da...@huawei.com>
AuthorDate: Thu Jan 4 11:34:38 2018 +0800

    SCB-138 use SimpleImmutableEntry instead of map to record relationship of response observer and service
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  7 +++--
 .../saga/alpha/server/GrpcOmegaCallback.java       | 23 +++------------
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  5 ++--
 .../alpha/server/GrpcTxEventStreamObserver.java    | 33 +++++++---------------
 .../saga/alpha/server/AlphaIntegrationTest.java    |  6 ++--
 .../server/GrpcTxEventStreamObserverTest.java      | 11 +++++---
 .../connector/grpc/GrpcClientMessageSender.java    |  2 +-
 .../grpc/GrpcCompensateStreamObserver.java         |  2 +-
 8 files changed, 34 insertions(+), 55 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 7353fa4..c8eabdc 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,7 @@ class AlphaConfig {
   }
 
   @Bean
-  Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse() {
+  Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse() {
     return new ConcurrentHashMap<>();
   }
 
@@ -70,7 +71,7 @@ class AlphaConfig {
       TxEventRepository eventRepository,
       OmegaCallback omegaCallback,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
 
     TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
@@ -82,7 +83,7 @@ class AlphaConfig {
 
   private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
     return new GrpcStartable(port,
         new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, omegaCallbacksReverse));
   }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index c9cda3a..c576552 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -20,8 +20,6 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.Objects;
-
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
@@ -38,6 +36,10 @@ public class GrpcOmegaCallback implements OmegaCallback {
     this.observer = observer;
   }
 
+  StreamObserver<GrpcCompensateCommand> observer() {
+    return observer;
+  }
+
   @Override
   public void compensate(TxEvent event) {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
@@ -49,21 +51,4 @@ public class GrpcOmegaCallback implements OmegaCallback {
         .build();
     observer.onNext(command);
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    GrpcOmegaCallback that = (GrpcOmegaCallback) o;
-    return Objects.equals(observer, that.observer);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(observer);
-  }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index d8d1cea..27c524e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,6 +20,7 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -36,11 +37,11 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
 
   GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse) {
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse) {
     this.txConsistentService = txConsistentService;
     this.omegaCallbacks = omegaCallbacks;
     this.omegaCallbacksReverse = omegaCallbacksReverse;
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index 623355c..07fe093 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -23,12 +23,10 @@ package org.apache.servicecomb.saga.alpha.server;
 import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 
 import java.lang.invoke.MethodHandles;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -47,14 +45,14 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse;
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse;
 
   private final TxConsistentService txConsistentService;
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
   GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
-      Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse,
+      Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
     this.omegaCallbacksReverse = omegaCallbacksReverse;
@@ -70,10 +68,8 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
     if (message.getType().equals(TxStartedEvent.name())) {
       Map<String, OmegaCallback> instanceCallback = omegaCallbacks
           .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
-      instanceCallback.putIfAbsent(instanceId, new GrpcOmegaCallback(responseObserver));
-      Map<String, String> serviceInstanceId = omegaCallbacksReverse
-          .computeIfAbsent(responseObserver, v -> new ConcurrentHashMap<>());
-      serviceInstanceId.putIfAbsent(serviceName, instanceId);
+      instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
+      omegaCallbacksReverse.computeIfAbsent(responseObserver, v -> new SimpleImmutableEntry<>(serviceName, instanceId));
     }
 
     // store received event
@@ -106,20 +102,11 @@ class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
   }
 
   private void removeInvalidCallback() {
-    Collection<Map<String, String>> services = omegaCallbacksReverse.values();
-    for (Map<String, String> service : services) {
-      Set<String> removedServices = new HashSet<>();
-      for (Entry<String, String> entry : service.entrySet()) {
-        String serviceName = entry.getKey();
-        String instanceId = entry.getValue();
-        Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(serviceName);
-        if (instanceCallback != null) {
-          instanceCallback.remove(instanceId);
-          removedServices.add(serviceName);
-        }
-      }
-      for (String removedService : removedServices) {
-        service.remove(removedService);
+    Collection<SimpleImmutableEntry<String, String>> services = omegaCallbacksReverse.values();
+    for (SimpleImmutableEntry<String, String> pair : services) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(pair.getKey());
+      if (instanceCallback != null) {
+        instanceCallback.remove(pair.getValue());
       }
     }
     omegaCallbacksReverse.remove(responseObserver);
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 55c095e..a406acd 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -94,7 +94,7 @@ public class AlphaIntegrationTest {
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
 
-    assertThat(receivedCommands.size(), is(0));
+    assertThat(receivedCommands.isEmpty(), is(true));
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -122,8 +122,8 @@ public class AlphaIntegrationTest {
     requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
 
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
-
     await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+
     assertThat(receivedCommands, containsInAnyOrder(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
             .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
@@ -137,8 +137,10 @@ public class AlphaIntegrationTest {
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
     assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
     assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
     assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
index a40498e..2fb3593 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +46,7 @@ import io.grpc.stub.StreamObserver;
 public class GrpcTxEventStreamObserverTest {
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
 
-  private final Map<StreamObserver<GrpcCompensateCommand>, Map<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
+  private final Map<StreamObserver<GrpcCompensateCommand>, SimpleImmutableEntry<String, String>> omegaCallbacksReverse = new ConcurrentHashMap<>();
 
   private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
 
@@ -76,12 +77,14 @@ public class GrpcTxEventStreamObserverTest {
 
     assertThat(omegaCallbacks.size(), is(1));
     assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
-    assertThat(omegaCallbacks.get(serviceName).getOrDefault(instanceId, null),
-        is(new GrpcOmegaCallback(responseObserver)));
+    OmegaCallback callback = omegaCallbacks.get(serviceName).getOrDefault(instanceId, null);
+    assertThat(callback, is(notNullValue()));
+    assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
 
     assertThat(omegaCallbacksReverse.size(), is(1));
     assertThat(omegaCallbacksReverse.getOrDefault(responseObserver, null), is(notNullValue()));
-    assertThat(omegaCallbacksReverse.get(responseObserver).getOrDefault(serviceName, null), is(instanceId));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getKey(), is(serviceName));
+    assertThat(omegaCallbacksReverse.get(responseObserver).getValue(), is(instanceId));
   }
 
   @Test
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 482e03a..ac522b0 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -42,7 +42,7 @@ public class GrpcClientMessageSender implements MessageSender {
   private final MessageSerializer serializer;
   private final ServiceConfig serviceConfig;
 
-  private StreamObserver<GrpcTxEvent> requestObserver;
+  private final StreamObserver<GrpcTxEvent> requestObserver;
 
   public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
       MessageHandler handler) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
index d5c757d..1429da6 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -41,7 +41,7 @@ public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensa
 
   @Override
   public void onNext(GrpcCompensateCommand command) {
-    LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
         command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
     messageHandler.onReceive(
         command.getGlobalTxId(),

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.

[incubator-servicecomb-saga] 01/07: SCB-138 set up bidirectional streaming for alpha

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 31e91ba817dca1f7d5569715e46db8fe97b14ccc
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 2 17:49:21 2018 +0800

    SCB-138 set up bidirectional streaming for alpha
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../servicecomb/saga/alpha/server/AlphaConfig.java | 28 +++----
 .../saga/alpha/server/GrpcOmegaCallback.java       | 49 ++++++++++++
 .../saga/alpha/server/GrpcOmegaStreamObserver.java | 90 ++++++++++++++++++++++
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 33 +++-----
 .../saga/alpha/server/AlphaIntegrationTest.java    | 49 +++++++++++-
 .../src/main/proto/GrpcTxEvent.proto               | 10 ++-
 6 files changed, 222 insertions(+), 37 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 7bd855e..e33ba99 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -54,25 +54,27 @@ class AlphaConfig {
   }
   
   @Bean
-  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port,
-      TxEventEnvelopeRepository eventRepo,
-      OmegaCallback omegaCallback) {
+  TxEventRepository springTxEventRepository(TxEventEnvelopeRepository eventRepo) {
+    return new SpringTxEventRepository(eventRepo);
+  }
+
+  @Bean
+  TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
+      TxEventRepository eventRepository,
+      OmegaCallback omegaCallback,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, omegaCallback, eventRepository);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
 
-    return eventRepository;
+    return consistentService;
   }
 
-  private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) {
-    return new GrpcStartable(
-        port,
-        new GrpcTxEventEndpointImpl(
-            new TxConsistentService(
-                eventRepository,
-                omegaCallback)));
+  private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    return new GrpcStartable(port, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
new file mode 100644
index 0000000..db04253
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * {@link OmegaCallback} that turns a compensation request into a
+ * {@link GrpcCompensateCommand} and emits it on the wrapped stream observer.
+ */
+public class GrpcOmegaCallback implements OmegaCallback {
+
+  private final StreamObserver<GrpcCompensateCommand> observer;
+
+  /**
+   * @param observer stream on which compensate commands are emitted
+   */
+  public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+    this.observer = observer;
+  }
+
+  /**
+   * Copies the global tx id, local tx id, compensation method and payloads of
+   * the event into a new command and passes it to {@code observer.onNext}.
+   *
+   * @param event the event whose fields populate the command
+   */
+  @Override
+  public void compensate(TxEvent event) {
+    GrpcCompensateCommand.Builder builder = GrpcCompensateCommand.newBuilder();
+    builder.setGlobalTxId(event.globalTxId());
+    builder.setLocalTxId(event.localTxId());
+    builder.setCompensateMethod(event.compensationMethod());
+    builder.setPayloads(ByteString.copyFrom(event.payloads()));
+    observer.onNext(builder.build());
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
new file mode 100644
index 0000000..d8d10f5
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Request-side observer for a stream of {@link GrpcTxEvent}s.
+ *
+ * <p>On a started event it registers a {@link GrpcOmegaCallback}, keyed by
+ * service name and instance id, that writes to the paired response observer;
+ * every received event is then handed to the {@link TxConsistentService}.
+ */
+class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  private final TxConsistentService txConsistentService;
+
+  private final StreamObserver<GrpcCompensateCommand> responseObserver;
+
+  /**
+   * @param omegaCallbacks registry of callbacks: service name -> instance id -> callback
+   * @param txConsistentService receives every event converted to a {@link TxEvent}
+   * @param responseObserver stream the registered callback writes compensate commands to
+   */
+  GrpcOmegaStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
+    this.omegaCallbacks = omegaCallbacks;
+    this.txConsistentService = txConsistentService;
+    this.responseObserver = responseObserver;
+  }
+
+  /**
+   * Registers a callback when the message is a started event, then forwards
+   * the message to the consistent service.
+   *
+   * @param message the event received on the stream
+   */
+  @Override
+  public void onNext(GrpcTxEvent message) {
+    // register a callback on started event; reuse the service's existing instance map
+    // so that callbacks of other instances of the same service are not discarded
+    if (message.getType().equals(TxStartedEvent.name())) {
+      omegaCallbacks
+          .computeIfAbsent(message.getServiceName(), serviceName -> new ConcurrentHashMap<>())
+          .put(message.getInstanceId(), new GrpcOmegaCallback(responseObserver));
+    }
+
+    // store received event
+    txConsistentService.handle(new TxEvent(
+        message.getServiceName(),
+        message.getInstanceId(),
+        new Date(message.getTimestamp()),
+        message.getGlobalTxId(),
+        message.getLocalTxId(),
+        // an empty parent tx id is passed on as null
+        message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
+        message.getType(),
+        message.getCompensationMethod(),
+        message.getPayloads().toByteArray()
+    ));
+  }
+
+  /**
+   * Logs the failure and completes the response stream.
+   *
+   * @param t the error reported on the stream
+   */
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("failed to process grpc message.", t);
+    // NOTE(review): completes the response stream after an inbound error -
+    // confirm the response observer accepts onCompleted in that state
+    responseObserver.onCompleted();
+  }
+
+  /**
+   * Completes the response stream.
+   */
+  @Override
+  public void onCompleted() {
+    responseObserver.onCompleted();
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index cba9f0c..b68972e 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,39 +20,30 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.Date;
+import java.util.Map;
 
-import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcEmpty;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
 
+import io.grpc.stub.StreamObserver;
+
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final TxConsistentService txConsistentService;
 
-  GrpcTxEventEndpointImpl(TxConsistentService txConsistentService) {
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     this.txConsistentService = txConsistentService;
+    this.omegaCallbacks = omegaCallbacks;
   }
 
   @Override
-  public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) {
-    txConsistentService.handle(new TxEvent(
-        message.getServiceName(),
-        message.getInstanceId(),
-        new Date(message.getTimestamp()),
-        message.getGlobalTxId(),
-        message.getLocalTxId(),
-        message.getParentTxId().isEmpty()? null : message.getParentTxId(),
-        message.getType(),
-        message.getCompensationMethod(),
-        message.getPayloads().toByteArray()
-    ));
-
-    GrpcEmpty reply = GrpcEmpty.newBuilder().build();
-    responseObserver.onNext(reply);
-    responseObserver.onCompleted();
+  public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
+    return new GrpcOmegaStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 221c5b0..20921a8 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
@@ -48,11 +49,24 @@ import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
 import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+=======
+import io.grpc.stub.StreamObserver;
+import io.servicecomb.saga.alpha.core.EventType;
+import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.TxConsistentService;
+import io.servicecomb.saga.alpha.core.TxEvent;
+import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
+import io.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+>>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
@@ -62,7 +76,7 @@ public class AlphaIntegrationTest {
   private static ManagedChannel clientChannel = ManagedChannelBuilder
       .forAddress("localhost", port).usePlaintext(true).build();
 
-  private TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+  private TxEventServiceStub stub = TxEventServiceGrpc.newStub(clientChannel);
 
   private static final String payload = "hello world";
 
@@ -79,6 +93,27 @@ public class AlphaIntegrationTest {
   @Autowired
   private List<CompensationContext> compensationContexts;
 
+  @Autowired
+  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  @Autowired
+  private TxConsistentService txConsistentService;
+
+  // use an empty response observer as we don't need the response
+  private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
+    @Override
+    public void onNext(GrpcCompensateCommand value) {
+    }
+
+    @Override
+    public void onError(Throwable t) {
+    }
+
+    @Override
+    public void onCompleted() {
+    }
+  };
+
   @AfterClass
   public static void tearDown() throws Exception {
     clientChannel.shutdown();
@@ -86,7 +121,14 @@ public class AlphaIntegrationTest {
 
   @Test
   public void persistsEvent() throws Exception {
+<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
     stub.reportEvent(someGrpcEvent(EventType.TxStartedEvent));
+=======
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+    // the stub is asynchronous, so wait until the event has been stored
+    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+>>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -111,7 +153,12 @@ public class AlphaIntegrationTest {
     eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
     eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
 
+<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
     stub.reportEvent(someGrpcEvent(EventType.TxAbortedEvent));
+=======
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+>>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
     assertThat(compensationContexts, containsInAnyOrder(
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 0679637..75a5664 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,7 +22,8 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc ReportEvent (GrpcTxEvent) returns (GrpcEmpty) {}
+  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {
+  }
 }
 
 message GrpcTxEvent {
@@ -37,4 +38,9 @@ message GrpcTxEvent {
   string instanceId = 9;
 }
 
-message GrpcEmpty {}
+message GrpcCompensateCommand {
+  string globalTxId = 1;
+  string localTxId = 2;
+  string compensateMethod = 3;
+  bytes payloads = 4;
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.