You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/09/19 10:43:19 UTC
hbase git commit: HBASE-16335 RpcClient under heavy load leaks some
netty bytebuf (Ram)
Repository: hbase
Updated Branches:
refs/heads/master 6eb622545 -> c5b8aabab
HBASE-16335 RpcClient under heavy load leaks some netty bytebuf (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5b8aaba
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5b8aaba
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5b8aaba
Branch: refs/heads/master
Commit: c5b8aababe18f65f5db979128a62d8a0686b9dc5
Parents: 6eb6225
Author: Ramkrishna <ra...@intel.com>
Authored: Mon Sep 19 16:12:15 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Mon Sep 19 16:12:15 2016 +0530
----------------------------------------------------------------------
.../hadoop/hbase/ipc/AbstractRpcClient.java | 4 ++
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 5 +++
.../hadoop/hbase/ipc/NettyRpcConnection.java | 11 +++++
.../apache/hadoop/hbase/ipc/RpcConnection.java | 5 +++
.../hadoop/hbase/security/SaslWrapHandler.java | 43 +++++++++++++-------
5 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 401a240..990ffe0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -215,6 +215,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
LOG.info("Cleanup idle connection to " + conn.remoteId().address);
connections.removeValue(conn.remoteId(), conn);
+ conn.cleanupConnection();
}
}
}
@@ -472,6 +473,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
conn.shutdown();
}
closeInternal();
+ for (T conn : connToClose) {
+ conn.cleanupConnection();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index c8b366d..528b726 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -685,6 +685,11 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
}
@Override
+ public void cleanupConnection() {
+ // do nothing
+ }
+
+ @Override
public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
throws IOException {
pcrc.notifyOnCancel(new RpcCallback<Object>() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 5f22dfd..559b7f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@@ -119,6 +120,16 @@ class NettyRpcConnection extends RpcConnection {
shutdown0();
}
+ @Override
+ public synchronized void cleanupConnection() {
+ if (connectionHeaderPreamble != null) {
+ ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
+ }
+ if (connectionHeaderWithLength != null) {
+ ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
+ }
+ }
+
private void established(Channel ch) {
ch.write(connectionHeaderWithLength.retainedDuplicate());
ChannelPipeline p = ch.pipeline();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 8118b20..5e9e97e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -252,4 +252,9 @@ abstract class RpcConnection {
public abstract void shutdown();
public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException;
+
+ /**
+ * Does the clean up work after the connection is removed from the connection pool
+ */
+ public abstract void cleanupConnection();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index ddb4ae9..fefb4f8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
+import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import javax.security.sasl.SaslClient;
@@ -60,21 +61,33 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
- if (!queue.isEmpty()) {
- ChannelPromise promise = ctx.newPromise();
- int readableBytes = queue.readableBytes();
- ByteBuf buf = queue.remove(readableBytes, promise);
- byte[] bytes = new byte[readableBytes];
- buf.readBytes(bytes);
- byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
- ChannelPromise lenPromise = ctx.newPromise();
- ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
- ChannelPromise contentPromise = ctx.newPromise();
- ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
- PromiseCombiner combiner = new PromiseCombiner();
- combiner.addAll(lenPromise, contentPromise);
- combiner.finish(promise);
+ ByteBuf buf = null;
+ try {
+ if (!queue.isEmpty()) {
+ ChannelPromise promise = ctx.newPromise();
+ int readableBytes = queue.readableBytes();
+ buf = queue.remove(readableBytes, promise);
+ byte[] bytes = new byte[readableBytes];
+ buf.readBytes(bytes);
+ byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+ ChannelPromise lenPromise = ctx.newPromise();
+ ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
+ ChannelPromise contentPromise = ctx.newPromise();
+ ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+ PromiseCombiner combiner = new PromiseCombiner();
+ combiner.addAll(lenPromise, contentPromise);
+ combiner.finish(promise);
+ }
+ ctx.flush();
+ } finally {
+ if (buf != null) {
+ ReferenceCountUtil.safeRelease(buf);
+ }
}
- ctx.flush();
+ }
+
+ @Override
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ queue.releaseAndFailAll(new Throwable("Closed"));
}
}