You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/04 08:03:45 UTC

[GitHub] seanyinx closed pull request #98: SCB-138 use grpc bidirectional streaming to connect alpha and omega

seanyinx closed pull request #98: SCB-138 use grpc bidirectional streaming to connect alpha and omega
URL: https://github.com/apache/incubator-servicecomb-saga/pull/98
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index cc4115da..7ecd3966 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -88,6 +88,18 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index 7bd855e7..f970ecbd 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -24,16 +24,15 @@
 
 import javax.annotation.PostConstruct;
 
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
 import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.PendingTaskRunner;
 import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
 import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
 @Configuration
 class AlphaConfig {
@@ -42,7 +41,6 @@
   @Value("${alpha.compensation.retry.delay:3000}")
   private int delay;
 
-  // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
   @Bean
   Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
     return new ConcurrentHashMap<>();
@@ -54,25 +52,28 @@ OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
   }
   
   @Bean
-  TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") int port,
-      TxEventEnvelopeRepository eventRepo,
-      OmegaCallback omegaCallback) {
+  TxEventRepository springTxEventRepository(TxEventEnvelopeRepository eventRepo) {
+    return new SpringTxEventRepository(eventRepo);
+  }
+
+  @Bean
+  TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int port,
+      TxEventRepository eventRepository,
+      OmegaCallback omegaCallback,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
 
-    TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo);
+    TxConsistentService consistentService = new TxConsistentService(eventRepository, omegaCallback);
 
-    ServerStartable startable = buildGrpc(port, omegaCallback, eventRepository);
+    ServerStartable startable = buildGrpc(port, consistentService, omegaCallbacks);
     new Thread(startable::start).start();
 
-    return eventRepository;
+    return consistentService;
   }
 
