You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/06/07 18:45:17 UTC
[2/5] hbase git commit: HBASE-15957 RpcClientImpl.close never ends in
some circumstances
HBASE-15957 RpcClientImpl.close never ends in some circumstances
Signed-off-by: Enis Soztutar <en...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fed7a8f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fed7a8f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fed7a8f
Branch: refs/heads/branch-1
Commit: 7fed7a8f4eff4e06d73b72c7e9620caadeb94e5c
Parents: 3ff082c
Author: Sergey Soldatov <ss...@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Jun 7 11:38:11 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 5 ++-
.../hbase/ipc/IntegrationTestRpcClient.java | 35 ++++++++++++++++----
2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7fed7a8f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index a0c9dd3..58e577f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1182,9 +1182,8 @@ public class RpcClientImpl extends AbstractRpcClient {
}
if (connsToClose != null) {
for (Connection conn : connsToClose) {
- if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
- conn.close();
- }
+ conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+ conn.close();
}
}
// wait until all connections are closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/7fed7a8f/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index c28f3e6..6c0fbcc 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
static class SimpleClient extends Thread {
AbstractRpcClient rpcClient;
AtomicBoolean running = new AtomicBoolean(true);
+ AtomicBoolean sending = new AtomicBoolean(false);
AtomicReference<Throwable> exception = new AtomicReference<>(null);
Cluster cluster;
String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
if (address == null) {
throw new IOException("Listener channel is closed");
}
+ sending.set(true);
ret = (EchoResponseProto)
rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
void stopRunning() {
running.set(false);
}
+ boolean isSending() {
+ return sending.get();
+ }
void rethrowException() throws Throwable {
if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
}
}
+ /*
+ Test that not started connections are successfully removed from connection pool when
+ rpc client is closing.
+ */
+ @Test (timeout = 30000)
+ public void testRpcWithWriteThread() throws IOException, InterruptedException {
+ LOG.info("Starting test");
+ Cluster cluster = new Cluster(1, 1);
+ cluster.startServer();
+ conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+ for(int i = 0; i <1000; i++) {
+ AbstractRpcClient rpcClient = createRpcClient(conf, true);
+ SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+ client.start();
+ while(!client.isSending()) {
+ Thread.sleep(1);
+ }
+ client.stopRunning();
+ rpcClient.close();
+ }
+ }
+
+
@Test (timeout = 900000)
public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
for (int i = 0; i < numIterations; i++) {