You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/06/10 11:14:02 UTC
hbase git commit: HBASE-18199 Race in NettyRpcConnection may cause
call stuck in BufferCallBeforeInitHandler forever
Repository: hbase
Updated Branches:
refs/heads/master eb2dc5d2a -> ea64dbef7
HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ea64dbef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ea64dbef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ea64dbef
Branch: refs/heads/master
Commit: ea64dbef7f5239ab2162d0bd3dccded60e20ecda
Parents: eb2dc5d
Author: zhangduo <zh...@apache.org>
Authored: Sat Jun 10 19:11:46 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jun 10 19:12:06 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/ipc/NettyRpcConnection.java | 63 ++++++++++++++------
1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ea64dbef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 47d7234..204b812 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection {
private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
- private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
- .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+ private static final ScheduledExecutorService RELOGIN_EXECUTOR =
+ Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
private final NettyRpcClient rpcClient;
@@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection {
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
this.rpcClient = rpcClient;
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
- this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
- .writeBytes(connectionHeaderPreamble);
+ this.connectionHeaderPreamble =
+ Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
ConnectionHeader header = getConnectionHeader();
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection {
// add ReadTimeoutHandler to deal with server doesn't response connection header
// because of the different configuration in client side and server side
- p.addFirst(new ReadTimeoutHandler(
- RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
+ p.addFirst(
+ new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override
@@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection {
}).channel();
}
+ private void write(Channel ch, final Call call) {
+ ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ // Fail the call if we failed to write it out. This is usually because the channel is
+ // closed. This is needed because we may shut down the channel inside the event loop and
+ // there may still be some pending calls in the event loop queue after us.
+ if (!future.isSuccess()) {
+ call.setException(toIOE(future.cause()));
+ }
+ }
+ });
+ }
+
@Override
- public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
- throws IOException {
+ public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
if (reloginInProgress) {
throw new IOException("Can not send request because relogin is in progress.");
}
@@ -309,18 +323,29 @@ class NettyRpcConnection extends RpcConnection {
connect();
}
scheduleTimeoutTask(call);
- channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- // Fail the call if we failed to write it out. This usually because the channel is
- // closed. This is needed because we may shutdown the channel inside event loop and
- // there may still be some pending calls in the event loop queue after us.
- if (!future.isSuccess()) {
- call.setException(toIOE(future.cause()));
+ final Channel ch = channel;
+ // We must move the whole writeAndFlush call inside event loop otherwise there will be a
+ // race condition.
+ // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
+ // current thread and then schedule a task to event loop which will start the process from
+ // that outbound handler. It is possible that the first handler is
+ // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
+ // up at the same time so in the event loop thread we remove the
+ // BufferCallBeforeInitHandler, and then our writeAndFlush task runs and still calls the
+ // write method of the already-removed BufferCallBeforeInitHandler.
+ // This may be considered a bug in netty, but there is a workaround, so let's
+ // fix it ourselves first.
+ if (ch.eventLoop().inEventLoop()) {
+ write(ch, call);
+ } else {
+ ch.eventLoop().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ write(ch, call);
}
- }
- });
+ });
+ }
}
}
});