-  private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) {
-    return new GrpcStartable(
-        port,
-        new GrpcTxEventEndpointImpl(
-            new TxConsistentService(
-                eventRepository,
-                omegaCallback)));
+  private ServerStartable buildGrpc(int port, TxConsistentService txConsistentService,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
+    return new GrpcStartable(port,
+        new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
   }
 
   @PostConstruct
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
new file mode 100644
index 00000000..c5765529
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcOmegaCallback implements OmegaCallback {
+
+  private final StreamObserver<GrpcCompensateCommand> observer;
+
+  public GrpcOmegaCallback(StreamObserver<GrpcCompensateCommand> observer) {
+    this.observer = observer;
+  }
+
+  StreamObserver<GrpcCompensateCommand> observer() {
+    return observer;
+  }
+
+  @Override
+  public void compensate(TxEvent event) {
+    GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
+        .setGlobalTxId(event.globalTxId())
+        .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
+        .setCompensateMethod(event.compensationMethod())
+        .setPayloads(ByteString.copyFrom(event.payloads()))
+        .build();
+    observer.onNext(command);
+  }
+}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 2eefeb71..869d5937 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -33,7 +33,7 @@
 
 class GrpcStartable implements ServerStartable {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Server server;
 
   GrpcStartable(int port, BindableService... services) {
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index cba9f0c5..0b84217c 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -20,39 +20,30 @@
 
 package org.apache.servicecomb.saga.alpha.server;
 
-import java.util.Date;
+import java.util.Map;
 
-import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcEmpty;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
 
+import io.grpc.stub.StreamObserver;
+
 class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   private final TxConsistentService txConsistentService;
 
-  GrpcTxEventEndpointImpl(TxConsistentService txConsistentService) {
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  GrpcTxEventEndpointImpl(TxConsistentService txConsistentService,
+      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
     this.txConsistentService = txConsistentService;
+    this.omegaCallbacks = omegaCallbacks;
   }
 
   @Override
-  public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) {
-    txConsistentService.handle(new TxEvent(
-        message.getServiceName(),
-        message.getInstanceId(),
-        new Date(message.getTimestamp()),
-        message.getGlobalTxId(),
-        message.getLocalTxId(),
-        message.getParentTxId().isEmpty()? null : message.getParentTxId(),
-        message.getType(),
-        message.getCompensationMethod(),
-        message.getPayloads().toByteArray()
-    ));
-
-    GrpcEmpty reply = GrpcEmpty.newBuilder().build();
-    responseObserver.onNext(reply);
-    responseObserver.onCompleted();
+  public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
+    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
new file mode 100644
index 00000000..108df14f
--- /dev/null
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+
+import java.lang.invoke.MethodHandles;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
+
+class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
+
+  private final Set<SimpleImmutableEntry<String, String>> serviceEntries = new ConcurrentSet<>();
+
+  private final TxConsistentService txConsistentService;
+
+  private final StreamObserver<GrpcCompensateCommand> responseObserver;
+
+  GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+      TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
+    this.omegaCallbacks = omegaCallbacks;
+    this.txConsistentService = txConsistentService;
+    this.responseObserver = responseObserver;
+  }
+
+  @Override
+  public void onNext(GrpcTxEvent message) {
+    // on a TxStartedEvent, register a callback for this service instance (if none is registered yet)
+    String serviceName = message.getServiceName();
+    String instanceId = message.getInstanceId();
+    if (message.getType().equals(TxStartedEvent.name())) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks
+          .computeIfAbsent(serviceName, v -> new ConcurrentHashMap<>());
+      instanceCallback.computeIfAbsent(instanceId, v -> new GrpcOmegaCallback(responseObserver));
+      serviceEntries.add(new SimpleImmutableEntry<>(serviceName, instanceId));
+    }
+
+    // store received event
+    txConsistentService.handle(new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(message.getTimestamp()),
+        message.getGlobalTxId(),
+        message.getLocalTxId(),
+        message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
+        message.getType(),
+        message.getCompensationMethod(),
+        message.getPayloads().toByteArray()
+    ));
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("failed to process grpc message.", t);
+    responseObserver.onCompleted();
+    removeInvalidCallback();
+  }
+
+  // unless we shut down the alpha server gracefully, this method should never be called
+  @Override
+  public void onCompleted() {
+    LOG.info("disconnect the grpc client");
+    responseObserver.onCompleted();
+    removeInvalidCallback();
+  }
+
+  private void removeInvalidCallback() {
+    for (SimpleImmutableEntry<String, String> entry : serviceEntries) {
+      Map<String, OmegaCallback> instanceCallback = omegaCallbacks.get(entry.getKey());
+      if (instanceCallback != null) {
+        instanceCallback.remove(entry.getValue());
+      }
+    }
+    serviceEntries.clear();
+  }
+
+  Set<SimpleImmutableEntry<String, String>> serviceEntries() {
+    return serviceEntries;
+  }
+}
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 221c5b00..a406acdc 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,50 +19,47 @@
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
-import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
-import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
-import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import io.grpc.stub.StreamObserver;
 
 @RunWith(SpringRunner.class)
-@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
+@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class}, properties = "alpha.server.port=8090")
 public class AlphaIntegrationTest {
   private static final int port = 8090;
 
   private static ManagedChannel clientChannel = ManagedChannelBuilder
       .forAddress("localhost", port).usePlaintext(true).build();
 
-  private TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel);
+  private TxEventServiceStub stub = TxEventServiceGrpc.newStub(clientChannel);
 
   private static final String payload = "hello world";
 
@@ -76,17 +73,28 @@
   @Autowired
   private TxEventEnvelopeRepository eventRepo;
 
-  @Autowired
-  private List<CompensationContext> compensationContexts;
+  private static final List<GrpcCompensateCommand> receivedCommands = new CopyOnWriteArrayList<>();
+  private final StreamObserver<GrpcCompensateCommand> compensateResponseObserver = new CompensateStreamObserver();
 
   @AfterClass
   public static void tearDown() throws Exception {
     clientChannel.shutdown();
   }
 
+  @Before
+  public void before() {
+    eventRepo.deleteAll();
+    receivedCommands.clear();
+  }
+
   @Test
