You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/10 03:31:25 UTC
[incubator-servicecomb-saga] 01/07: SCB-168 load balanced alpha cluster client
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e5efd4164e5d8b35b1f336c1c6657d37fb3fe257
Author: seanyinx <se...@huawei.com>
AuthorDate: Mon Jan 8 14:22:51 2018 +0800
SCB-168 load balanced alpha cluster client
Signed-off-by: seanyinx <se...@huawei.com>
---
omega/omega-connector/omega-connector-grpc/pom.xml | 16 ++
.../grpc/LoadBalancedClusterMessageSender.java | 139 ++++++++++++++
.../grpc/LoadBalancedClusterMessageSenderTest.java | 211 +++++++++++++++++++++
.../src/test/resources/log4j2-test.xml | 30 +++
.../saga/omega/format/MessageFormatTestBase.java | 2 +-
.../saga/omega/transaction/MessageSerializer.java | 2 -
6 files changed, 397 insertions(+), 3 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml
index 375fac7..258eb4b 100644
--- a/omega/omega-connector/omega-connector-grpc/pom.xml
+++ b/omega/omega-connector/omega-connector-grpc/pom.xml
@@ -53,6 +53,18 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
@@ -70,5 +82,9 @@
<groupId>com.github.seanyinx</groupId>
<artifactId>unit-scaffolding</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
new file mode 100644
index 0000000..276f887
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolver;
+import io.grpc.NameResolver.Factory;
+import io.grpc.util.RoundRobinLoadBalancerFactory;
+
+public class LoadBalancedClusterMessageSender implements MessageSender {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final MessageSender messageSender;
+
+ public LoadBalancedClusterMessageSender(String addresses,
+ MessageSerializer serializer,
+ MessageDeserializer deserializer,
+ ServiceConfig serviceConfig,
+ MessageHandler handler) {
+
+ this(new GrpcClientMessageSender(clusterDirectAddressChannel(addresses),
+ serializer,
+ deserializer,
+ serviceConfig,
+ handler));
+ }
+
+ LoadBalancedClusterMessageSender(MessageSender messageSender) {
+ this.messageSender = messageSender;
+ }
+
+ private static ManagedChannel clusterDirectAddressChannel(String addresses) {
+ return ManagedChannelBuilder.forTarget(addresses)
+ .nameResolverFactory(new ClusterNameResolverFactory(addresses))
+ .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
+ .usePlaintext(true)
+ .build();
+ }
+
+ @Override
+ public void onConnected() {
+ messageSender.onConnected();
+ }
+
+ @Override
+ public void onDisconnected() {
+ messageSender.onDisconnected();
+ }
+
+ @Override
+ public void send(TxEvent event) {
+ boolean success = false;
+ do {
+ try {
+ messageSender.send(event);
+ success = true;
+ } catch (Exception e) {
+ log.error("Retry sending event {} due to failure", event, e);
+ }
+ } while (!success && !Thread.currentThread().isInterrupted());
+
+ }
+
+ private static class ClusterNameResolverFactory extends Factory {
+ private final String addresses;
+
+ private ClusterNameResolverFactory(String addresses) {
+ this.addresses = addresses;
+ }
+
+ @Override
+ public NameResolver newNameResolver(URI targetUri, Attributes params) {
+ return new NameResolver() {
+ @Override
+ public String getServiceAuthority() {
+ return "localhost";
+ }
+
+ @Override
+ public void start(final Listener listener) {
+ List<SocketAddress> socketAddresses = Arrays.stream(addresses.split(","))
+ .map(address -> {
+ String[] split = address.split(":");
+ return new InetSocketAddress(split[0], Integer.parseInt(split[1]));
+ })
+ .collect(Collectors.toList());
+
+ listener.onAddresses(
+ Arrays.asList(new EquivalentAddressGroup(socketAddresses.get(0)),
+ new EquivalentAddressGroup(socketAddresses.get(1))),
+ Attributes.EMPTY);
+ }
+
+ @Override
+ public void shutdown() {
+ }
+ };
+ }
+
+ @Override
+ public String getDefaultScheme() {
+ return "directaddress";
+ }
+ }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
new file mode 100644
index 0000000..4fee0e6
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Exercises {@link LoadBalancedClusterMessageSender} against two local gRPC servers that the
+ * test starts on ports 8080 and 8090.
+ */
+public class LoadBalancedClusterMessageSenderTest {
+
+  private static final int[] ports = {8080, 8090};
+  // Started servers, appended in the same order as ports, so servers.get(i) listens on ports[i].
+  private static final List<Server> servers = new ArrayList<>();
+
+  private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>();
+  private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>();
+
+  // Instance ids currently registered with each server, keyed by port.
+  private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{
+    put(8080, new ConcurrentSkipListSet<>());
+    put(8090, new ConcurrentSkipListSet<>());
+  }};
+
+  // Events received by each server, keyed by port.
+  private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{
+    put(8080, eventsOn8080);
+    put(8090, eventsOn8090);
+  }};
+
+  private final String addresses = "localhost:8080,localhost:8090";
+
+  private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
+
+  private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
+  };
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String compensationMethod = getClass().getCanonicalName();
+  private final TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, compensationMethod, "blah");
+
+  private final String serviceName = uniquify("serviceName");
+  private final MessageSender messageSender = new LoadBalancedClusterMessageSender(
+      addresses,
+      serializer,
+      deserializer,
+      new ServiceConfig(serviceName),
+      handler);
+
+  /** Starts one server per entry in {@code ports}; a failed start fails the whole class. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Arrays.stream(ports).forEach(port -> {
+      ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
+      serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port)));
+      Server server = serverBuilder.build();
+
+      try {
+        server.start();
+        servers.add(server);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+    });
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    servers.forEach(Server::shutdown);
+  }
+
+  /**
+   * Sends once, kills whichever server received that event, sends again, and expects each
+   * server to end up with exactly one copy of the event.
+   */
+  @Test
+  public void reconnectOnConnectionLoss() throws Exception {
+    messageSender.send(event);
+
+    killServerReceivedMessage();
+
+    messageSender.send(event);
+
+    assertThat(eventsOn8080.size(), is(1));
+    assertThat(eventsOn8080.peek().toString(), is(event.toString()));
+
+    assertThat(eventsOn8090.size(), is(1));
+    assertThat(eventsOn8090.peek().toString(), is(event.toString()));
+  }
+
+  /** A sender whose delegate always throws must stop retrying once its thread is interrupted. */
+  @Test (timeout = 1000)
+  public void stopSendingOnInterruption() throws Exception {
+    MessageSender underlying = Mockito.mock(MessageSender.class);
+    doThrow(RuntimeException.class).when(underlying).send(event);
+
+    MessageSender messageSender = new LoadBalancedClusterMessageSender(underlying);
+
+    Thread thread = new Thread(() -> messageSender.send(event));
+    thread.start();
+
+    // Let the retry loop spin for a while before interrupting it.
+    Thread.sleep(300);
+
+    thread.interrupt();
+    thread.join();
+  }
+
+  @Ignore
+  @Test
+  public void broadcastConnectionAndDisconnection() throws Exception {
+    messageSender.onConnected();
+    await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty());
+
+    assertThat(connected.get(8080), contains(serviceName));
+    assertThat(connected.get(8090), contains(serviceName));
+
+    messageSender.onDisconnected();
+    assertThat(connected.get(8080).isEmpty(), is(true));
+    assertThat(connected.get(8090).isEmpty(), is(true));
+  }
+
+  /** Immediately shuts down every server whose event queue is non-empty. */
+  private void killServerReceivedMessage() {
+    // Walk ports by index: servers is ordered like ports, whereas the key set of the HashMap
+    // behind eventsMap has no guaranteed order and must not be used as a positional index.
+    for (int i = 0; i < ports.length; i++) {
+      if (!eventsMap.get(ports[i]).isEmpty()) {
+        servers.get(i).shutdownNow();
+      }
+    }
+  }
+
+  /** Service stub that records connected instance ids and received events for one server. */
+  private static class MyTxEventService extends TxEventServiceImplBase {
+    private final Set<String> connected;
+    private final Queue<TxEvent> events;
+
+    private MyTxEventService(Set<String> connected, Queue<TxEvent> events) {
+      this.connected = connected;
+      this.events = events;
+    }
+
+    @Override
+    public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+      connected.add(request.getInstanceId());
+    }
+
+    @Override
+    public void onTxEvent(GrpcTxEvent request, StreamObserver<GrpcAck> responseObserver) {
+      events.offer(new TxEvent(
+          request.getGlobalTxId(),
+          request.getLocalTxId(),
+          request.getParentTxId(),
+          request.getCompensationMethod(),
+          new String(request.getPayloads().toByteArray())));
+
+      responseObserver.onNext(GrpcAck.newBuilder().build());
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+      connected.remove(request.getInstanceId());
+      responseObserver.onNext(GrpcAck.newBuilder().build());
+      responseObserver.onCompleted();
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
index 17674b7..d3c591e 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
@@ -37,7 +37,7 @@ public class MessageFormatTestBase {
@Test
public void serializeObjectIntoBytes() throws Exception {
- byte[] bytes = format.serialize(eventOf("hello", "world"));
+ byte[] bytes = format.serialize(new String[]{"hello", "world"});
Object[] message = format.deserialize(bytes);
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
index 7dfe0ca..0bc1e46 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
@@ -18,7 +18,5 @@
package org.apache.servicecomb.saga.omega.transaction;
public interface MessageSerializer {
- byte[] serialize(TxEvent event);
-
byte[] serialize(Object[] objects);
}
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.