You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/06/07 18:45:18 UTC

[3/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/600d10a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/600d10a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/600d10a8

Branch: refs/heads/branch-1.3
Commit: 600d10a8b831afe0c881bc109d53f53cee9e3a8a
Parents: f51dfe1
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:39:37 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/600d10a8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 6f68735..68adfba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1181,9 +1181,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/600d10a8/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that not started connections are successfully removed from connection pool when
+  rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {