You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/10/23 20:02:07 UTC
hbase git commit: HBASE-14535 Integration test for rpc connection
concurrency / deadlock testing
Repository: hbase
Updated Branches:
refs/heads/master eb4f9b8b3 -> b4ba615c7
HBASE-14535 Integration test for rpc connection concurrency / deadlock testing
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b4ba615c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b4ba615c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b4ba615c
Branch: refs/heads/master
Commit: b4ba615c70e89aa00ec0878d64e6e0f42e437df0
Parents: eb4f9b8
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Oct 22 18:35:34 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Oct 22 18:35:34 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 454 +++++++++++++++++++
1 file changed, 454 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ba615c/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
new file mode 100644
index 0000000..09de871
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+@Category(IntegrationTests.class)
+public class IntegrationTestRpcClient {
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);
+
+ private final Configuration conf;
+
+ private int numIterations = 10;
+
+ public IntegrationTestRpcClient() {
+ conf = HBaseConfiguration.create();
+ }
+
+ /**
+ * Minimal RpcServer bound to an ephemeral localhost port, serving only the
+ * reflective test SERVICE below. Used as the unit the chaos monkey starts/stops.
+ */
+ static class TestRpcServer extends RpcServer {
+
+ // Convenience ctor: single-handler FIFO scheduler (one call at a time per server).
+ TestRpcServer(Configuration conf) throws IOException {
+ this(new FifoRpcScheduler(conf, 1), conf);
+ }
+
+ // Port 0 lets the OS pick a free port; the actual address is read back via
+ // getListenerAddress() by Cluster.
+ TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
+ super(null, "testRpcServer", Lists
+ .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
+ "localhost", 0), conf, scheduler);
+ }
+
+ // Pure delegation; overridden only to widen visibility for the test package.
+ // NOTE(review): body is identical to super.call — confirm the override is still needed.
+ @Override
+ public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+ throws IOException {
+ return super.call(service, md, param, cellScanner, receiveTime, status);
+ }
+ }
+
+ // Reflective blocking service shared by all TestRpcServer instances.
+ // Only echo() is exercised by SimpleClient; ping() and error() return null,
+ // which would NPE in the RPC layer if they were ever invoked — they are stubs.
+ static final BlockingService SERVICE =
+ TestRpcServiceProtos.TestProtobufRpcProto
+ .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
+
+ @Override
+ public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+ throws ServiceException {
+ return null; // unused stub
+ }
+
+ @Override
+ public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+ throws ServiceException {
+ return null; // unused stub
+ }
+
+ // Echoes the request message back verbatim; this is what the clients assert on.
+ @Override
+ public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+ throws ServiceException {
+ return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+ }
+ });
+
+ /**
+ * Builds the client under test. The sync path uses the standard blocking
+ * RpcClientImpl; the async path disables the cell codec (getCodec() == null).
+ */
+ protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
+ if (isSyncClient) {
+ return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
+ }
+ return new AsyncRpcClient(conf) {
+ @Override
+ Codec getCodec() {
+ return null;
+ }
+ };
+ }
+
+ // Large echo payload used (randomly) by SimpleClient to stress connection buffers.
+ static String BIG_PAYLOAD;
+
+ static {
+ StringBuilder builder = new StringBuilder();
+
+ while (builder.length() < 1024 * 1024) { // >= 1M chars (~2 MB as in-memory UTF-16)
+ builder.append("big.payload.");
+ }
+
+ BIG_PAYLOAD = builder.toString();
+ }
+
+ /**
+ * A dynamic set of TestRpcServers, kept between minServers and maxServers.
+ * All structural changes take the write lock; getRandomServer takes the read
+ * lock, so clients may race with the chaos monkey only between lookup and call.
+ */
+ class Cluster {
+ Random random = new Random();
+ // Guards rpcServers and serverList, which must stay in sync.
+ ReadWriteLock lock = new ReentrantReadWriteLock();
+ HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
+ List<TestRpcServer> serverList = new ArrayList<>();
+ int maxServers;
+ int minServers;
+
+ Cluster(int minServers, int maxServers) {
+ this.minServers = minServers;
+ this.maxServers = maxServers;
+ }
+
+ // Starts one server unless already at maxServers; returns null when full.
+ TestRpcServer startServer() throws IOException {
+ lock.writeLock().lock();
+ try {
+ if (rpcServers.size() >= maxServers) {
+ return null;
+ }
+
+ TestRpcServer rpcServer = new TestRpcServer(conf);
+ rpcServer.start();
+ rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ serverList.add(rpcServer);
+ LOG.info("Started server: " + rpcServer.getListenerAddress());
+ return rpcServer;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // Stops a random server unless already at minServers. Stopping happens under
+ // the write lock, so no client can pick this server up while it shuts down.
+ void stopRandomServer() throws Exception {
+ lock.writeLock().lock();
+ TestRpcServer rpcServer = null;
+ try {
+ if (rpcServers.size() <= minServers) {
+ return;
+ }
+ int size = rpcServers.size();
+ int rand = random.nextInt(size);
+ rpcServer = serverList.remove(rand);
+ rpcServers.remove(rpcServer.getListenerAddress());
+
+ // NOTE(review): ArrayList.remove(int) never returns null for a populated
+ // slot, so this null check is dead code.
+ if (rpcServer != null) {
+ stopServer(rpcServer);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // Stops and joins one server; blocks until its listener thread exits.
+ void stopServer(TestRpcServer rpcServer) throws InterruptedException {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ LOG.info("Stopping server: " + address);
+ rpcServer.stop();
+ rpcServer.join();
+ LOG.info("Stopped server: " + address);
+ }
+
+ // Tears down every server. NOTE(review): serverList is not cleared afterwards,
+ // so the Cluster is not reusable — fine for end-of-test shutdown.
+ void stopRunning() throws InterruptedException {
+ lock.writeLock().lock();
+ try {
+ for (TestRpcServer rpcServer : serverList) {
+ stopServer(rpcServer);
+ }
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // Picks a live server under the read lock. Assumes at least one server is
+ // running (minServers > 0); nextInt(0) would throw otherwise.
+ TestRpcServer getRandomServer() {
+ lock.readLock().lock();
+ try {
+ int size = rpcServers.size();
+ int rand = random.nextInt(size);
+ return serverList.get(rand);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * Background thread that randomly starts or stops cluster servers every 100ms
+ * until stopRunning() is called. The first exception seen is kept and
+ * rethrown to the test via rethrowException().
+ */
+ static class MiniChaosMonkey extends Thread {
+ AtomicBoolean running = new AtomicBoolean(true);
+ Random random = new Random();
+ // Holds only the FIRST failure (compareAndSet from null).
+ AtomicReference<Exception> exception = new AtomicReference<>(null);
+ Cluster cluster;
+
+ public MiniChaosMonkey(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void run() {
+ while (running.get()) {
+ // nextInt(2) is uniformly 0 or 1. The previous nextInt() % 2 could also
+ // yield -1 (Java % takes the dividend's sign), silently doing nothing on
+ // roughly a quarter of the iterations.
+ switch (random.nextInt(2)) {
+ case 0: // start a server
+ try {
+ cluster.startServer();
+ } catch (Exception e) {
+ LOG.warn(e);
+ exception.compareAndSet(null, e);
+ }
+ break;
+
+ case 1: // stop a server
+ try {
+ cluster.stopRandomServer();
+ } catch (Exception e) {
+ LOG.warn(e);
+ exception.compareAndSet(null, e);
+ }
+ break; // was missing: fell through to the (empty) default
+ default:
+ }
+
+ Threads.sleep(100);
+ }
+ }
+
+ // Signals the run loop to exit after the current iteration.
+ void stopRunning() {
+ running.set(false);
+ }
+
+ // Propagates the first recorded failure, if any, to the calling test thread.
+ void rethrowException() throws Exception {
+ if (exception.get() != null) {
+ throw exception.get();
+ }
+ }
+ }
+
+ /**
+ * Client thread hammering echo() against random servers through a shared
+ * AbstractRpcClient. RPC-level failures are tolerated (servers come and go);
+ * a wrong echo reply is recorded as a test failure via rethrowException().
+ */
+ static class SimpleClient extends Thread {
+ AbstractRpcClient rpcClient;
+ AtomicBoolean running = new AtomicBoolean(true);
+ // Holds only the FIRST assertion failure.
+ AtomicReference<Throwable> exception = new AtomicReference<>(null);
+ Cluster cluster;
+ String id;
+ long numCalls = 0;
+ Random random = new Random();
+
+ public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
+ this.cluster = cluster;
+ this.rpcClient = rpcClient;
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+
+ while (running.get()) {
+ // ~50% of calls carry the big payload; the rest a unique small message,
+ // so replies can be matched to requests.
+ boolean isBigPayload = random.nextBoolean();
+ String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
+ EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+ EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
+
+ // The chosen server may be stopped between this lookup and the call below;
+ // that race is exactly what the test exercises.
+ TestRpcServer server = cluster.getRandomServer();
+ try {
+ User user = User.getCurrent();
+ ret = (EchoResponseProto)
+ rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ } catch (Exception e) {
+ LOG.warn(e);
+ continue; // expected in case connection is closing or closed
+ }
+
+ // A successful call MUST echo the exact message; record (don't throw) so
+ // the thread keeps running and the main test reports the failure later.
+ try {
+ assertNotNull(ret);
+ assertEquals(message, ret.getMessage());
+ } catch (Throwable t) {
+ exception.compareAndSet(null, t);
+ }
+
+ numCalls++;
+ }
+ }
+
+ // Signals the run loop to exit after the current iteration.
+ void stopRunning() {
+ running.set(false);
+ }
+
+ // Propagates the first recorded wrong-reply failure, if any.
+ void rethrowException() throws Throwable {
+ if (exception.get() != null) {
+ throw exception.get();
+ }
+ }
+ }
+
+ /**
+ * Runs the chaos-monkey scenario with the blocking (sync) client, repeated
+ * numIterations times, each iteration guarded by the JVM-killing watchdog.
+ */
+ @Test (timeout = 900000)
+ public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
+ for (int i = 0; i < numIterations; i++) {
+ TimeoutThread.runWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ testRpcWithChaosMonkey(true);
+ return null;
+ } catch (Exception e) {
+ throw e; // already an Exception: rethrow as-is
+ } catch (Throwable t) {
+ throw new Exception(t); // wrap Errors etc. for the Callable signature
+ }
+ }
+ }, 90000);
+ }
+ }
+
+ /**
+ * Same scenario with the async client. Currently @Ignore'd: the async client
+ * does not yet survive the server churn (see TODO).
+ */
+ @Test (timeout = 900000)
+ @Ignore // TODO: test fails with async client
+ public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
+ for (int i = 0; i < numIterations; i++) {
+ TimeoutThread.runWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ testRpcWithChaosMonkey(false);
+ return null;
+ } catch (Exception e) {
+ throw e; // already an Exception: rethrow as-is
+ } catch (Throwable t) {
+ throw new Exception(t); // wrap Errors etc. for the Callable signature
+ }
+ }
+ }, 90000);
+ }
+ }
+
+ /**
+ * Watchdog that dumps all thread stacks and kills the JVM (System.exit(1))
+ * if not interrupted within {@code timeout} ms. Used because a deadlocked RPC
+ * client would otherwise hang the whole test run past any JUnit timeout.
+ */
+ static class TimeoutThread extends Thread {
+ long timeout;
+ public TimeoutThread(long timeout) {
+ this.timeout = timeout;
+ // Daemon: a disarmed-but-unjoined watchdog must never keep the JVM alive.
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
+ System.exit(1); // a timeout happened
+ } catch (InterruptedException e) {
+ // this is what we want: runWithTimeout disarmed us in time
+ }
+ }
+
+ // Runs callable in the calling thread under a watchdog that exits the JVM
+ // on timeout.
+ static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
+ TimeoutThread thread = new TimeoutThread(timeout);
+ thread.start();
+ try {
+ callable.call();
+ } finally {
+ // Disarm even when the callable throws; previously a throwing callable
+ // leaked the armed watchdog, which would later System.exit(1) and kill
+ // the JVM in the middle of unrelated work.
+ thread.interrupt();
+ }
+ }
+ }
+
+ /**
+ * Core scenario: 10-100 servers, 30 client threads sharing ONE rpc client,
+ * and a chaos monkey churning servers for 30s. Verifies no deadlock and that
+ * every successful echo returned the right payload.
+ */
+ public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
+ LOG.info("Starting test");
+ Cluster cluster = new Cluster(10, 100);
+ for (int i = 0; i < 10; i++) {
+ cluster.startServer();
+ }
+
+ ArrayList<SimpleClient> clients = new ArrayList<>();
+
+ // all threads should share the same rpc client
+ AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
+
+ for (int i = 0; i < 30; i++) {
+ String clientId = "client_" + i + "_";
+ LOG.info("Starting client: " + clientId);
+ SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
+ client.start();
+ clients.add(client);
+ }
+
+ LOG.info("Starting MiniChaosMonkey");
+ MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
+ cm.start();
+
+ // Let clients and chaos monkey run concurrently for 30 seconds.
+ Threads.sleep(30000);
+
+ // Stop the monkey first so the server set is stable while clients drain.
+ LOG.info("Stopping MiniChaosMonkey");
+ cm.stopRunning();
+ cm.join();
+ cm.rethrowException();
+
+ LOG.info("Stopping clients");
+ for (SimpleClient client : clients) {
+ LOG.info("Stopping client: " + client.id);
+ LOG.info(client.id + " numCalls:" + client.numCalls);
+ client.stopRunning();
+ client.join();
+ client.rethrowException();
+ // Liveness check: a deadlocked client would have made few or no calls.
+ assertTrue(client.numCalls > 10);
+ }
+
+ LOG.info("Stopping RpcClient");
+ rpcClient.close();
+
+ LOG.info("Stopping Cluster");
+ cluster.stopRunning();
+ }
+}