You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/06/10 11:14:02 UTC

hbase git commit: HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever

Repository: hbase
Updated Branches:
  refs/heads/master eb2dc5d2a -> ea64dbef7


HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ea64dbef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ea64dbef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ea64dbef

Branch: refs/heads/master
Commit: ea64dbef7f5239ab2162d0bd3dccded60e20ecda
Parents: eb2dc5d
Author: zhangduo <zh...@apache.org>
Authored: Sat Jun 10 19:11:46 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jun 10 19:12:06 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/NettyRpcConnection.java    | 63 ++++++++++++++------
 1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ea64dbef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 47d7234..204b812 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection {
 
   private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
 
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
+      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
 
   private final NettyRpcClient rpcClient;
 
@@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection {
         rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
     this.rpcClient = rpcClient;
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
-    this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
-        .writeBytes(connectionHeaderPreamble);
+    this.connectionHeaderPreamble =
+        Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
     ConnectionHeader header = getConnectionHeader();
     this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
     this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection {
 
             // add ReadTimeoutHandler to deal with server doesn't response connection header
             // because of the different configuration in client side and server side
-            p.addFirst(new ReadTimeoutHandler(
-                RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
+            p.addFirst(
+              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
             p.addLast(chHandler);
             connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
               @Override
@@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection {
         }).channel();
   }
 
+  private void write(Channel ch, final Call call) {
+    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        // Fail the call if we failed to write it out. This usually because the channel is
+        // closed. This is needed because we may shutdown the channel inside event loop and
+        // there may still be some pending calls in the event loop queue after us.
+        if (!future.isSuccess()) {
+          call.setException(toIOE(future.cause()));
+        }
+      }
+    });
+  }
+
   @Override
-  public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
-      throws IOException {
+  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
     if (reloginInProgress) {
       throw new IOException("Can not send request because relogin is in progress.");
     }
@@ -309,18 +323,29 @@ class NettyRpcConnection extends RpcConnection {
             connect();
           }
           scheduleTimeoutTask(call);
-          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-              // Fail the call if we failed to write it out. This usually because the channel is
-              // closed. This is needed because we may shutdown the channel inside event loop and
-              // there may still be some pending calls in the event loop queue after us.
-              if (!future.isSuccess()) {
-                call.setException(toIOE(future.cause()));
+          final Channel ch = channel;
+          // We must move the whole writeAndFlush call inside event loop otherwise there will be a
+          // race condition.
+          // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
+          // current thread and then schedule a task to event loop which will start the process from
+          // that outbound handler. It is possible that the first handler is
+          // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
+          // up at the same time so in the event loop thread we remove the
+          // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
+          // write method of BufferCallBeforeInitHandler.
+          // This may be considered as a bug of netty, but anyway there is a work around so let's
+          // fix it by ourselves first.
+          if (ch.eventLoop().inEventLoop()) {
+            write(ch, call);
+          } else {
+            ch.eventLoop().execute(new Runnable() {
+
+              @Override
+              public void run() {
+                write(ch, call);
               }
-            }
-          });
+            });
+          }
         }
       }
     });