-  public void persistsEvent() throws Exception {
-    stub.reportEvent(someGrpcEvent(EventType.TxStartedEvent));
+  public void persistsEvent() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+    // the asynchronous stub does not block, so wait until the event has been persisted
+    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+
+    assertThat(receivedCommands.isEmpty(), is(true));
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -95,119 +103,86 @@ public void persistsEvent() throws Exception {
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
-    assertThat(envelope.type(), Is.is(EventType.TxStartedEvent.name()));
+    assertThat(envelope.type(), Is.is(TxStartedEvent.name()));
     assertThat(envelope.compensationMethod(), is(compensationMethod));
     assertThat(envelope.payloads(), is(payload.getBytes()));
   }
 
   @Test
-  public void doNotCompensateDuplicateTxOnFailure() throws Exception {
+  public void doNotCompensateDuplicateTxOnFailure() {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
-
-    stub.reportEvent(someGrpcEvent(EventType.TxAbortedEvent));
-
-    await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
-    assertThat(compensationContexts, containsInAnyOrder(
-        new CompensationContext(globalTxId, this.localTxId, "method a", "service a".getBytes()),
-        new CompensationContext(globalTxId, localTxId1, "method b", "service b".getBytes())
+    String parentTxId1 = UUID.randomUUID().toString();
+    requestObserver.onNext(eventOf(TxStartedEvent, localTxId1, parentTxId1, "service b".getBytes(), "method b"));
+    requestObserver.onNext(eventOf(TxEndedEvent, new byte[0], "method b"));
+
+    requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+
+    assertThat(receivedCommands, containsInAnyOrder(
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId).setParentTxId(parentTxId)
+            .setCompensateMethod("method a").setPayloads(ByteString.copyFrom("service a".getBytes())).build(),
+        GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
+            .setCompensateMethod("method b").setPayloads(ByteString.copyFrom("service b".getBytes())).build()
     ));
   }
 
-  private GrpcTxEvent someGrpcEvent(EventType type) {
-    return GrpcTxEvent.newBuilder()
-        .setServiceName(serviceName)
-        .setInstanceId(instanceId)
-        .setTimestamp(System.currentTimeMillis())
-        .setGlobalTxId(this.globalTxId)
-        .setLocalTxId(this.localTxId)
-        .setParentTxId(this.parentTxId)
-        .setType(type.name())
-        .setCompensationMethod(getClass().getCanonicalName())
-        .setPayloads(ByteString.copyFrom(payload.getBytes()))
-        .build();
+  @Test
+  public void getCompensateCommandOnFailure() {
+    StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(compensateResponseObserver);
+    requestObserver.onNext(someGrpcEvent(TxStartedEvent));
+    await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
+
+    requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
+    await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
+
+    assertThat(receivedCommands.get(0).getGlobalTxId(), is(globalTxId));
+    assertThat(receivedCommands.get(0).getLocalTxId(), is(localTxId));
+    assertThat(receivedCommands.get(0).getParentTxId(), is(parentTxId));
+    assertThat(receivedCommands.get(0).getCompensateMethod(), is(compensationMethod));
+    assertThat(receivedCommands.get(0).getPayloads().toByteArray(), is(payload.getBytes()));
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads, String compensationMethod) {
-    return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
+  private GrpcTxEvent someGrpcEvent(EventType type) {
+    return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName());
   }
 
-  private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
-    return new TxEventEnvelope(new TxEvent(
-        serviceName,
-        instanceId,
-        new Date(),
-        globalTxId,
-        localTxId,
-        parentTxId,
-        eventType.name(),
-        compensationMethod,
-        payloads));
+  private GrpcTxEvent eventOf(EventType eventType, byte[] payloads, String compensationMethod) {
+    return eventOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads, compensationMethod);
   }
 
-  @Configuration
-  static class OmegaCallbackConfig {
-    private final List<CompensationContext> compensationContexts = new ArrayList<>();
-
-    @Bean
-    List<CompensationContext> compensationContexts() {
-      return compensationContexts;
-    }
-
-    @Bean
-    OmegaCallback omegaCallback() {
-      return event ->
-          compensationContexts.add(new CompensationContext(event.globalTxId(), event.localTxId(), event.compensationMethod(), event.payloads()));
-    }
+  private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setTimestamp(System.currentTimeMillis())
+        .setGlobalTxId(globalTxId)
+        .setLocalTxId(localTxId)
+        .setParentTxId(parentTxId == null ? "" : parentTxId)
+        .setType(eventType.name())
+        .setCompensationMethod(compensationMethod)
+        .setPayloads(ByteString.copyFrom(payloads))
+        .build();
   }
 
-  private static class CompensationContext {
-    private final String globalTxId;
-    private final String localTxId;
-    private final String compensationMethod;
-    private final byte[] message;
-
-    private CompensationContext(String globalTxId, String localTxId, String compensationMethod, byte[] message) {
-      this.globalTxId = globalTxId;
-      this.localTxId = localTxId;
-      this.compensationMethod = compensationMethod;
-      this.message = message;
-    }
-
+  private static class CompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
     @Override
-    public String toString() {
-      return "CompensationContext{" +
-          "globalTxId='" + globalTxId + '\'' +
-          ", localTxId='" + localTxId + '\'' +
-          ", compensationMethod='" + compensationMethod + '\'' +
-          ", message=" + Arrays.toString(message) +
-          '}';
+    public void onNext(GrpcCompensateCommand command) {
+      // intercept received command
+      receivedCommands.add(command);
     }
 
     @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      CompensationContext that = (CompensationContext) o;
-      return Objects.equals(globalTxId, that.globalTxId) &&
-          Objects.equals(localTxId, that.localTxId) &&
-          Objects.equals(compensationMethod, that.compensationMethod) &&
-          Arrays.equals(message, that.message);
+    public void onError(Throwable t) {
     }
 
     @Override
-    public int hashCode() {
-      return Objects.hash(globalTxId, localTxId, compensationMethod, message);
+    public void onCompleted() {
     }
   }
 }
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
new file mode 100644
index 00000000..6270f429
--- /dev/null
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserverTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.alpha.server;
+
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxEndedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.servicecomb.saga.alpha.core.EventType;
+import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcTxEventStreamObserverTest {
+  private final Map<String, Map<String, OmegaCallback>> omegaCallbacks = new ConcurrentHashMap<>();
+
+  private final TxConsistentService txConsistentService = mock(TxConsistentService.class);
+
+  private final StreamObserver responseObserver = mock(StreamObserver.class);
+
+  private final GrpcTxEventStreamObserver observer = new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService,
+      responseObserver);
+
+  private final String serviceName = "service a";
+
+  private final String instanceId = "instance a";
+
+  private final GrpcTxEvent startedEvent = eventOf(serviceName, instanceId, TxStartedEvent);
+
+  private final GrpcTxEvent abortedEvent = eventOf(serviceName, instanceId, TxAbortedEvent);
+
+  private final GrpcTxEvent endedEvent = eventOf(serviceName, instanceId, TxEndedEvent);
+
+  @Before
+  public void setUp() throws Exception {
+    omegaCallbacks.clear();
+  }
+
+  @Test
+  public void updateOmegaCallbacksOnStartedEvent() {
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(omegaCallbacks.getOrDefault(serviceName, null), is(notNullValue()));
+    OmegaCallback callback = omegaCallbacks.get(serviceName).getOrDefault(instanceId, null);
+    assertThat(callback, is(notNullValue()));
+    assertThat(((GrpcOmegaCallback) callback).observer(), is(responseObserver));
+
+    assertThat(observer.serviceEntries().size(), is(1));
+    assertThat(observer.serviceEntries(), hasItem(new SimpleImmutableEntry<>(serviceName, instanceId)));
+  }
+
+  @Test
+  public void duplicateEventsOnlyHoldsOneOmegaCallback() {
+    observer.onNext(startedEvent);
+    observer.onNext(startedEvent);
+
+    assertThat(omegaCallbacks.size(), is(1));
+    assertThat(observer.serviceEntries().size(), is(1));
+  }
+
+  @Test
+  public void omegaCallbacksNotChangeOnOtherEvents() {
+    observer.onNext(abortedEvent);
+    observer.onNext(endedEvent);
+
+    assertThat(omegaCallbacks.isEmpty(), is(true));
+  }
+
+  @Test
+  public void removeOmegaCallbacksOnComplete() {
+    observer.onNext(startedEvent);
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(false));
+    assertThat(observer.serviceEntries().size(), is(1));
+
+    observer.onCompleted();
+    assertThat(omegaCallbacks.getOrDefault(serviceName, new HashMap<>()).isEmpty(), is(true));
+    assertThat(observer.serviceEntries().isEmpty(), is(true));
+  }
+
+  private GrpcTxEvent eventOf(String serviceName, String instanceId, EventType type) {
+    return GrpcTxEvent.newBuilder()
+        .setServiceName(serviceName)
+        .setInstanceId(instanceId)
+        .setType(type.name())
+        .build();
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 09cbaadc..ac522b04 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,36 +20,41 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
-
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
 
 public class GrpcClientMessageSender implements MessageSender {
 
-  private final TxEventServiceBlockingStub eventService;
+  private final TxEventServiceStub eventService;
 
   private final MessageSerializer serializer;
   private final ServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
-    this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
+  private final StreamObserver<GrpcTxEvent> requestObserver;
+
+  public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+      MessageHandler handler) {
+    this.eventService = TxEventServiceGrpc.newStub(channel);
     this.serializer = serializer;
     this.serviceConfig = serviceConfig;
+    this.requestObserver = this.eventService.callbackCommand(new GrpcCompensateStreamObserver(handler));
   }
 
   @Override
   public void send(TxEvent event) {
-    eventService.reportEvent(convertEvent(event));
+    requestObserver.onNext(convertEvent(event));
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
new file mode 100644
index 00000000..1429da65
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final MessageHandler messageHandler;
+
+  public GrpcCompensateStreamObserver(MessageHandler messageHandler) {
+    this.messageHandler = messageHandler;
+  }
+
+  @Override
+  public void onNext(GrpcCompensateCommand command) {
+    LOG.info("Received compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+    messageHandler.onReceive(
+        command.getGlobalTxId(),
+        command.getLocalTxId(),
+        command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+        command.getCompensateMethod(),
+        command.getPayloads());
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("failed to process grpc compensate command.", t);
+  }
+
+  @Override
+  public void onCompleted() {
+  }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 4a454fd7..d00f1cc3 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -27,20 +27,21 @@
 import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,11 +69,12 @@ void close() {
   }
 
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
+  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig,
+      @Lazy MessageHandler handler) {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
diff --git a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
index d83786e0..3e5d620f 100644
--- a/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
+++ b/omega/omega-transport/omega-transport-resttemplate/src/main/java/org/apache/servicecomb/saga/omega/transport/resttemplate/TransactionHandlerInterceptor.java
@@ -33,7 +33,7 @@
 
 class TransactionHandlerInterceptor implements HandlerInterceptor {
 
-  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OmegaContext omegaContext;
 
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 06796378..5bc58364 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,7 +22,7 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc ReportEvent (GrpcTxEvent) returns (GrpcEmpty) {}
+  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {}
 }
 
 message GrpcTxEvent {
@@ -37,4 +37,10 @@ message GrpcTxEvent {
   string instanceId = 9;
 }
 
-message GrpcEmpty {}
+message GrpcCompensateCommand {
+  string globalTxId = 1;
+  string localTxId = 2;
+  string parentTxId = 3;
+  string compensateMethod = 4;
+  bytes payloads = 5;
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services