You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/06/08 01:59:44 UTC

[hbase] branch branch-2.3 updated: HBASE-24506 async client deadlock (#1858)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new e13fec0  HBASE-24506 async client deadlock (#1858)
e13fec0 is described below

commit e13fec0c92ff17d71ca7272bf824a0d1a1377225
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Jun 8 09:38:58 2020 +0800

    HBASE-24506 async client deadlock (#1858)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |  36 +++
 .../hadoop/hbase/ipc/NettyRpcConnection.java       | 263 ++++++++++-----------
 .../org/apache/hadoop/hbase/ipc/TestIPCUtil.java   |  51 +++-
 .../hadoop/hbase/ipc/TestNettyRpcConnection.java   |  92 +++++++
 .../client/TestAsyncTableGetMultiThreaded.java     |   5 +-
 5 files changed, 310 insertions(+), 137 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 8e92640..811bb2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -37,10 +38,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -234,4 +239,35 @@ class IPCUtil {
         + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
         + call.timeout));
   }
+
+  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
+
+    @Override
+    protected MutableInt initialValue() throws Exception {
+      return new MutableInt(0);
+    }
+  };
+
+  @VisibleForTesting
+  static final int MAX_DEPTH = 4;
+
+  static void execute(EventLoop eventLoop, Runnable action) {
+    if (eventLoop.inEventLoop()) {
+      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
+      // implementation.
+      MutableInt depth = DEPTH.get();
+      if (depth.intValue() < MAX_DEPTH) {
+        depth.increment();
+        try {
+          action.run();
+        } finally {
+          depth.decrement();
+        }
+      } else {
+        eventLoop.execute(action);
+      }
+    } else {
+      eventLoop.execute(action);
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 7d91fd9..7c9d927 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -19,13 +19,27 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
 import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 
-import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
+import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
@@ -36,35 +50,25 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
 
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
-import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * RPC connection implementation based on netty.
- * <p>
+ * <p/>
  * Most operations are executed in handlers. Netty handler is always executed in the same
  * thread(EventLoop) so no lock is needed.
+ * <p/>
+ * <strong>Implementation assumptions:</strong> All the private methods should be called in the
+ * {@link #eventLoop} thread, otherwise there will be races.
  * @since 2.0.0
  */
 @InterfaceAudience.Private
@@ -73,25 +77,30 @@ class NettyRpcConnection extends RpcConnection {
   private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
 
   private static final ScheduledExecutorService RELOGIN_EXECUTOR =
-      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+    Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
 
   private final NettyRpcClient rpcClient;
 
+  // the event loop used to set up the connection, we will also execute other operations for this
+  // connection in this event loop, to avoid locking everywhere.
+  private final EventLoop eventLoop;
+
   private ByteBuf connectionHeaderPreamble;
 
   private ByteBuf connectionHeaderWithLength;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
-      justification = "connect is also under lock as notifyOnCancel will call our action directly")
-  private Channel channel;
+  // make it volatile so in the isActive method below we do not need to switch to the event loop
+  // thread to access this field.
+  private volatile Channel channel;
 
   NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
-        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
     this.rpcClient = rpcClient;
+    this.eventLoop = rpcClient.group.next();
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
     this.connectionHeaderPreamble =
-        Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
+      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
     ConnectionHeader header = getConnectionHeader();
     this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
     this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -99,18 +108,21 @@ class NettyRpcConnection extends RpcConnection {
   }
 
   @Override
-  protected synchronized void callTimeout(Call call) {
-    if (channel != null) {
-      channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
-    }
+  protected void callTimeout(Call call) {
+    execute(eventLoop, () -> {
+      if (channel != null) {
+        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
+      }
+    });
   }
 
   @Override
-  public synchronized boolean isActive() {
+  public boolean isActive() {
     return channel != null;
   }
 
   private void shutdown0() {
+    assert eventLoop.inEventLoop();
     if (channel != null) {
       channel.close();
       channel = null;
@@ -118,21 +130,26 @@ class NettyRpcConnection extends RpcConnection {
   }
 
   @Override
-  public synchronized void shutdown() {
-    shutdown0();
+  public void shutdown() {
+    execute(eventLoop, this::shutdown0);
   }
 
   @Override
-  public synchronized void cleanupConnection() {
-    if (connectionHeaderPreamble != null) {
-      ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
-    }
-    if (connectionHeaderWithLength != null) {
-      ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
-    }
+  public void cleanupConnection() {
+    execute(eventLoop, () -> {
+      if (connectionHeaderPreamble != null) {
+        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
+        connectionHeaderPreamble = null;
+      }
+      if (connectionHeaderWithLength != null) {
+        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
+        connectionHeaderWithLength = null;
+      }
+    });
   }
 
   private void established(Channel ch) throws IOException {
+    assert eventLoop.inEventLoop();
     ChannelPipeline p = ch.pipeline();
     String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
     p.addBefore(addBeforeHandler, null,
@@ -146,43 +163,39 @@ class NettyRpcConnection extends RpcConnection {
   private boolean reloginInProgress;
 
   private void scheduleRelogin(Throwable error) {
+    assert eventLoop.inEventLoop();
     if (error instanceof FallbackDisallowedException) {
       return;
     }
-    synchronized (this) {
-      if (reloginInProgress) {
-        return;
-      }
-      reloginInProgress = true;
-      RELOGIN_EXECUTOR.schedule(new Runnable() {
-
-        @Override
-        public void run() {
-          try {
-            if (provider.canRetry()) {
-              provider.relogin();
-            }
-          } catch (IOException e) {
-            LOG.warn("Relogin failed", e);
-          }
-          synchronized (this) {
-            reloginInProgress = false;
-          }
-        }
-      }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
+    if (!provider.canRetry()) {
+      LOG.trace("SASL Provider does not support retries");
+      return;
+    }
+    if (reloginInProgress) {
+      return;
     }
+    reloginInProgress = true;
+    RELOGIN_EXECUTOR.schedule(() -> {
+      try {
+        provider.relogin();
+      } catch (IOException e) {
+        LOG.warn("Relogin failed", e);
+      }
+      eventLoop.execute(() -> {
+        reloginInProgress = false;
+      });
+    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
   }
 
   private void failInit(Channel ch, IOException e) {
-    synchronized (this) {
-      // fail all pending calls
-      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
-      shutdown0();
-      return;
-    }
+    assert eventLoop.inEventLoop();
+    // fail all pending calls
+    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
+    shutdown0();
   }
 
   private void saslNegotiate(final Channel ch) {
+    assert eventLoop.inEventLoop();
     UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
     if (ticket == null) {
       failInit(ch, new FatalConnectionException("ticket/user is null"));
@@ -192,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
     final NettyHBaseSaslRpcClientHandler saslHandler;
     try {
       saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
-          serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
+        serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
     } catch (IOException e) {
       failInit(ch, e);
       return;
@@ -212,7 +225,7 @@ class NettyRpcConnection extends RpcConnection {
             Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
             // create the handler to handle the connection header
             ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
-                connectionHeaderPromise, conf, connectionHeaderWithLength);
+              connectionHeaderPromise, conf, connectionHeaderWithLength);
 
             // add ReadTimeoutHandler to deal with server doesn't response connection header
             // because of the different configuration in client side and server side
@@ -251,52 +264,38 @@ class NettyRpcConnection extends RpcConnection {
   }
 
   private void connect() {
+    assert eventLoop.inEventLoop();
     LOG.trace("Connecting to {}", remoteId.address);
 
-    this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
-        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
-        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
-        .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
-        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
-
-          @Override
-          public void operationComplete(ChannelFuture future) throws Exception {
-            Channel ch = future.channel();
-            if (!future.isSuccess()) {
-              failInit(ch, toIOE(future.cause()));
-              rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
-              return;
-            }
-            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
-            if (useSasl) {
-              saslNegotiate(ch);
-            } else {
-              // send the connection header to server
-              ch.write(connectionHeaderWithLength.retainedDuplicate());
-              established(ch);
-            }
-          }
-        }).channel();
-  }
-
-  private void write(Channel ch, final Call call) {
-    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
+      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
+      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
+      .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
+      .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
 
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        // Fail the call if we failed to write it out. This usually because the channel is
-        // closed. This is needed because we may shutdown the channel inside event loop and
-        // there may still be some pending calls in the event loop queue after us.
-        if (!future.isSuccess()) {
-          call.setException(toIOE(future.cause()));
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          Channel ch = future.channel();
+          if (!future.isSuccess()) {
+            failInit(ch, toIOE(future.cause()));
+            rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
+            return;
+          }
+          ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
+          if (useSasl) {
+            saslNegotiate(ch);
+          } else {
+            // send the connection header to server
+            ch.write(connectionHeaderWithLength.retainedDuplicate());
+            established(ch);
+          }
         }
-      }
-    });
+      }).channel();
   }
 
-  @Override
-  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
+  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
+    assert eventLoop.inEventLoop();
     if (reloginInProgress) {
       throw new IOException("Can not send request because relogin is in progress.");
     }
@@ -305,10 +304,8 @@ class NettyRpcConnection extends RpcConnection {
       @Override
       public void run(Object parameter) {
         setCancelled(call);
-        synchronized (this) {
-          if (channel != null) {
-            channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
-          }
+        if (channel != null) {
+          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
         }
       }
     }, new CancellationCallback() {
@@ -322,31 +319,31 @@ class NettyRpcConnection extends RpcConnection {
             connect();
           }
           scheduleTimeoutTask(call);
-          final Channel ch = channel;
-          // We must move the whole writeAndFlush call inside event loop otherwise there will be a
-          // race condition.
-          // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
-          // current thread and then schedule a task to event loop which will start the process from
-          // that outbound handler. It is possible that the first handler is
-          // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
-          // up at the same time so in the event loop thread we remove the
-          // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
-          // write method of BufferCallBeforeInitHandler.
-          // This may be considered as a bug of netty, but anyway there is a work around so let's
-          // fix it by ourselves first.
-          if (ch.eventLoop().inEventLoop()) {
-            write(ch, call);
-          } else {
-            ch.eventLoop().execute(new Runnable() {
-
-              @Override
-              public void run() {
-                write(ch, call);
+          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+              // Fail the call if we failed to write it out. This is usually because the channel is
+              // closed. This is needed because we may shutdown the channel inside event loop and
+              // there may still be some pending calls in the event loop queue after us.
+              if (!future.isSuccess()) {
+                call.setException(toIOE(future.cause()));
               }
-            });
-          }
+            }
+          });
         }
       }
     });
   }
+
+  @Override
+  public void sendRequest(final Call call, HBaseRpcController hrc) {
+    execute(eventLoop, () -> {
+      try {
+        sendRequest0(call, hrc);
+      } catch (Exception e) {
+        call.setException(toIOE(e));
+      }
+    });
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index ce0d398..9e1ab2e 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -18,23 +18,30 @@
 package org.apache.hadoop.hbase.ipc;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.io.netty.channel.DefaultEventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+
 @Category({ ClientTests.class, SmallTests.class })
 public class TestIPCUtil {
 
@@ -43,7 +50,7 @@ public class TestIPCUtil {
     HBaseClassTestRule.forClass(TestIPCUtil.class);
 
   private static Throwable create(Class<? extends Throwable> clazz) throws InstantiationException,
-      IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    IllegalAccessException, InvocationTargetException, NoSuchMethodException {
     try {
       Constructor<? extends Throwable> c = clazz.getDeclaredConstructor();
       c.setAccessible(true);
@@ -102,4 +109,44 @@ public class TestIPCUtil {
       }
     }
   }
+
+  @Test
+  public void testExecute() throws IOException {
+    EventLoop eventLoop = new DefaultEventLoop();
+    MutableInt executed = new MutableInt(0);
+    MutableInt numStackTraceElements = new MutableInt(0);
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      IPCUtil.execute(eventLoop, new Runnable() {
+
+        @Override
+        public void run() {
+          int numElements = new Exception().getStackTrace().length;
+          int depth = executed.getAndIncrement();
+          if (depth <= IPCUtil.MAX_DEPTH) {
+            if (numElements <= numStackTraceElements.intValue()) {
+              future.completeExceptionally(
+                new AssertionError("should call run directly but stack trace decreased from " +
+                  numStackTraceElements.intValue() + " to " + numElements));
+              return;
+            }
+            numStackTraceElements.setValue(numElements);
+            IPCUtil.execute(eventLoop, this);
+          } else {
+            if (numElements >= numStackTraceElements.intValue()) {
+              future.completeExceptionally(
+                new AssertionError("should call eventLoop.execute to prevent stack overflow but" +
+                  " stack trace increased from " + numStackTraceElements.intValue() + " to " +
+                  numElements));
+            } else {
+              future.complete(null);
+            }
+          }
+        }
+      });
+      FutureUtils.get(future);
+    } finally {
+      eventLoop.shutdownGracefully();
+    }
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java
new file mode 100644
index 0000000..ab75d60
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcConnection.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.net.InetSocketAddress;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestNettyRpcConnection {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestNettyRpcConnection.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestNettyRpcConnection.class);
+
+  private static NettyRpcClient CLIENT;
+
+  private static NettyRpcConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    CLIENT = new NettyRpcClient(HBaseConfiguration.create());
+    CONN = new NettyRpcConnection(CLIENT,
+      new ConnectionId(User.getCurrent(), "test", new InetSocketAddress("localhost", 1234)));
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    Closeables.close(CLIENT, true);
+  }
+
+  @Test
+  public void testPrivateMethodExecutedInEventLoop() throws IllegalAccessException {
+    // make sure the test is executed with "-ea"
+    assertThrows(AssertionError.class, () -> {
+      assert false;
+    });
+    for (Method method : NettyRpcConnection.class.getDeclaredMethods()) {
+      if (Modifier.isPrivate(method.getModifiers()) && !method.getName().contains("$")) {
+        LOG.info("checking {}", method);
+        method.setAccessible(true);
+        // all private methods should be called inside the event loop thread, so calling it from
+        // this thread will cause the "assert eventLoop.inEventLoop();" to fail
+        try {
+          // none of the private methods take primitive parameters for now, so let's pass null
+          method.invoke(CONN, new Object[method.getParameterCount()]);
+          fail("should fail with AssertionError");
+        } catch (InvocationTargetException e) {
+          assertThat(e.getCause(), instanceOf(AssertionError.class));
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index ce6bc05..94adce7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
@@ -58,6 +57,8 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 /**
  * Will split the table, and move region randomly when testing.
  */
@@ -116,7 +117,7 @@ public class TestAsyncTableGetMultiThreaded {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(CONN);
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }