You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/02 02:52:10 UTC
[incubator-servicecomb-saga] 01/05: SCB-149 removed unnecessary endpoint interface
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-149_service_aware_callback
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 63ccc284ef040e3469cbb5dfdc2a15c51d2b5437
Author: seanyinx <se...@huawei.com>
AuthorDate: Fri Dec 29 17:47:13 2017 +0800
SCB-149 removed unnecessary endpoint interface
Signed-off-by: seanyinx <se...@huawei.com>
---
.../saga/alpha/server/GrpcTxEventEndpointImpl.java | 1 +
integration-tests/coverage-aggregate/pom.xml | 4 +
.../connector/grpc/GrpcClientMessageSender.java | 12 ++-
.../connector/grpc/GrpcTxEventEndpointImpl.java | 39 ----------
.../grpc/GrpcClientMessageSenderTest.java | 90 ----------------------
.../saga/omega/spring/OmegaSpringConfig.java | 27 +++----
.../pack/contract/grpc/GrpcTxEventEndpoint.java | 25 ------
7 files changed, 27 insertions(+), 171 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 76ab346..42c597e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -48,6 +48,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
message.getCompensationMethod(),
message.getPayloads().toByteArray()
));
+
GrpcEmpty reply = GrpcEmpty.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
diff --git a/integration-tests/coverage-aggregate/pom.xml b/integration-tests/coverage-aggregate/pom.xml
index ffebdee..72a172d 100644
--- a/integration-tests/coverage-aggregate/pom.xml
+++ b/integration-tests/coverage-aggregate/pom.xml
@@ -69,6 +69,10 @@
</dependency>
<dependency>
<groupId>io.servicecomb.saga</groupId>
+ <artifactId>omega-connector-grpc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.servicecomb.saga</groupId>
<artifactId>alpha-server</artifactId>
</dependency>
<dependency>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 25f6223..16f94b3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -22,21 +22,23 @@ package io.servicecomb.saga.omega.connector.grpc;
import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.MessageSerializer;
import io.servicecomb.saga.omega.transaction.TxEvent;
import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
public class GrpcClientMessageSender implements MessageSender {
- private final GrpcTxEventEndpoint eventService;
+ private final TxEventServiceBlockingStub eventService;
private final MessageSerializer serializer;
- public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer serializer) {
- this.eventService = eventService;
+ public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+ this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
this.serializer = serializer;
}
@@ -47,12 +49,14 @@ public class GrpcClientMessageSender implements MessageSender {
private GrpcTxEvent convertEvent(TxEvent event) {
ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
+
Builder builder = GrpcTxEvent.newBuilder()
.setTimestamp(event.timestamp())
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
.setType(event.type())
.setPayloads(payloads);
+
if (event.parentTxId() != null) {
builder.setParentTxId(event.parentTxId());
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
deleted file mode 100644
index b3f2b26..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-
-public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint {
-
- private final TxEventServiceBlockingStub stub;
-
- public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) {
- this.stub = stub;
- }
-
- @Override
- public void reportEvent(GrpcTxEvent event) {
- stub.reportEvent(event);
- }
-}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
deleted file mode 100644
index ca4f034..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.omega.transaction.TxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-
-public class GrpcClientMessageSenderTest {
- private final String globalTxId = uniquify("global tx id");
-
- private final String localTxId = uniquify("local tx id");
-
- private final String parentTxId = uniquify("parent tx id");
-
- private final String payload1 = uniquify("payload1");
-
- private final String payload2 = uniquify("payload2");
-
- private GrpcTxEvent grpcTxEvent;
-
- private final MessageSerializer serializer = new MessageSerializer() {
- @Override
- public byte[] serialize(TxEvent event) {
- return serialize(event.payloads());
- }
-
- @Override
- public byte[] serialize(Object[] objects) {
- try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
- for (Object o : objects) {
- stream.write(o.toString().getBytes());
- }
- return stream.toByteArray();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- };
-
-
- private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() {
- @Override
- public void reportEvent(GrpcTxEvent event) {
- grpcTxEvent = event;
- }
- };
-
- private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer);
-
- @Test
- public void sendSerializedEvent() throws Exception {
- TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2);
-
- messageSender.send(event);
-
- assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId()));
- assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId()));
- assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId()));
- assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event)));
- }
-}
\ No newline at end of file
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 9b749fd..73e2212 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,7 +18,9 @@
package io.servicecomb.saga.omega.spring;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import javax.annotation.PreDestroy;
@@ -31,21 +33,17 @@ import org.springframework.context.annotation.Configuration;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
-import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl;
import io.servicecomb.saga.omega.context.IdGenerator;
import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.format.NativeMessageFormat;
import io.servicecomb.saga.omega.transaction.MessageSender;
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
@Configuration
class OmegaSpringConfig {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private ManagedChannel clientChannel;
+ private final List<ManagedChannel> channels = new ArrayList<>();
@Bean
IdGenerator<String> idGenerator() {
@@ -59,7 +57,7 @@ class OmegaSpringConfig {
@PreDestroy
void close() {
- clientChannel.shutdown();
+ channels.forEach(ManagedChannel::shutdown);
}
@Bean
@@ -67,8 +65,7 @@ class OmegaSpringConfig {
// TODO: 2017/12/26 connect to the one with lowest latency
for (String address : addresses) {
try {
- String[] pair = address.split(":");
- return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+ return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
}
@@ -78,10 +75,14 @@ class OmegaSpringConfig {
"None of the alpha cluster is reachable: " + Arrays.toString(addresses));
}
- private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) {
- clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
- TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel);
- GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub);
- return new GrpcClientMessageSender(eventService, serializer);
+ private ManagedChannel grpcChannel(String address) {
+ String[] pair = address.split(":");
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], Integer.parseInt(pair[1]))
+ .usePlaintext(true)
+ .build();
+
+ channels.add(channel);
+ return channel;
}
}
diff --git a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
deleted file mode 100644
index 32a3b6b..0000000
--- a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.pack.contract.grpc;
-
-public interface GrpcTxEventEndpoint {
- void reportEvent(GrpcTxEvent message);
-}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.