You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/04 08:03:44 UTC
[incubator-servicecomb-saga] 02/07: SCB-138 set up bidirectional
streaming for omega
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 9b21624c2358e7767a64cf9e60e15b4cfd1ebb72
Author: Eric Lee <da...@huawei.com>
AuthorDate: Tue Jan 2 21:17:05 2018 +0800
SCB-138 set up bidirectional streaming for omega
Signed-off-by: Eric Lee <da...@huawei.com>
---
.../saga/alpha/server/GrpcOmegaCallback.java | 1 +
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 +-
...bserver.java => GrpcTxEventStreamObserver.java} | 4 +-
.../saga/alpha/server/AlphaIntegrationTest.java | 49 +++++------------
.../connector/grpc/GrpcClientMessageSender.java | 27 ++++++----
.../grpc/GrpcCompensateStreamObserver.java | 62 ++++++++++++++++++++++
.../saga/omega/spring/OmegaSpringConfig.java | 10 ++--
.../src/main/proto/GrpcTxEvent.proto | 8 +--
8 files changed, 104 insertions(+), 59 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
index db04253..8577726 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaCallback.java
@@ -41,6 +41,7 @@ public class GrpcOmegaCallback implements OmegaCallback {
GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
+ .setParentTxId(event.parentTxId().isEmpty() ? "" : event.parentTxId())
.setCompensateMethod(event.compensationMethod())
.setPayloads(ByteString.copyFrom(event.payloads()))
.build();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index b68972e..0b84217 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -44,6 +44,6 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
@Override
public StreamObserver<GrpcTxEvent> callbackCommand(StreamObserver<GrpcCompensateCommand> responseObserver) {
- return new GrpcOmegaStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
+ return new GrpcTxEventStreamObserver(omegaCallbacks, txConsistentService, responseObserver);
}
}
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
similarity index 95%
rename from alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
rename to alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
index d8d10f5..0bbb52f 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcOmegaStreamObserver.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventStreamObserver.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import io.grpc.stub.StreamObserver;
-class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
+class GrpcTxEventStreamObserver implements StreamObserver<GrpcTxEvent> {
private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -47,7 +47,7 @@ class GrpcOmegaStreamObserver implements StreamObserver<GrpcTxEvent> {
private final StreamObserver<GrpcCompensateCommand> responseObserver;
- GrpcOmegaStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
+ GrpcTxEventStreamObserver(Map<String, Map<String, OmegaCallback>> omegaCallbacks,
TxConsistentService txConsistentService, StreamObserver<GrpcCompensateCommand> responseObserver) {
this.omegaCallbacks = omegaCallbacks;
this.txConsistentService = txConsistentService;
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 20921a8..863c8af 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -19,6 +19,8 @@ package org.apache.servicecomb.saga.alpha.server;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxAbortedEvent;
+import static org.apache.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
@@ -28,13 +30,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.servicecomb.saga.alpha.core.EventType;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
import org.apache.servicecomb.saga.alpha.core.TxEvent;
+import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
@@ -49,24 +55,7 @@ import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
-
-import org.apache.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-=======
import io.grpc.stub.StreamObserver;
-import io.servicecomb.saga.alpha.core.EventType;
-import io.servicecomb.saga.alpha.core.OmegaCallback;
-import io.servicecomb.saga.alpha.core.TxConsistentService;
-import io.servicecomb.saga.alpha.core.TxEvent;
-import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig;
-import io.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090")
@@ -93,13 +82,7 @@ public class AlphaIntegrationTest {
@Autowired
private List<CompensationContext> compensationContexts;
- @Autowired
- private Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-
- @Autowired
- private TxConsistentService txConsistentService;
-
- // use an empty response observer as we don't need the response
+ // use an empty response observer as we don't need the response in client side
private final StreamObserver<GrpcCompensateCommand> emptyResponseObserver = new StreamObserver<GrpcCompensateCommand>() {
@Override
public void onNext(GrpcCompensateCommand value) {
@@ -121,14 +104,10 @@ public class AlphaIntegrationTest {
@Test
public void persistsEvent() throws Exception {
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
- stub.reportEvent(someGrpcEvent(EventType.TxStartedEvent));
-=======
StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
requestObserver.onNext(someGrpcEvent(TxStartedEvent));
// use the asynchronous stub need to wait for some time
await().atMost(1, SECONDS).until(() -> eventRepo.findByEventGlobalTxId(globalTxId) != null);
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId);
@@ -137,7 +116,7 @@ public class AlphaIntegrationTest {
assertThat(envelope.globalTxId(), is(globalTxId));
assertThat(envelope.localTxId(), is(localTxId));
assertThat(envelope.parentTxId(), is(parentTxId));
- assertThat(envelope.type(), Is.is(EventType.TxStartedEvent.name()));
+ assertThat(envelope.type(), Is.is(TxStartedEvent.name()));
assertThat(envelope.compensationMethod(), is(compensationMethod));
assertThat(envelope.payloads(), is(payload.getBytes()));
}
@@ -145,20 +124,16 @@ public class AlphaIntegrationTest {
@Test
public void doNotCompensateDuplicateTxOnFailure() throws Exception {
// duplicate events with same content but different timestamp
- eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
- eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId, parentTxId, "service a".getBytes(), "method a"));
eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method a"));
String localTxId1 = UUID.randomUUID().toString();
- eventRepo.save(eventEnvelopeOf(EventType.TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
+ eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes(), "method b"));
eventRepo.save(eventEnvelopeOf(EventType.TxEndedEvent, new byte[0], "method b"));
-<<<<<<< HEAD:alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
- stub.reportEvent(someGrpcEvent(EventType.TxAbortedEvent));
-=======
StreamObserver<GrpcTxEvent> requestObserver = stub.callbackCommand(emptyResponseObserver);
requestObserver.onNext(someGrpcEvent(TxAbortedEvent));
->>>>>>> 6985eb8... SCB-138 set up bidirectional streaming for alpha:alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1);
assertThat(compensationContexts, containsInAnyOrder(
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 09cbaad..482e03a 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -20,36 +20,41 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
-
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
public class GrpcClientMessageSender implements MessageSender {
- private final TxEventServiceBlockingStub eventService;
+ private final TxEventServiceStub eventService;
private final MessageSerializer serializer;
private final ServiceConfig serviceConfig;
- public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) {
- this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
+ private StreamObserver<GrpcTxEvent> requestObserver;
+
+ public GrpcClientMessageSender(ManagedChannel channel, MessageSerializer serializer, ServiceConfig serviceConfig,
+ MessageHandler handler) {
+ this.eventService = TxEventServiceGrpc.newStub(channel);
this.serializer = serializer;
this.serviceConfig = serviceConfig;
+ this.requestObserver = this.eventService.callbackCommand(new GrpcCompensateStreamObserver(handler));
}
@Override
public void send(TxEvent event) {
- eventService.reportEvent(convertEvent(event));
+ requestObserver.onNext(convertEvent(event));
}
private GrpcTxEvent convertEvent(TxEvent event) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
new file mode 100644
index 0000000..01fdc35
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/GrpcCompensateStreamObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.stub.StreamObserver;
+
+public class GrpcCompensateStreamObserver implements StreamObserver<GrpcCompensateCommand> {
+
+ private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final MessageHandler messageHandler;
+
+ public GrpcCompensateStreamObserver(MessageHandler messageHandler) {
+ this.messageHandler = messageHandler;
+ }
+
+ @Override
+ public void onNext(GrpcCompensateCommand command) {
+ LOG.info("receive compensate command, global tx id: {}, local tx id: {}, compensate method: {}",
+ command.getGlobalTxId(), command.getLocalTxId(), command.getCompensateMethod());
+ messageHandler.onReceive(
+ command.getGlobalTxId(),
+ command.getLocalTxId(),
+ command.getParentTxId().isEmpty() ? null : command.getParentTxId(),
+ command.getCompensateMethod(),
+ command.getPayloads());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.error("failed to process grpc compensate command.", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 4a454fd..d00f1cc 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -27,20 +27,21 @@ import javax.annotation.PreDestroy;
import org.apache.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-
@Configuration
class OmegaSpringConfig {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,11 +69,12 @@ class OmegaSpringConfig {
}
@Bean
- MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) {
+ MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig,
+ @Lazy MessageHandler handler) {
// TODO: 2017/12/26 connect to the one with lowest latency
for (String address : addresses) {
try {
- return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig);
+ return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig, handler);
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index 75a5664..5bc5836 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -22,8 +22,7 @@ option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
option java_outer_classname = "TxEventProto";
service TxEventService {
- rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {
- }
+ rpc CallbackCommand (stream GrpcTxEvent) returns (stream GrpcCompensateCommand) {}
}
message GrpcTxEvent {
@@ -41,6 +40,7 @@ message GrpcTxEvent {
message GrpcCompensateCommand {
string globalTxId = 1;
string localTxId = 2;
- string compensateMethod = 3;
- bytes payloads = 4;
+ string parentTxId = 3;
+ string compensateMethod = 4;
+ bytes payloads = 5;
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.