You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/06/07 18:45:16 UTC

[1/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

Repository: hbase
Updated Branches:
  refs/heads/branch-1 3ff082cb8 -> 7fed7a8f4
  refs/heads/branch-1.1 ac0234811 -> 4418ba2f8
  refs/heads/branch-1.2 850744689 -> da52e0cdf
  refs/heads/branch-1.3 f51dfe108 -> 600d10a8b
  refs/heads/master 376ad0d98 -> da88b4824


HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da88b482
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da88b482
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da88b482

Branch: refs/heads/master
Commit: da88b4824054f57fbcbc7795469ab2369a39b5ed
Parents: 376ad0d
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:33:03 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index d8c87e9..dc05af1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1202,9 +1202,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that connections that have not been started are successfully removed from the
+  connection pool when the rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {


[4/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

Posted by en...@apache.org.
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da52e0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da52e0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da52e0cd

Branch: refs/heads/branch-1.2
Commit: da52e0cdf109199b157ccaedbd891336221c439b
Parents: 8507446
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:41:13 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/da52e0cd/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index c8baddb..26a71a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1178,9 +1178,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/da52e0cd/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that connections that have not been started are successfully removed from the
+  connection pool when the rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {


[3/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

Posted by en...@apache.org.
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/600d10a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/600d10a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/600d10a8

Branch: refs/heads/branch-1.3
Commit: 600d10a8b831afe0c881bc109d53f53cee9e3a8a
Parents: f51dfe1
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:39:37 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/600d10a8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 6f68735..68adfba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1181,9 +1181,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/600d10a8/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that connections that have not been started are successfully removed from the
+  connection pool when the rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {


[2/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

Posted by en...@apache.org.
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fed7a8f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fed7a8f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fed7a8f

Branch: refs/heads/branch-1
Commit: 7fed7a8f4eff4e06d73b72c7e9620caadeb94e5c
Parents: 3ff082c
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:38:11 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7fed7a8f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index a0c9dd3..58e577f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1182,9 +1182,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fed7a8f/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that connections that have not been started are successfully removed from the
+  connection pool when the rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {


[5/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in some circumstances

Posted by en...@apache.org.
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <en...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4418ba2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4418ba2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4418ba2f

Branch: refs/heads/branch-1.1
Commit: 4418ba2f8f10ee08d82aee9f311e0e1b6bd7be5e
Parents: ac02348
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:42:54 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  5 ++-
 .../hbase/ipc/IntegrationTestRpcClient.java     | 35 ++++++++++++++++----
 2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4418ba2f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 56ae06f..6d69595 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1146,9 +1146,8 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
     if (connsToClose != null) {
       for (Connection conn : connsToClose) {
-        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-          conn.close();
-        }
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
       }
     }
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/4418ba2f/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 1b425b8..c0b7a43 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
   static class SimpleClient extends Thread {
     AbstractRpcClient rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
+    AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
     Cluster cluster;
     String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
           if (address == null) {
             throw new IOException("Listener channel is closed");
           }
+          sending.set(true);
           ret = (EchoResponseProto)
               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
     void stopRunning() {
       running.set(false);
     }
+    boolean isSending() {
+      return sending.get();
+    }
 
     void rethrowException() throws Throwable {
       if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
     }
   }
 
+  /*
+  Test that connections that have not been started are successfully removed from the
+  connection pool when the rpc client is closing.
+   */
+  @Test (timeout = 30000)
+  public void testRpcWithWriteThread() throws IOException, InterruptedException {
+    LOG.info("Starting test");
+    Cluster cluster = new Cluster(1, 1);
+    cluster.startServer();
+    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+    for(int i = 0; i <1000; i++) {
+      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+      client.start();
+      while(!client.isSending()) {
+        Thread.sleep(1);
+      }
+      client.stopRunning();
+      rpcClient.close();
+    }
+  }
+
+
   @Test (timeout = 900000)
   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
     for (int i = 0; i < numIterations; i++) {