You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:44 UTC

[incubator-servicecomb-saga] 02/07: SCB-138 set up bidirectional streaming for omega

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 9b21624c2358e7767a64cf9e60e15b4cfd1ebb72
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 2 21:17:05 2018 +0800

    SCB-138 set up bidirectional streaming for omega
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 .../saga/alpha/server/GrpcOmegaCallback.java       |  1 +
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  2 +-
 ...bserver.java => GrpcTxEventStreamObserver.java} |  4 +-
 .../saga/alpha/server/AlphaIntegrationTest.java    | 49 +++++------------
 .../connector/grpc/GrpcClientMessageSender.java    | 27 ++++++----
 .../grpc/GrpcCompensateStreamObserver.java         | 62 ++++++++++++++++++++++
 .../saga/omega/spring/OmegaSpringConfig.java       | 10 ++--
 .../src/main/proto/GrpcTxEvent.proto               |  8 +--
 8 files changed, 104 insertions(+), 59 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index db04253..8577726 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -41,6 +41,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
     GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
+        .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
         .setCompensateMethod(event.compensationMethod())
         .setPayloads(ByteString.copyFrom(event.payloads()))
         .build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index b68972e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -44,6 +44,6 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
 
   @Override
   public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
-    return new GrpcOmegaStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+    return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
   }
 }
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
similarity index 95%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index d8d10f5..0bbb52f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import io.grpc.stub.StreamObserver;
 
-class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
+class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -47,7 +47,7 @@ class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
 
   private final StreamObserver<GrpcCompensateCommand> responseObserver;
 
-  GrpcOmegaStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+  GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
       TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
     this.omegaCallbacks = omegaCallbacks;
     this.txConsistentService = txConsistentService;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 20921a8..863c8af 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,8 @@ package org.apache.servicecomb.saga.alpha.server;
 
 import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
@@ -28,13 +30,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.alpha.core.EventType;
 import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -49,24 +55,7 @@ import com.google.protobuf.ByteString;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-=======
 import io.grpc.stub.StreamObserver;
-import io.servicecomb.saga.alpha.core.EventType;
-import io.servicecomb.saga.alpha.core.OmegaCallback;
-import io.servicecomb.saga.alpha.core.TxConsistentService;
-import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import io.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
@@ -93,13 +82,7 @@ public class AlphaIntegrationTest {
   @Autowired
   private List<CompensationContext> compensationContexts;
 
-  @Autowired
-  private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-
-  @Autowired
-  private TxConsistentService txConsistentService;
-
-  // use an empty response observer as we don't need the response
+  // use an empty response observer as we don't need the response on the client side
   private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
     @Override
     public void onNext(GrpcCompensateCommand value) {
@@ -121,14 +104,10 @@ public class AlphaIntegrationTest {
 
   @Test
   public void persistsEvent() throws Exception {
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-    stub.reportEvent(someGrpcEvent(EventType.TxStartedEvent));
-=======
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxStartedEvent));
     // use the asynchronous stub need to wait for some time
     await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
 
@@ -137,7 +116,7 @@ public class AlphaIntegrationTest {
     assertThat(envelope.globalTxId(), is(globalTxId));
     assertThat(envelope.localTxId(), is(localTxId));
     assertThat(envelope.parentTxId(), is(parentTxId));
-    assertThat(envelope.type(), Is.is(EventType.TxStartedEvent.name()));
+    assertThat(envelope.type(), Is.is(TxStartedEvent.name()));
     assertThat(envelope.compensationMethod(), is(compensationMethod));
     assertThat(envelope.payloads(), is(payload.getBytes()));
   }
@@ -145,20 +124,16 @@ public class AlphaIntegrationTest {
   @Test
   public void doNotCompensateDuplicateTxOnFailure() throws Exception {
     // duplicate events with same content but different timestamp
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
     eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
 
     String localTxId1 = UUID.randomUUID().toString();
-    eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
+    eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
     eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
 
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-    stub.reportEvent(someGrpcEvent(EventType.TxAbortedEvent));
-=======
     StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
     requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
     await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
     assertThat(compensationContexts, containsInAnyOrder(
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 09cbaad..482e03a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,36 +20,41 @@
 
 package org.apache.servicecomb.saga.omega.connector.grpc;
 
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
-
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
 import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
 
 public class GrpcClientMessageSender implements MessageSender {
 
-  private final TxEventServiceBlockingStub eventService;
+  private final TxEventServiceStub eventService;
 
   private final MessageSerializer serializer;
   private final ServiceConfig serviceConfig;
 
-  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
-    this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
+  private StreamObserver<GrpcTxEvent> requestObserver;
+
+  public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+      MessageHandler handler) {
+    this.eventService = TxEventServiceGrpc.newStub(channel);
     this.serializer = serializer;
     this.serviceConfig = serviceConfig;
+    this.requestObserver = this.eventService.callbackCommand(new GrpcCompensateStreamObserver(handler));
   }
 
   @Override
   public void send(TxEvent event) {
-    eventService.reportEvent(convertEvent(event));
+    requestObserver.onNext(convertEvent(event));
   }
 
   private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
new file mode 100644
index 0000000..01fdc35
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> { // stream observer that hands each received GrpcCompensateCommand to a MessageHandler
+
+  private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final MessageHandler messageHandler; // receiver of every command delivered to onNext
+
+  public GrpcCompensateStreamObserver(MessageHandler messageHandler) { // messageHandler: stored as-is, no null check is performed here
+    this.messageHandler = messageHandler;
+  }
+
+  @Override
+  public void onNext(GrpcCompensateCommand command) { // logs the command's ids and method, then forwards its fields to the handler
+    LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+        command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+    messageHandler.onReceive(
+        command.getGlobalTxId(),
+        command.getLocalTxId(),
+        command.getParentTxId().isEmpty() ? null : command.getParentTxId(), // an empty parent tx id is passed to the handler as null
+        command.getCompensateMethod(),
+        command.getPayloads());
+  }
+
+  @Override
+  public void onError(Throwable t) { // the error is only logged; no other action is taken in this method
+    LOG.error("failed to process grpc compensate command.", t);
+  }
+
+  @Override
+  public void onCompleted() { // no action is taken when the stream completes
+  }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 4a454fd..d00f1cc 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -27,20 +27,21 @@ import javax.annotation.PreDestroy;
 import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
 import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
 import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,11 +69,12 @@ class OmegaSpringConfig {
   }
 
   @Bean
-  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
+  MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig,
+      @Lazy MessageHandler handler) {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 75a5664..5bc5836 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,8 +22,7 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
 option java_outer_classname = "TxEventProto";
 
 service TxEventService {
-  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {
-  }
+  rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {}
 }
 
 message GrpcTxEvent {
@@ -41,6 +40,7 @@ message GrpcTxEvent {
 message GrpcCompensateCommand {
   string globalTxId = 1;
   string localTxId = 2;
-  string compensateMethod = 3;
-  bytes payloads = 4;
+  string parentTxId = 3;
+  string compensateMethod = 4;
+  bytes payloads = 5;
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.