You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/01/10 03:31:25 UTC

[incubator-servicecomb-saga] 01/07: SCB-168 load balanced alpha cluster client

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e5efd4164e5d8b35b1f336c1c6657d37fb3fe257
Author: seanyinx <se...@huawei.com>
AuthorDate: Mon Jan 8 14:22:51 2018 +0800

    SCB-168 load balanced alpha cluster client
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 omega/omega-connector/omega-connector-grpc/pom.xml |  16 ++
 .../grpc/LoadBalancedClusterMessageSender.java     | 139 ++++++++++++++
 .../grpc/LoadBalancedClusterMessageSenderTest.java | 211 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.xml             |  30 +++
 .../saga/omega/format/MessageFormatTestBase.java   |   2 +-
 .../saga/omega/transaction/MessageSerializer.java  |   2 -
 6 files changed, 397 insertions(+), 3 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml
index 375fac7..258eb4b 100644
--- a/omega/omega-connector/omega-connector-grpc/pom.xml
+++ b/omega/omega-connector/omega-connector-grpc/pom.xml
@@ -53,6 +53,18 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
@@ -70,5 +82,9 @@
       <groupId>com.github.seanyinx</groupId>
       <artifactId>unit-scaffolding</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
new file mode 100644
index 0000000..276f887
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolver;
+import io.grpc.NameResolver.Factory;
+import io.grpc.util.RoundRobinLoadBalancerFactory;
+
+public class LoadBalancedClusterMessageSender implements MessageSender {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final MessageSender messageSender;
+
+  public LoadBalancedClusterMessageSender(String addresses,
+      MessageSerializer serializer,
+      MessageDeserializer deserializer,
+      ServiceConfig serviceConfig,
+      MessageHandler handler) {
+
+    this(new GrpcClientMessageSender(clusterDirectAddressChannel(addresses),
+        serializer,
+        deserializer,
+        serviceConfig,
+        handler));
+  }
+
+  LoadBalancedClusterMessageSender(MessageSender messageSender) {
+    this.messageSender = messageSender;
+  }
+
+  private static ManagedChannel clusterDirectAddressChannel(String addresses) {
+    return ManagedChannelBuilder.forTarget(addresses)
+        .nameResolverFactory(new ClusterNameResolverFactory(addresses))
+        .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
+        .usePlaintext(true)
+        .build();
+  }
+
+  @Override
+  public void onConnected() {
+    messageSender.onConnected();
+  }
+
+  @Override
+  public void onDisconnected() {
+    messageSender.onDisconnected();
+  }
+
+  @Override
+  public void send(TxEvent event) {
+    boolean success = false;
+    do {
+      try {
+        messageSender.send(event);
+        success = true;
+      } catch (Exception e) {
+        log.error("Retry sending event {} due to failure", event, e);
+      }
+    } while (!success && !Thread.currentThread().isInterrupted());
+
+  }
+
+  private static class ClusterNameResolverFactory extends Factory {
+    private final String addresses;
+
+    private ClusterNameResolverFactory(String addresses) {
+      this.addresses = addresses;
+    }
+
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+      return new NameResolver() {
+        @Override
+        public String getServiceAuthority() {
+          return "localhost";
+        }
+
+        @Override
+        public void start(final Listener listener) {
+          List<SocketAddress> socketAddresses = Arrays.stream(addresses.split(","))
+              .map(address -> {
+                String[] split = address.split(":");
+                return new InetSocketAddress(split[0], Integer.parseInt(split[1]));
+              })
+              .collect(Collectors.toList());
+
+          listener.onAddresses(
+              Arrays.asList(new EquivalentAddressGroup(socketAddresses.get(0)),
+                  new EquivalentAddressGroup(socketAddresses.get(1))),
+              Attributes.EMPTY);
+        }
+
+        @Override
+        public void shutdown() {
+        }
+      };
+    }
+
+    @Override
+    public String getDefaultScheme() {
+      return "directaddress";
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
new file mode 100644
index 0000000..4fee0e6
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.servicecomb.saga.omega.context.ServiceConfig;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+public class LoadBalancedClusterMessageSenderTest {
+
+  private static final int[] ports = {8080, 8090};
+  private static final List<Server> servers = new ArrayList<>();
+
+  private static final Queue<TxEvent> eventsOn8080 = new ConcurrentLinkedQueue<>();
+  private static final Queue<TxEvent> eventsOn8090 = new ConcurrentLinkedQueue<>();
+
+  private static final Map<Integer, Set<String>> connected = new HashMap<Integer, Set<String>>() {{
+    put(8080, new ConcurrentSkipListSet<>());
+    put(8090, new ConcurrentSkipListSet<>());
+  }};
+
+  private static final Map<Integer, Queue<TxEvent>> eventsMap = new HashMap<Integer, Queue<TxEvent>>() {{
+    put(8080, eventsOn8080);
+    put(8090, eventsOn8090);
+  }};
+
+  private final String addresses = "localhost:8080,localhost:8090";
+
+  private final MessageSerializer serializer = objects -> objects[0].toString().getBytes();
+
+  private final MessageDeserializer deserializer = message -> new Object[] {new String(message)};
+  private final MessageHandler handler = (globalTxId, localTxId, parentTxId, compensationMethod, payloads) -> {
+  };
+
+  private final String globalTxId = uniquify("globalTxId");
+  private final String localTxId = uniquify("localTxId");
+  private final String parentTxId = uniquify("parentTxId");
+  private final String compensationMethod = getClass().getCanonicalName();
+  private final TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, compensationMethod, "blah");
+
+  private final String serviceName = uniquify("serviceName");
+  private final MessageSender messageSender = new LoadBalancedClusterMessageSender(
+      addresses,
+      serializer,
+      deserializer,
+      new ServiceConfig(serviceName),
+      handler);
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Arrays.stream(ports).forEach(port -> {
+      ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
+      serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port)));
+      Server server = serverBuilder.build();
+
+      try {
+        server.start();
+        servers.add(server);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+    });
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    servers.forEach(Server::shutdown);
+  }
+
+  @Test
+  public void reconnectOnConnectionLoss() throws Exception {
+    messageSender.send(event);
+
+    killServerReceivedMessage();
+
+    messageSender.send(event);
+
+    assertThat(eventsOn8080.size(), is(1));
+    assertThat(eventsOn8080.peek().toString(), is(event.toString()));
+
+    assertThat(eventsOn8090.size(), is(1));
+    assertThat(eventsOn8090.peek().toString(), is(event.toString()));
+  }
+
+  @Test (timeout = 1000)
+  public void stopSendingOnInterruption() throws Exception {
+    MessageSender underlying = Mockito.mock(MessageSender.class);
+    doThrow(RuntimeException.class).when(underlying).send(event);
+
+    MessageSender messageSender = new LoadBalancedClusterMessageSender(underlying);
+
+    Thread thread = new Thread(() -> messageSender.send(event));
+    thread.start();
+
+    Thread.sleep(300);
+
+    thread.interrupt();
+    thread.join();
+  }
+
+  @Ignore
+  @Test
+  public void broadcastConnectionAndDisconnection() throws Exception {
+    messageSender.onConnected();
+    await().atMost(1, SECONDS).until(() -> !connected.get(8080).isEmpty() && !connected.get(8090).isEmpty());
+
+    assertThat(connected.get(8080), contains(serviceName));
+    assertThat(connected.get(8090), contains(serviceName));
+
+    messageSender.onDisconnected();
+    assertThat(connected.get(8080).isEmpty(), is(true));
+    assertThat(connected.get(8090).isEmpty(), is(true));
+  }
+
+  private void killServerReceivedMessage() {
+    int index = 0;
+    for (int port : eventsMap.keySet()) {
+      if (!eventsMap.get(port).isEmpty()) {
+        servers.get(index).shutdownNow();
+      }
+      index++;
+    }
+  }
+
+  private static class MyTxEventService extends TxEventServiceImplBase {
+    private final Set<String> connected;
+    private final Queue<TxEvent> events;
+
+    private MyTxEventService(Set<String> connected, Queue<TxEvent> events) {
+      this.connected = connected;
+      this.events = events;
+    }
+
+    @Override
+    public void onConnected(GrpcServiceConfig request, StreamObserver<GrpcCompensateCommand> responseObserver) {
+      connected.add(request.getInstanceId());
+    }
+
+    @Override
+    public void onTxEvent(GrpcTxEvent request, StreamObserver<GrpcAck> responseObserver) {
+      events.offer(new TxEvent(
+          request.getGlobalTxId(),
+          request.getLocalTxId(),
+          request.getParentTxId(),
+          request.getCompensationMethod(),
+          new String(request.getPayloads().toByteArray())));
+
+      responseObserver.onNext(GrpcAck.newBuilder().build());
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) {
+      connected.remove(request.getInstanceId());
+      responseObserver.onNext(GrpcAck.newBuilder().build());
+      responseObserver.onCompleted();
+    }
+  }
+}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
index 17674b7..d3c591e 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
@@ -37,7 +37,7 @@ public class MessageFormatTestBase {
 
   @Test
   public void serializeObjectIntoBytes() throws Exception {
-    byte[] bytes = format.serialize(eventOf("hello", "world"));
+    byte[] bytes = format.serialize(new String[]{"hello", "world"});
 
     Object[] message = format.deserialize(bytes);
 
diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
index 7dfe0ca..0bc1e46 100644
--- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
+++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/MessageSerializer.java
@@ -18,7 +18,5 @@
 package org.apache.servicecomb.saga.omega.transaction;
 
 public interface MessageSerializer {
-  byte[] serialize(TxEvent event);
-
   byte[] serialize(Object[] objects);
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.