You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/10/23 20:02:07 UTC

hbase git commit: HBASE-14535 Integration test for rpc connection concurrency / deadlock testing

Repository: hbase
Updated Branches:
  refs/heads/master eb4f9b8b3 -> b4ba615c7


HBASE-14535 Integration test for rpc connection concurrency / deadlock testing


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b4ba615c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b4ba615c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b4ba615c

Branch: refs/heads/master
Commit: b4ba615c70e89aa00ec0878d64e6e0f42e437df0
Parents: eb4f9b8
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Oct 22 18:35:34 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Oct 22 18:35:34 2015 -0700

----------------------------------------------------------------------
 .../hbase/ipc/IntegrationTestRpcClient.java     | 454 +++++++++++++++++++
 1 file changed, 454 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ba615c/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
new file mode 100644
index 0000000..09de871
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+/**
+ * Integration test for the HBase RPC layer (HBASE-14535): spins up many
+ * in-process RPC servers, hammers them from 30 threads sharing a single RPC
+ * client while a mini chaos monkey randomly starts/stops servers, looking
+ * for connection-concurrency bugs and deadlocks.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRpcClient {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);
+
+  // Configuration shared by all servers and the client under test.
+  private final Configuration conf;
+
+  // How many times each chaos-monkey scenario is repeated per @Test.
+  private int numIterations = 10;
+
+  public IntegrationTestRpcClient() {
+    conf = HBaseConfiguration.create();
+  }
+
+  /**
+   * Minimal in-process {@link RpcServer} bound to an ephemeral localhost
+   * port, serving the reflective test SERVICE defined below.
+   */
+  static class TestRpcServer extends RpcServer {
+
+    // Convenience constructor: single-handler FIFO scheduler.
+    TestRpcServer(Configuration conf) throws IOException {
+      this(new FifoRpcScheduler(conf, 1), conf);
+    }
+
+    TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
+      super(null, "testRpcServer", Lists
+          .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
+          "localhost", 0), conf, scheduler);
+    }
+
+    // Pure pass-through to the superclass implementation; present only as a
+    // hook point for the test (no behavior change).
+    @Override
+    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+        throws IOException {
+      return super.call(service, md, param, cellScanner, receiveTime, status);
+    }
+  }
+
+  /**
+   * Reflective protobuf service exposed by every TestRpcServer. Only echo()
+   * has a real implementation (it mirrors the request message back); ping()
+   * and error() return null and are not exercised by this test.
+   */
+  static final BlockingService SERVICE =
+      TestRpcServiceProtos.TestProtobufRpcProto
+      .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
+
+        @Override
+        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+            throws ServiceException {
+          return null;
+        }
+
+        @Override
+        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+            throws ServiceException {
+          return null;
+        }
+
+        @Override
+        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+            throws ServiceException {
+          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+        }
+      });
+
+  /**
+   * Creates the RPC client shared by all worker threads: the blocking
+   * RpcClientImpl for the sync scenario, or an AsyncRpcClient whose
+   * getCodec() is overridden to return null for the async scenario.
+   */
+  protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
+    if (isSyncClient) {
+      return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
+    }
+    return new AsyncRpcClient(conf) {
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+    };
+  }
+
+  // Large echo payload used to stress big RPCs.
+  static String BIG_PAYLOAD;
+
+  static {
+    // Build a payload of at least 1M characters (~2MB of in-memory UTF-16
+    // data) by repeating a short marker. Fixed: the old "2 MB" comment
+    // contradicted the 1024*1024 character bound, and the unsized builder
+    // reallocated repeatedly while growing to that size - presize it.
+    int targetLength = 1024 * 1024;
+    StringBuilder builder = new StringBuilder(targetLength + 16);
+
+    while (builder.length() < targetLength) {
+      builder.append("big.payload.");
+    }
+
+    BIG_PAYLOAD = builder.toString();
+  }
+
+  /**
+   * A dynamic set of in-process RPC servers, kept between minServers and
+   * maxServers. Mutations and reads are guarded by a read/write lock so the
+   * chaos monkey and the client threads can act concurrently.
+   */
+  class Cluster {
+    Random random = new Random();
+    ReadWriteLock lock = new ReentrantReadWriteLock();
+    // Address -> server map, plus a parallel list for random selection.
+    HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
+    List<TestRpcServer> serverList = new ArrayList<>();
+    int maxServers;
+    int minServers;
+
+    Cluster(int minServers, int maxServers) {
+      this.minServers = minServers;
+      this.maxServers = maxServers;
+    }
+
+    /**
+     * Starts and registers a new server unless already at maxServers.
+     * @return the started server, or null if the cluster is at capacity
+     */
+    TestRpcServer startServer() throws IOException {
+      lock.writeLock().lock();
+      try {
+        if (rpcServers.size() >= maxServers) {
+          return null;
+        }
+
+        TestRpcServer rpcServer = new TestRpcServer(conf);
+        rpcServer.start();
+        rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+        serverList.add(rpcServer);
+        LOG.info("Started server: " + rpcServer.getListenerAddress());
+        return rpcServer;
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    /**
+     * Stops a randomly chosen server unless the cluster is already down to
+     * minServers.
+     */
+    void stopRandomServer() throws Exception {
+      lock.writeLock().lock();
+      try {
+        if (rpcServers.size() <= minServers) {
+          return;
+        }
+        int rand = random.nextInt(rpcServers.size());
+        TestRpcServer rpcServer = serverList.remove(rand);
+        // Fixed: the null check used to come AFTER rpcServer was already
+        // dereferenced via getListenerAddress(); check before any use.
+        if (rpcServer == null) {
+          return;
+        }
+        rpcServers.remove(rpcServer.getListenerAddress());
+        stopServer(rpcServer);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    /** Stops the given server and waits for it to shut down. */
+    void stopServer(TestRpcServer rpcServer) throws InterruptedException {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      LOG.info("Stopping server: " + address);
+      rpcServer.stop();
+      rpcServer.join();
+      LOG.info("Stopped server: " + address);
+    }
+
+    /** Final teardown: stops every remaining server. */
+    void stopRunning() throws InterruptedException {
+      lock.writeLock().lock();
+      try {
+        for (TestRpcServer rpcServer : serverList) {
+          stopServer(rpcServer);
+        }
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    /**
+     * @return a random live server. Assumes the cluster is non-empty
+     *         (minServers keeps it so); nextInt throws if it ever is empty.
+     */
+    TestRpcServer getRandomServer() {
+      lock.readLock().lock();
+      try {
+        int rand = random.nextInt(rpcServers.size());
+        return serverList.get(rand);
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Background thread that randomly starts or stops a cluster server every
+   * 100ms until told to stop. The first exception seen is recorded so the
+   * test thread can rethrow it.
+   */
+  static class MiniChaosMonkey extends Thread {
+    AtomicBoolean running = new AtomicBoolean(true);
+    Random random = new Random();
+    AtomicReference<Exception> exception = new AtomicReference<>(null);
+    Cluster cluster;
+
+    public MiniChaosMonkey(Cluster cluster) {
+      this.cluster = cluster;
+    }
+
+    @Override
+    public void run() {
+      while (running.get()) {
+        // Fixed: random.nextInt() % 2 could be -1 (nextInt() may return a
+        // negative value), so ~25% of iterations matched no case and the
+        // two actions were not picked uniformly. nextInt(2) yields 0 or 1.
+        switch (random.nextInt(2)) {
+        case 0: // start a server
+          try {
+            cluster.startServer();
+          } catch (Exception e) {
+            LOG.warn(e);
+            exception.compareAndSet(null, e);
+          }
+          break;
+
+        case 1: // stop a server
+          try {
+            cluster.stopRandomServer();
+          } catch (Exception e) {
+            LOG.warn(e);
+            exception.compareAndSet(null, e);
+          }
+          break; // fixed: was falling through into default
+        default:
+        }
+
+        Threads.sleep(100);
+      }
+    }
+
+    /** Asks the loop to exit after the current iteration. */
+    void stopRunning() {
+      running.set(false);
+    }
+
+    /** Rethrows the first exception recorded by the loop, if any. */
+    void rethrowException() throws Exception {
+      if (exception.get() != null) {
+        throw exception.get();
+      }
+    }
+  }
+
+  /**
+   * Worker thread that repeatedly issues echo() calls through the shared
+   * RPC client against randomly chosen servers and verifies each response.
+   * Call failures are tolerated (servers come and go under the chaos
+   * monkey); verification failures are recorded for the test thread.
+   */
+  static class SimpleClient extends Thread {
+    AbstractRpcClient rpcClient;
+    AtomicBoolean running = new AtomicBoolean(true);
+    AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    Cluster cluster;
+    String id;
+    long numCalls = 0;
+    Random random = new Random();
+
+    public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
+      this.cluster = cluster;
+      this.rpcClient = rpcClient;
+      this.id = id;
+    }
+
+    @Override
+    public void run() {
+      MethodDescriptor echoMethod = SERVICE.getDescriptorForType().findMethodByName("echo");
+
+      while (running.get()) {
+        // Alternate at random between a huge payload and a small unique one.
+        String payload = random.nextBoolean() ? BIG_PAYLOAD : id + numCalls;
+        EchoRequestProto request = EchoRequestProto.newBuilder().setMessage(payload).build();
+        EchoResponseProto response = EchoResponseProto.newBuilder().setMessage("foo").build();
+
+        TestRpcServer target = cluster.getRandomServer();
+        try {
+          User user = User.getCurrent();
+          response = (EchoResponseProto) rpcClient.callBlockingMethod(
+              echoMethod, null, request, response, user, target.getListenerAddress());
+        } catch (Exception e) {
+          LOG.warn(e);
+          continue; // expected in case connection is closing or closed
+        }
+
+        try {
+          assertNotNull(response);
+          assertEquals(payload, response.getMessage());
+        } catch (Throwable t) {
+          exception.compareAndSet(null, t);
+        }
+
+        numCalls++;
+      }
+    }
+
+    /** Asks the loop to exit after the current call. */
+    void stopRunning() {
+      running.set(false);
+    }
+
+    /** Rethrows the first verification failure recorded, if any. */
+    void rethrowException() throws Throwable {
+      if (exception.get() != null) {
+        throw exception.get();
+      }
+    }
+  }
+
+  /** Repeats the chaos scenario against the blocking (sync) RPC client. */
+  @Test (timeout = 900000)
+  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
+    for (int i = 0; i < numIterations; i++) {
+      TimeoutThread.runWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          try {
+            testRpcWithChaosMonkey(true);
+          } catch (Exception e) {
+            throw e;
+          } catch (Throwable t) {
+            // Callable only declares Exception; wrap anything broader.
+            throw new Exception(t);
+          }
+          return null;
+        }
+      }, 90000);
+    }
+  }
+
+  /** Repeats the chaos scenario against the async RPC client. */
+  @Test (timeout = 900000)
+  @Ignore // TODO: test fails with async client
+  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
+    for (int i = 0; i < numIterations; i++) {
+      TimeoutThread.runWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          try {
+            testRpcWithChaosMonkey(false);
+          } catch (Exception e) {
+            throw e;
+          } catch (Throwable t) {
+            // Callable only declares Exception; wrap anything broader.
+            throw new Exception(t);
+          }
+          return null;
+        }
+      }, 90000);
+    }
+  }
+
+  /**
+   * Watchdog that dumps all thread stacks and exits the JVM if a scenario
+   * does not finish within the given timeout - this catches hard deadlocks
+   * that a JUnit timeout could not report usefully.
+   */
+  static class TimeoutThread extends Thread {
+    long timeout;
+    public TimeoutThread(long timeout) {
+      this.timeout = timeout;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(timeout);
+        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
+        System.exit(1); // a timeout happened
+      } catch (InterruptedException e) {
+        // this is what we want
+      }
+    }
+
+    // runs in the same thread context but injects a timeout thread which will exit the JVM on
+    // timeout
+    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
+      TimeoutThread timeoutThread = new TimeoutThread(timeout);
+      timeoutThread.start();
+      try {
+        callable.call();
+      } finally {
+        // Fixed: if callable threw, the watchdog was never interrupted and
+        // would later System.exit(1) the JVM, masking the real failure.
+        timeoutThread.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Core scenario: start 10 servers (cluster bounded to [10, 100]), run 30
+   * client threads sharing one RPC client for 30 seconds while the chaos
+   * monkey starts/stops servers, then verify neither the monkey nor any
+   * client recorded a failure and every client made progress.
+   * @param isSyncClient whether to exercise the blocking or the async client
+   */
+  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(10, 100);
+    for (int i = 0; i < 10; i++) {
+      cluster.startServer();
+    }
+
+    ArrayList<SimpleClient> clients = new ArrayList<>();
+
+    // all threads should share the same rpc client
+    AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
+
+    try {
+      for (int i = 0; i < 30; i++) {
+        String clientId = "client_" + i + "_";
+        LOG.info("Starting client: " + clientId);
+        SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
+        client.start();
+        clients.add(client);
+      }
+
+      LOG.info("Starting MiniChaosMonkey");
+      MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
+      cm.start();
+
+      Threads.sleep(30000);
+
+      LOG.info("Stopping MiniChaosMonkey");
+      cm.stopRunning();
+      cm.join();
+      cm.rethrowException();
+    } finally {
+      // Fixed: client threads, the shared RPC client, and all servers used
+      // to leak whenever a rethrown exception or assertion fired mid-test.
+      LOG.info("Stopping clients");
+      for (SimpleClient client : clients) {
+        LOG.info("Stopping client: " + client.id);
+        LOG.info(client.id + " numCalls:" + client.numCalls);
+        client.stopRunning();
+        client.join();
+      }
+
+      LOG.info("Stopping RpcClient");
+      rpcClient.close();
+
+      LOG.info("Stopping Cluster");
+      cluster.stopRunning();
+    }
+
+    // Verify after teardown so resources are released even on failure.
+    for (SimpleClient client : clients) {
+      client.rethrowException();
+      assertTrue(client.numCalls > 10);
+    }
+  }
+}