You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/16 15:01:02 UTC

[09/10] hbase git commit: HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 3740b7f..634c101 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -18,62 +18,108 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
+import io.netty.util.HashedWheelTimer;
+
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
 
 /**
  * Provides the basics for a RpcClient implementation like configuration and Logging.
+ * <p>
+ * Locking schema of the current IPC implementation
+ * <ul>
+ * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
+ * connection.</li>
+ * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
+ * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
+ * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
+ * of how to deal with cancel.</li>
+ * <li>For connection implementation, the construction of a connection should be as fast as possible
+ * because the creation is protected under a lock. Connect to remote side when needed. There is no
+ * forced locking schema for a connection implementation.</li>
+ * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
+ * last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
+ * outside the lock in {@link Call} and {@link HBaseRpcController}, which means the implementations
+ * of the callbacks are free to hold any lock.</li>
+ * </ul>
  */
 @InterfaceAudience.Private
-public abstract class AbstractRpcClient implements RpcClient {
+public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   // Log level is being changed in tests
   public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
 
+  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
+      Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+
+  private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
+      .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+
+  static {
+    TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
+  }
+
+  protected boolean running = true; // if client runs
+
   protected final Configuration conf;
-  protected String clusterId;
+  protected final String clusterId;
   protected final SocketAddress localAddr;
   protected final MetricsConnection metrics;
 
-  protected UserProvider userProvider;
-  protected final IPCUtil ipcUtil;
+  protected final UserProvider userProvider;
+  protected final CellBlockBuilder cellBlockBuilder;
 
   protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
   // time (in ms), it will be closed at any moment.
-  protected final int maxRetries; //the max. no. of retries for socket connections
+  protected final int maxRetries; // the max. no. of retries for socket connections
   protected final long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -81,10 +127,18 @@ public abstract class AbstractRpcClient implements RpcClient {
   protected final CompressionCodec compressor;
   protected final boolean fallbackAllowed;
 
+  protected final FailedServers failedServers;
+
   protected final int connectTO;
   protected final int readTO;
   protected final int writeTO;
 
+  protected final PoolMap<ConnectionId, T> connections;
+
+  private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+  private final ScheduledFuture<?> cleanupIdleConnectionTask;
+
   private int maxConcurrentCallsPerServer;
 
   private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
@@ -97,7 +151,6 @@ public abstract class AbstractRpcClient implements RpcClient {
 
   /**
    * Construct an IPC client for the cluster <code>clusterId</code>
-   *
    * @param conf configuration
    * @param clusterId the cluster id
    * @param localAddr client socket bind address.
@@ -110,17 +163,18 @@ public abstract class AbstractRpcClient implements RpcClient {
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
     this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
-    this.ipcUtil = new IPCUtil(conf);
+    this.cellBlockBuilder = new CellBlockBuilder(conf);
 
     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
     this.conf = conf;
     this.codec = getCodec();
     this.compressor = getCompressor(conf);
     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.failedServers = new FailedServers(conf);
     this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
@@ -129,25 +183,47 @@ public abstract class AbstractRpcClient implements RpcClient {
         HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
         HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
 
-    // login the server principal (if using secure Hadoop)
+    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
+
+    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
+
+      @Override
+      public void run() {
+        cleanupIdleConnections();
+      }
+    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
-          ", tcpKeepAlive=" + this.tcpKeepAlive +
-          ", tcpNoDelay=" + this.tcpNoDelay +
-          ", connectTO=" + this.connectTO +
-          ", readTO=" + this.readTO +
-          ", writeTO=" + this.writeTO +
-          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
-          ", maxRetries=" + this.maxRetries +
-          ", fallbackAllowed=" + this.fallbackAllowed +
-          ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
+          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
+          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
+          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
+          + this.fallbackAllowed + ", bind address="
+          + (this.localAddr != null ? this.localAddr : "null"));
+    }
+  }
+
+  private void cleanupIdleConnections() {
+    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
+    synchronized (connections) {
+      for (T conn : connections.values()) {
+        // Remove the connection if it has not been chosen by anyone for more than
+        // minIdleTimeBeforeClose and it has already shut down. The latter check exists because
+        // pending calls may remain on the connection, so we must not shut it down from outside;
+        // the connection disconnects itself when it has had no pending call for that long.
+        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
+          LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+          connections.removeValue(conn.remoteId(), conn);
+          conn.cleanupConnection();
+        }
+      }
     }
   }
 
   @VisibleForTesting
   public static String getDefaultCodec(final Configuration c) {
     // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
-    // Configuration will complain -- then no default codec (and we'll pb everything).  Else
+    // Configuration will complain -- then no default codec (and we'll pb everything). Else
     // default is KeyValueCodec
     return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
   }
@@ -160,9 +236,11 @@ public abstract class AbstractRpcClient implements RpcClient {
     // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
     // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
     String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
-    if (className == null || className.length() == 0) return null;
+    if (className == null || className.length() == 0) {
+      return null;
+    }
     try {
-      return (Codec)Class.forName(className).newInstance();
+      return (Codec) Class.forName(className).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed getting codec " + className, e);
     }
@@ -173,6 +251,12 @@ public abstract class AbstractRpcClient implements RpcClient {
     return this.codec != null;
   }
 
+  // for writing tests that want to throw exception when connecting.
+  @VisibleForTesting
+  boolean isTcpNoDelay() {
+    return tcpNoDelay;
+  }
+
   /**
    * Encapsulate the ugly casting and RuntimeException conversion in private method.
    * @param conf configuration
@@ -180,142 +264,289 @@ public abstract class AbstractRpcClient implements RpcClient {
    */
   private static CompressionCodec getCompressor(final Configuration conf) {
     String className = conf.get("hbase.client.rpc.compressor", null);
-    if (className == null || className.isEmpty()) return null;
+    if (className == null || className.isEmpty()) {
+      return null;
+    }
     try {
-        return (CompressionCodec)Class.forName(className).newInstance();
+      return (CompressionCodec) Class.forName(className).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed getting compressor " + className, e);
     }
   }
 
   /**
-   * Return the pool type specified in the configuration, which must be set to
-   * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
-   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
-   * otherwise default to the former.
-   *
-   * For applications with many user threads, use a small round-robin pool. For
-   * applications with few user threads, you may want to try using a
-   * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
-   * instances should not exceed the operating system's hard limit on the number of
-   * connections.
-   *
+   * Return the pool type specified in the configuration, which must be set to either
+   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
+   * former. For applications with many user threads, use a small round-robin pool. For applications
+   * with few user threads, you may want to try using a thread-local pool. In any case, the number
+   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
+   * system's hard limit on the number of connections.
    * @param config configuration
    * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
    *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
    */
-  protected static PoolMap.PoolType getPoolType(Configuration config) {
-    return PoolMap.PoolType
-        .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
-            PoolMap.PoolType.ThreadLocal);
+  private static PoolMap.PoolType getPoolType(Configuration config) {
+    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+      PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
   }
 
   /**
-   * Return the pool size specified in the configuration, which is applicable only if
-   * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
-   *
+   * Return the pool size specified in the configuration, which is applicable only if the pool type
+   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
    * @param config configuration
    * @return the maximum pool size
    */
-  protected static int getPoolSize(Configuration config) {
+  private static int getPoolSize(Configuration config) {
     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
   }
 
+  private int nextCallId() {
+    int id, next;
+    do {
+      id = callIdCnt.get();
+      next = id < Integer.MAX_VALUE ? id + 1 : 0;
+    } while (!callIdCnt.compareAndSet(id, next));
+    return id;
+  }
+
   /**
    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
    * threw an exception.
-   *
    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
-   *               will be a
-   *               new Connection each time.
+   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
+   *          new Connection each time.
    * @return A pair with the Message response and the Cell data (if any).
    */
-  Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
+  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
       Message param, Message returnType, final User ticket, final InetSocketAddress isa)
       throws ServiceException {
-    if (pcrc == null) {
-      pcrc = new PayloadCarryingRpcController();
+    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
+    callMethod(md, hrc, param, returnType, ticket, isa, done);
+    Message val;
+    try {
+      val = done.get();
+    } catch (IOException e) {
+      throw new ServiceException(e);
     }
+    if (hrc.failed()) {
+      throw new ServiceException(hrc.getFailed());
+    } else {
+      return val;
+    }
+  }
 
-    Pair<Message, CellScanner> val;
-    AtomicInteger counter = concurrentCounterCache.getUnchecked(isa);
-    int count = counter.incrementAndGet();
-    try {
-      if (count > maxConcurrentCallsPerServer) {
-        throw new ServerTooBusyException(isa, count);
+  /**
+   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
+   * given host/port are reused.
+   */
+  private T getConnection(ConnectionId remoteId) throws IOException {
+    if (failedServers.isFailedServer(remoteId.getAddress())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not trying to connect to " + remoteId.address
+            + " this server is in the failed servers list");
       }
-      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-      cs.setStartTime(EnvironmentEdgeManager.currentTime());
-      val = call(pcrc, md, param, returnType, ticket, isa, cs);
-      // Shove the results into controller so can be carried across the proxy/pb service void.
-      pcrc.setCellScanner(val.getSecond());
-
-      cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
-      if (metrics != null) {
-        metrics.updateRpc(md, param, cs);
+      throw new FailedServerException(
+          "This server is in the failed servers list: " + remoteId.address);
+    }
+    T conn;
+    synchronized (connections) {
+      if (!running) {
+        throw new StoppedRpcClientException();
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+      conn = connections.get(remoteId);
+      if (conn == null) {
+        conn = createConnection(remoteId);
+        connections.put(remoteId, conn);
       }
-      return val.getFirst();
-    } catch (Throwable e) {
-      throw new ServiceException(e);
-    } finally {
-      counter.decrementAndGet();
+      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
     }
+    return conn;
   }
 
   /**
-   * Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> protocol,
-   * with the <code>ticket</code> credentials, returning the value.
-   * Throws exceptions if there are network problems or if the remote code
-   * threw an exception.
-   *
-   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
-   *               will be a
-   *               new Connection each time.
-   * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException
-   * @throws java.io.IOException
+   * Create a connection for the given id. The returned connection is not connected yet.
    */
-  protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
-      Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress isa, MetricsConnection.CallStats callStats)
-      throws IOException, InterruptedException;
+  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
+
+  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+      RpcCallback<Message> callback) {
+    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
+    if (metrics != null) {
+      metrics.updateRpc(call.md, call.param, call.callStats);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
+    }
+    if (call.error != null) {
+      if (call.error instanceof RemoteException) {
+        call.error.fillInStackTrace();
+        hrc.setFailed(call.error);
+      } else {
+        hrc.setFailed(wrapException(addr, call.error));
+      }
+      callback.run(null);
+    } else {
+      hrc.setDone(call.cells);
+      callback.run(call.response);
+    }
+  }
 
+  private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
+      final RpcCallback<Message> callback) {
+    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+    cs.setStartTime(EnvironmentEdgeManager.currentTime());
+    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
+    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
+        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+          @Override
+          public void run(Call call) {
+            counter.decrementAndGet();
+            onCallFinished(call, hrc, addr, callback);
+          }
+        }, cs);
+    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+    int count = counter.incrementAndGet();
+    try {
+      if (count > maxConcurrentCallsPerServer) {
+        throw new ServerTooBusyException(addr, count);
+      }
+      T connection = getConnection(remoteId);
+      connection.sendRequest(call, hrc);
+    } catch (Exception e) {
+      call.setException(toIOE(e));
+    }
+  }
+
+  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
+    if (addr.isUnresolved()) {
+      throw new UnknownHostException("can not resolve " + sn.getServerName());
+    }
+    return addr;
+  }
+
+  /**
+   * Interrupt the connections to the given ip:port server. This should be called if the server is
+   * known to be actually dead. This will not prevent current operations from being retried, and,
+   * depending on their own behavior, they may retry on the same server. This can be a feature, for
+   * example at startup. In any case, they're likely to get connection refused (if the process died)
+   * or no route to host, i.e. their next retries should be faster and fail with a safe exception.
+   */
   @Override
-  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
-      int defaultOperationTimeout) throws UnknownHostException {
-    return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
+  public void cancelConnections(ServerName sn) {
+    synchronized (connections) {
+      for (T connection : connections.values()) {
+        ConnectionId remoteId = connection.remoteId();
+        if (remoteId.address.getPort() == sn.getPort() &&
+            remoteId.address.getHostName().equals(sn.getHostname())) {
+          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " +
+              connection.remoteId);
+          connections.removeValue(remoteId, connection);
+          connection.shutdown();
+        }
+      }
+    }
   }
 
   /**
-   * Takes an Exception and the address we were trying to connect to and return an IOException with
-   * the input exception as the cause. The new exception provides the stack trace of the place where
-   * the exception is thrown and some extra diagnostics information. If the exception is
-   * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
-   * an IOException.
-   * @param addr target address
-   * @param exception the relevant exception
-   * @return an exception to throw
+   * Configure an HBase rpc controller.
+   * @param controller to configure
+   * @param channelOperationTimeout timeout for operation
+   * @return configured controller
    */
-  protected IOException wrapException(InetSocketAddress addr, Exception exception) {
-    if (exception instanceof ConnectException) {
-      // connection refused; include the host:port in the error
-      return (ConnectException) new ConnectException("Call to " + addr
-          + " failed on connection exception: " + exception).initCause(exception);
-    } else if (exception instanceof SocketTimeoutException) {
-      return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
-          + " failed because " + exception).initCause(exception);
-    } else if (exception instanceof ConnectionClosingException) {
-      return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
-          + " failed on local exception: " + exception).initCause(exception);
+  static HBaseRpcController configureHBaseRpcController(
+      RpcController controller, int channelOperationTimeout) {
+    HBaseRpcController hrc;
+    if (controller != null && controller instanceof HBaseRpcController) {
+      hrc = (HBaseRpcController) controller;
+      if (!hrc.hasCallTimeout()) {
+        hrc.setCallTimeout(channelOperationTimeout);
+      }
     } else {
-      return (IOException) new IOException("Call to " + addr + " failed on local exception: "
-          + exception).initCause(exception);
+      hrc = new HBaseRpcControllerImpl();
+      hrc.setCallTimeout(channelOperationTimeout);
+    }
+    return hrc;
+  }
+
+  protected abstract void closeInternal();
+
+  @Override
+  public void close() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping rpc client");
+    }
+    Collection<T> connToClose;
+    synchronized (connections) {
+      if (!running) {
+        return;
+      }
+      running = false;
+      connToClose = connections.values();
+      connections.clear();
+    }
+    cleanupIdleConnectionTask.cancel(true);
+    for (T conn : connToClose) {
+      conn.shutdown();
+    }
+    closeInternal();
+    for (T conn : connToClose) {
+      conn.cleanupConnection();
+    }
+  }
+
+  @Override
+  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
+      int rpcTimeout) throws UnknownHostException {
+    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
+  }
+
+  @Override
+  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
+      throws UnknownHostException {
+    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
+  }
+
+  private static class AbstractRpcChannel {
+
+    protected final InetSocketAddress addr;
+
+    protected final AbstractRpcClient<?> rpcClient;
+
+    protected final User ticket;
+
+    protected final int rpcTimeout;
+
+    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+        User ticket, int rpcTimeout) {
+      this.addr = addr;
+      this.rpcClient = rpcClient;
+      this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
+    }
+
+    /**
+     * Configure an rpc controller
+     * @param controller to configure
+     * @return configured rpc controller
+     */
+    protected HBaseRpcController configureRpcController(RpcController controller) {
+      HBaseRpcController hrc;
+      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
+      // side. And now we may use ServerRpcController.
+      if (controller != null && controller instanceof HBaseRpcController) {
+        hrc = (HBaseRpcController) controller;
+        if (!hrc.hasCallTimeout()) {
+          hrc.setCallTimeout(rpcTimeout);
+        }
+      } else {
+        hrc = new HBaseRpcControllerImpl();
+        hrc.setCallTimeout(rpcTimeout);
+      }
+      return hrc;
     }
   }
 
@@ -323,42 +554,42 @@ public abstract class AbstractRpcClient implements RpcClient {
    * Blocking rpc channel that goes via hbase rpc.
    */
   @VisibleForTesting
-  public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
-    private final InetSocketAddress isa;
-    private final AbstractRpcClient rpcClient;
-    private final User ticket;
-    private final int channelOperationTimeout;
+  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
+      implements BlockingRpcChannel {
 
-    /**
-     * @param channelOperationTimeout - the default timeout when no timeout is given
-     */
-    protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
-        final ServerName sn, final User ticket, int channelOperationTimeout)
-        throws UnknownHostException {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      if (this.isa.isUnresolved()) {
-        throw new UnknownHostException(sn.getHostname());
-      }
-      this.rpcClient = rpcClient;
-      this.ticket = ticket;
-      this.channelOperationTimeout = channelOperationTimeout;
+    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
+        InetSocketAddress addr, User ticket, int rpcTimeout) {
+      super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
         Message param, Message returnType) throws ServiceException {
-      PayloadCarryingRpcController pcrc;
-      if (controller != null && controller instanceof PayloadCarryingRpcController) {
-        pcrc = (PayloadCarryingRpcController) controller;
-        if (!pcrc.hasCallTimeout()) {
-          pcrc.setCallTimeout(channelOperationTimeout);
-        }
-      } else {
-        pcrc = new PayloadCarryingRpcController();
-        pcrc.setCallTimeout(channelOperationTimeout);
-      }
+      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
+        param, returnType, ticket, addr);
+    }
+  }
+
+  /**
+   * Async rpc channel that goes via hbase rpc.
+   */
+  public static class RpcChannelImplementation extends AbstractRpcChannel implements
+      RpcChannel {
 
-      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
+    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+        User ticket, int rpcTimeout) throws UnknownHostException {
+      super(rpcClient, addr, ticket, rpcTimeout);
+    }
+
+    @Override
+    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
+        Message param, Message returnType, RpcCallback<Message> done) {
+      // This method does not throw any exceptions, so the caller must provide a
+      // HBaseRpcController which is used to pass the exceptions.
+      this.rpcClient.callMethod(md,
+        configureRpcController(Preconditions.checkNotNull(controller,
+          "RpcController can not be null for async rpc call")),
+        param, returnType, ticket, addr, done);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
deleted file mode 100644
index a5da0dc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import io.netty.channel.EventLoop;
-import io.netty.util.concurrent.DefaultPromise;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
-
-import java.io.IOException;
-
-/**
- * Represents an Async Hbase call and its response.
- *
- * Responses are passed on to its given doneHandler and failures to the rpcController
- */
-@InterfaceAudience.Private
-public class AsyncCall extends DefaultPromise<Message> {
-  private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
-
-  final int id;
-
-  final Descriptors.MethodDescriptor method;
-  final Message param;
-  final PayloadCarryingRpcController controller;
-  final Message responseDefaultType;
-  final long startTime;
-  final long rpcTimeout;
-  final MetricsConnection.CallStats callStats;
-
-  /**
-   * Constructor
-   *
-   * @param eventLoop           for call
-   * @param connectId           connection id
-   * @param md                  the method descriptor
-   * @param param               parameters to send to Server
-   * @param controller          controller for response
-   * @param responseDefaultType the default response type
-   */
-  public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
-      param, PayloadCarryingRpcController controller, Message responseDefaultType,
-      MetricsConnection.CallStats callStats) {
-    super(eventLoop);
-
-    this.id = connectId;
-
-    this.method = md;
-    this.param = param;
-    this.controller = controller;
-    this.responseDefaultType = responseDefaultType;
-
-    this.startTime = EnvironmentEdgeManager.currentTime();
-    this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
-    this.callStats = callStats;
-  }
-
-  /**
-   * Get the start time
-   *
-   * @return start time for the call
-   */
-  public long getStartTime() {
-    return this.startTime;
-  }
-
-  @Override
-  public String toString() {
-    return "callId=" + this.id + ", method=" + this.method.getName() +
-      ", rpcTimeout=" + this.rpcTimeout + ", param {" +
-      (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
-  }
-
-  /**
-   * Set success with a cellBlockScanner
-   *
-   * @param value            to set
-   * @param cellBlockScanner to set
-   */
-  public void setSuccess(Message value, CellScanner cellBlockScanner) {
-    if (cellBlockScanner != null) {
-      controller.setCellScanner(cellBlockScanner);
-    }
-
-    if (LOG.isTraceEnabled()) {
-      long callTime = EnvironmentEdgeManager.currentTime() - startTime;
-      LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
-    }
-
-    this.setSuccess(value);
-  }
-
-  /**
-   * Set failed
-   *
-   * @param exception to set
-   */
-  public void setFailed(IOException exception) {
-    if (ExceptionUtil.isInterrupt(exception)) {
-      exception = ExceptionUtil.asInterrupt(exception);
-    }
-    if (exception instanceof RemoteException) {
-      exception = ((RemoteException) exception).unwrapRemoteException();
-    }
-
-    this.setFailure(exception);
-  }
-
-  /**
-   * Get the rpc timeout
-   *
-   * @return current timeout for this call
-   */
-  public long getRpcTimeout() {
-    return rpcTimeout;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
deleted file mode 100644
index 878d8b8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ /dev/null
@@ -1,785 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannel {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
-
-  private static final int MAX_SASL_RETRIES = 5;
-
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
-    = new HashMap<>();
-
-  static {
-    TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
-      new AuthenticationTokenSelector());
-  }
-
-  final AsyncRpcClient client;
-
-  // Contains the channel to work with.
-  // Only exists when connected
-  private Channel channel;
-
-  String name;
-  final User ticket;
-  final String serviceName;
-  final InetSocketAddress address;
-
-  private int ioFailureCounter = 0;
-  private int connectFailureCounter = 0;
-
-  boolean useSasl;
-  AuthMethod authMethod;
-  private int reloginMaxBackoff;
-  private Token<? extends TokenIdentifier> token;
-  private String serverPrincipal;
-
-  // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
-  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
-  private boolean connected = false;
-  private boolean closed = false;
-
-  private Timeout cleanupTimer;
-
-  private final TimerTask timeoutTask = new TimerTask() {
-    @Override
-    public void run(Timeout timeout) throws Exception {
-      cleanupCalls();
-    }
-  };
-
-  /**
-   * Constructor for netty RPC channel
-   * @param bootstrap to construct channel on
-   * @param client to connect with
-   * @param ticket of user which uses connection
-   *               @param serviceName name of service to connect to
-   * @param address to connect to
-   */
-  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
-      String serviceName, InetSocketAddress address) {
-    this.client = client;
-
-    this.ticket = ticket;
-    this.serviceName = serviceName;
-    this.address = address;
-
-    this.channel = connect(bootstrap).channel();
-
-    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
-        + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
-  }
-
-  /**
-   * Connect to channel
-   * @param bootstrap to connect to
-   * @return future of connection
-   */
-  private ChannelFuture connect(final Bootstrap bootstrap) {
-    return bootstrap.remoteAddress(address).connect()
-        .addListener(new GenericFutureListener<ChannelFuture>() {
-          @Override
-          public void operationComplete(final ChannelFuture f) throws Exception {
-            if (!f.isSuccess()) {
-              if (f.cause() instanceof SocketException) {
-                retryOrClose(bootstrap, connectFailureCounter++, f.cause());
-              } else {
-                retryOrClose(bootstrap, ioFailureCounter++, f.cause());
-              }
-              return;
-            }
-            channel = f.channel();
-
-            setupAuthorization();
-
-            ByteBuf b = channel.alloc().directBuffer(6);
-            createPreamble(b, authMethod);
-            channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            if (useSasl) {
-              UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
-              if (authMethod == AuthMethod.KERBEROS) {
-                if (ticket != null && ticket.getRealUser() != null) {
-                  ticket = ticket.getRealUser();
-                }
-              }
-              SaslClientHandler saslHandler;
-              if (ticket == null) {
-                throw new FatalConnectionException("ticket/user is null");
-              }
-              final UserGroupInformation realTicket = ticket;
-              saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
-                @Override
-                public SaslClientHandler run() throws IOException {
-                  return getSaslHandler(realTicket, bootstrap);
-                }
-              });
-              if (saslHandler != null) {
-                // Sasl connect is successful. Let's set up Sasl channel handler
-                channel.pipeline().addFirst(saslHandler);
-              } else {
-                // fall back to simple auth because server told us so.
-                authMethod = AuthMethod.SIMPLE;
-                useSasl = false;
-              }
-            } else {
-              startHBaseConnection(f.channel());
-            }
-          }
-        });
-  }
-
-  /**
-   * Start HBase connection
-   * @param ch channel to start connection on
-   */
-  private void startHBaseConnection(Channel ch) {
-    ch.pipeline().addLast("frameDecoder",
-      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    try {
-      writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            close(future.cause());
-            return;
-          }
-          List<AsyncCall> callsToWrite;
-          synchronized (pendingCalls) {
-            connected = true;
-            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-          }
-          for (AsyncCall call : callsToWrite) {
-            writeRequest(call);
-          }
-        }
-      });
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Start HBase connection with sasl encryption
-   * @param ch channel to start connection on
-   */
-  private void startConnectionWithEncryption(Channel ch) {
-    // for rpc encryption, the order of ChannelInboundHandler should be:
-    // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
-    // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
-    // SaslClientHandler will handler this
-    ch.pipeline().addFirst("beforeUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
-    ch.pipeline().addLast("afterUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    List<AsyncCall> callsToWrite;
-    synchronized (pendingCalls) {
-      connected = true;
-      callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-    }
-    for (AsyncCall call : callsToWrite) {
-      writeRequest(call);
-    }
-  }
-
-  /**
-   * Get SASL handler
-   * @param bootstrap to reconnect to
-   * @return new SASL handler
-   * @throws java.io.IOException if handler failed to create
-   */
-  private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
-      final Bootstrap bootstrap) throws IOException {
-    return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
-        client.fallbackAllowed,
-        client.conf.get("hbase.rpc.protection",
-          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
-        getChannelHeaderBytes(authMethod),
-        new SaslClientHandler.SaslExceptionHandler() {
-          @Override
-          public void handle(int retryCount, Random random, Throwable cause) {
-            try {
-              // Handle Sasl failure. Try to potentially get new credentials
-              handleSaslConnectionFailure(retryCount, cause, realTicket);
-
-              // Try to reconnect
-              client.newTimeout(new TimerTask() {
-                @Override
-                public void run(Timeout timeout) throws Exception {
-                  connect(bootstrap);
-                }
-              }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
-            } catch (IOException | InterruptedException e) {
-              close(e);
-            }
-          }
-        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
-          @Override
-          public void onSuccess(Channel channel) {
-            startHBaseConnection(channel);
-          }
-
-          @Override
-          public void onSaslProtectionSucess(Channel channel) {
-            startConnectionWithEncryption(channel);
-          }
-        });
-  }
-
-  /**
-   * Retry to connect or close
-   *
-   * @param bootstrap      to connect with
-   * @param connectCounter amount of tries
-   * @param e              exception of fail
-   */
-  private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
-    if (connectCounter < client.maxRetries) {
-      client.newTimeout(new TimerTask() {
-        @Override public void run(Timeout timeout) throws Exception {
-          connect(bootstrap);
-        }
-      }, client.failureSleep, TimeUnit.MILLISECONDS);
-    } else {
-      client.failedServers.addToFailedServers(address);
-      close(e);
-    }
-  }
-
-  /**
-   * Calls method on channel
-   * @param method to call
-   * @param controller to run call with
-   * @param request to send
-   * @param responsePrototype to construct response with
-   */
-  public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
-      final PayloadCarryingRpcController controller, final Message request,
-      final Message responsePrototype, MetricsConnection.CallStats callStats) {
-    final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
-        method, request, controller, responsePrototype, callStats);
-    controller.notifyOnCancel(new RpcCallback<Object>() {
-      @Override
-      public void run(Object parameter) {
-        // TODO: do not need to call AsyncCall.setFailed?
-        synchronized (pendingCalls) {
-          pendingCalls.remove(call.id);
-        }
-      }
-    });
-    // TODO: this should be handled by PayloadCarryingRpcController.
-    if (controller.isCanceled()) {
-      // To finish if the call was cancelled before we set the notification (race condition)
-      call.cancel(true);
-      return call;
-    }
-
-    synchronized (pendingCalls) {
-      if (closed) {
-        Promise<Message> promise = channel.eventLoop().newPromise();
-        promise.setFailure(new ConnectException());
-        return promise;
-      }
-      pendingCalls.put(call.id, call);
-      // Add timeout for cleanup if none is present
-      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
-      }
-      if (!connected) {
-        return call;
-      }
-    }
-    writeRequest(call);
-    return call;
-  }
-
-  AsyncCall removePendingCall(int id) {
-    synchronized (pendingCalls) {
-      return pendingCalls.remove(id);
-    }
-  }
-
-  /**
-   * Write the channel header
-   * @param channel to write to
-   * @return future of write
-   * @throws java.io.IOException on failure to write
-   */
-  private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-
-    ByteBuf b = channel.alloc().directBuffer(totalSize);
-
-    b.writeInt(header.getSerializedSize());
-    b.writeBytes(header.toByteArray());
-
-    return channel.writeAndFlush(b);
-  }
-
-  private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
-    b.putInt(header.getSerializedSize());
-    b.put(header.toByteArray());
-    return b.array();
-  }
-
-  private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
-        .setServiceName(serviceName);
-
-    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
-    if (userInfoPB != null) {
-      headerBuilder.setUserInfo(userInfoPB);
-    }
-
-    if (client.codec != null) {
-      headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
-    }
-    if (client.compressor != null) {
-      headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
-    }
-
-    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
-    return headerBuilder.build();
-  }
-
-  /**
-   * Write request to channel
-   * @param call to write
-   */
-  private void writeRequest(final AsyncCall call) {
-    try {
-      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
-          .newBuilder();
-      requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
-          .setRequestParam(call.param != null);
-
-      if (Trace.isTracing()) {
-        Span s = Trace.currentSpan();
-        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
-            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-      }
-
-      ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
-      if (cellBlock != null) {
-        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
-            .newBuilder();
-        cellBlockBuilder.setLength(cellBlock.limit());
-        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
-      }
-      // Only pass priority if there one. Let zero be same as no priority.
-      if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
-        requestHeaderBuilder.setPriority(call.controller.getPriority());
-      }
-      requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
-          Integer.MAX_VALUE : (int)call.rpcTimeout);
-      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
-      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
-      if (cellBlock != null) {
-        totalSize += cellBlock.remaining();
-      }
-
-      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
-      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
-        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
-      }
-
-      channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Set up server authorization
-   * @throws java.io.IOException if auth setup failed
-   */
-  private void setupAuthorization() throws IOException {
-    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
-    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
-    this.token = null;
-    if (useSasl && securityInfo != null) {
-      AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
-      if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
-        if (tokenSelector != null) {
-          token = tokenSelector.selectToken(new Text(client.clusterId),
-            ticket.getUGI().getTokens());
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("No token selector found for type " + tokenKind);
-        }
-      }
-      String serverKey = securityInfo.getServerPrincipal();
-      if (serverKey == null) {
-        throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
-      }
-      this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
-        address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
-            + serverPrincipal);
-      }
-    }
-
-    if (!useSasl) {
-      authMethod = AuthMethod.SIMPLE;
-    } else if (token != null) {
-      authMethod = AuthMethod.DIGEST;
-    } else {
-      authMethod = AuthMethod.KERBEROS;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
-    }
-    reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
-  }
-
-  /**
-   * Build the user information
-   * @param ugi User Group Information
-   * @param authMethod Authorization method
-   * @return UserInformation protobuf
-   */
-  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
-    if (ugi == null || authMethod == AuthMethod.DIGEST) {
-      // Don't send user for token auth
-      return null;
-    }
-    RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
-    if (authMethod == AuthMethod.KERBEROS) {
-      // Send effective user for Kerberos auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-    } else if (authMethod == AuthMethod.SIMPLE) {
-      // Send both effective user and real user for simple auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-      if (ugi.getRealUser() != null) {
-        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-      }
-    }
-    return userInfoPB.build();
-  }
-
-  /**
-   * Create connection preamble
-   * @param byteBuf to write to
-   * @param authMethod to write
-   */
-  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
-    byteBuf.writeBytes(HConstants.RPC_HEADER);
-    byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
-    byteBuf.writeByte(authMethod.code);
-  }
-
-  private void close0(Throwable e) {
-    List<AsyncCall> toCleanup;
-    synchronized (pendingCalls) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
-      pendingCalls.clear();
-    }
-    IOException closeException = null;
-    if (e != null) {
-      if (e instanceof IOException) {
-        closeException = (IOException) e;
-      } else {
-        closeException = new IOException(e);
-      }
-    }
-    // log the info
-    if (LOG.isDebugEnabled() && closeException != null) {
-      LOG.debug(name + ": closing ipc connection to " + address, closeException);
-    }
-    if (cleanupTimer != null) {
-      cleanupTimer.cancel();
-      cleanupTimer = null;
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(closeException != null ? closeException
-          : new ConnectionClosingException(
-              "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
-    }
-    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(name + ": closed");
-    }
-  }
-
-  /**
-   * Close connection
-   * @param e exception on close
-   */
-  public void close(final Throwable e) {
-    client.removeConnection(this);
-
-    // Move closing from the requesting thread to the channel thread
-    if (channel.eventLoop().inEventLoop()) {
-      close0(e);
-    } else {
-      channel.eventLoop().execute(new Runnable() {
-        @Override
-        public void run() {
-          close0(e);
-        }
-      });
-    }
-  }
-
-  /**
-   * Clean up calls.
-   */
-  private void cleanupCalls() {
-    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long nextCleanupTaskDelay = -1L;
-    synchronized (pendingCalls) {
-      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
-        AsyncCall call = iter.next();
-        long timeout = call.getRpcTimeout();
-        if (timeout > 0) {
-          if (currentTime - call.getStartTime() >= timeout) {
-            iter.remove();
-            toCleanup.add(call);
-          } else {
-            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
-              nextCleanupTaskDelay = timeout;
-            }
-          }
-        }
-      }
-      if (nextCleanupTaskDelay > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
-      } else {
-        cleanupTimer = null;
-      }
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
-          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
-    }
-  }
-
-  /**
-   * Check if the connection is alive
-   * @return true if alive
-   */
-  public boolean isAlive() {
-    return channel.isOpen();
-  }
-
-  /**
-   * Check if user should authenticate over Kerberos
-   * @return true if should be authenticated over Kerberos
-   * @throws java.io.IOException on failure of check
-   */
-  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
-      // Make sure user logged in using Kerberos either keytab or TGT
-      loginUser.hasKerberosCredentials() &&
-      // relogin only in case it is the login user (e.g. JT)
-      // or superuser (like oozie).
-      (loginUser.equals(currentUser) || loginUser.equals(realUser));
-  }
-
-  /**
-   * If multiple clients with the same principal try to connect to the same server at the same time,
-   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
-   * work around this, what is done is that the client backs off randomly and tries to initiate the
-   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
-   * attempted.
-   * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
-   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
-   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
-   * underlying authentication implementation, so there is no retry from other high level (for eg,
-   * HCM or HBaseAdmin).
-   * </p>
-   * @param currRetries retry count
-   * @param ex exception describing fail
-   * @param user which is trying to connect
-   * @throws java.io.IOException if IO fail
-   * @throws InterruptedException if thread is interrupted
-   */
-  private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
-      final UserGroupInformation user) throws IOException, InterruptedException {
-    user.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws IOException, InterruptedException {
-        if (shouldAuthenticateOverKrb()) {
-          if (currRetries < MAX_SASL_RETRIES) {
-            LOG.debug("Exception encountered while connecting to the server : " + ex);
-            // try re-login
-            if (UserGroupInformation.isLoginKeytabBased()) {
-              UserGroupInformation.getLoginUser().reloginFromKeytab();
-            } else {
-              UserGroupInformation.getLoginUser().reloginFromTicketCache();
-            }
-
-            // Should reconnect
-            return null;
-          } else {
-            String msg = "Couldn't setup connection for "
-                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
-            LOG.warn(msg);
-            throw (IOException) new IOException(msg).initCause(ex);
-          }
-        } else {
-          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
-        }
-        if (ex instanceof RemoteException) {
-          throw (RemoteException) ex;
-        }
-        if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed."
-              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
-          LOG.fatal(msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-        throw new IOException(ex);
-      }
-    });
-  }
-
-  public int getConnectionHashCode() {
-    return ConnectionId.hashCode(ticket, serviceName, address);
-  }
-
-  @Override
-  public int hashCode() {
-    return getConnectionHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof AsyncRpcChannel) {
-      AsyncRpcChannel channel = (AsyncRpcChannel) obj;
-      return channel.hashCode() == obj.hashCode();
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
-  }
-
-  /**
-   * Listens to call writes and fails if write failed
-   */
-  private static final class CallWriteListener implements ChannelFutureListener {
-    private final AsyncRpcChannel rpcChannel;
-    private final int id;
-
-    public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
-      this.rpcChannel = asyncRpcChannel;
-      this.id = id;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (!future.isSuccess()) {
-        AsyncCall call = rpcChannel.removePendingCall(id);
-        if (call != null) {
-          if (future.cause() instanceof IOException) {
-            call.setFailed((IOException) future.cause());
-          } else {
-            call.setFailed(new IOException(future.cause()));
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
deleted file mode 100644
index e12e298..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.JVM;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.Threads;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-/**
- * Netty client for the requests and responses
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AsyncRpcClient extends AbstractRpcClient {
-
-  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
-  public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
-  public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
-  public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
-
-  private static final HashedWheelTimer WHEEL_TIMER =
-      new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
-          100, TimeUnit.MILLISECONDS);
-
-  private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
-      new ChannelInitializer<SocketChannel>() {
-    @Override
-    protected void initChannel(SocketChannel ch) throws Exception {
-      //empty initializer
-    }
-  };
-
-  protected final AtomicInteger callIdCnt = new AtomicInteger();
-
-  private final PoolMap<Integer, AsyncRpcChannel> connections;
-
-  final FailedServers failedServers;
-
-  @VisibleForTesting
-  final Bootstrap bootstrap;
-
-  private final boolean useGlobalEventLoopGroup;
-
-  @VisibleForTesting
-  static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
-
-  private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
-      getGlobalEventLoopGroup(Configuration conf) {
-    if (GLOBAL_EVENT_LOOP_GROUP == null) {
-      GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create global event loop group "
-            + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
-      }
-    }
-    return GLOBAL_EVENT_LOOP_GROUP;
-  }
-
-  private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
-      Configuration conf) {
-    // Max amount of threads to use. 0 lets Netty decide based on amount of cores
-    int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
-
-    // Config to enable native transport. Does not seem to be stable at time of implementation
-    // although it is not extensively tested.
-    boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
-
-    // Use the faster native epoll transport mechanism on linux if enabled
-    if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
-      }
-      return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
-          Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
-      }
-      return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
-          Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
-    }
-  }
-
-  /**
-   * Constructor for tests
-   *
-   * @param configuration      to HBase
-   * @param clusterId          for the cluster
-   * @param localAddress       local address to connect to
-   * @param metrics            the connection metrics
-   * @param channelInitializer for custom channel handlers
-   */
-  protected AsyncRpcClient(Configuration configuration, String clusterId,
-      SocketAddress localAddress, MetricsConnection metrics,
-      ChannelInitializer<SocketChannel> channelInitializer) {
-    super(configuration, clusterId, localAddress, metrics);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting async Hbase RPC client");
-    }
-
-    Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
-    this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
-    if (useGlobalEventLoopGroup) {
-      eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
-    } else {
-      eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
-          + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
-    }
-
-    this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
-    this.failedServers = new FailedServers(configuration);
-
-    int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-
-    // Configure the default bootstrap.
-    this.bootstrap = new Bootstrap();
-    bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
-        .channel(eventLoopGroupAndChannelClass.getSecond())
-        .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
-        .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
-    if (channelInitializer == null) {
-      channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
-    }
-    bootstrap.handler(channelInitializer);
-    if (localAddress != null) {
-      bootstrap.localAddress(localAddress);
-    }
-  }
-
-  /** Used in test only. */
-  AsyncRpcClient(Configuration configuration) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
-  }
-
-  /** Used in test only. */
-  AsyncRpcClient(Configuration configuration,
-      ChannelInitializer<SocketChannel> channelInitializer) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
-  }
-
-  /**
-   * Constructor
-   *
-   * @param configuration to HBase
-   * @param clusterId     for the cluster
-   * @param localAddress  local address to connect to
-   * @param metrics       the connection metrics
-   */
-  public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
-      MetricsConnection metrics) {
-    this(configuration, clusterId, localAddress, metrics, null);
-  }
-
-  /**
-   * Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> protocol,
-   * with the <code>ticket</code> credentials, returning the value.
-   * Throws exceptions if there are network problems or if the remote code
-   * threw an exception.
-   *
-   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
-   *               instance of User each time so will be a new Connection each time.
-   * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException if call is interrupted
-   * @throws java.io.IOException  if a connection failure is encountered
-   */
-  @Override
-  protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
-      Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress addr, MetricsConnection.CallStats callStats)
-      throws IOException, InterruptedException {
-    if (pcrc == null) {
-      pcrc = new PayloadCarryingRpcController();
-    }
-    final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
-    Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
-    long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
-    try {
-      Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
-      return new Pair<>(response, pcrc.cellScanner());
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      } else {
-        throw wrapException(addr, (Exception) e.getCause());
-      }
-    } catch (TimeoutException e) {
-      CallTimeoutException cte = new CallTimeoutException(promise.toString());
-      throw wrapException(addr, cte);
-    }
-  }
-
-  /**
-   * Call method async
-   */
-  private void callMethod(final Descriptors.MethodDescriptor md,
-      final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
-      InetSocketAddress addr, final RpcCallback<Message> done) {
-    final AsyncRpcChannel connection;
-    try {
-      connection = createRpcChannel(md.getService().getName(), addr, ticket);
-      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-      GenericFutureListener<Future<Message>> listener =
-          new GenericFutureListener<Future<Message>>() {
-            @Override
-            public void operationComplete(Future<Message> future) throws Exception {
-              cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
-              if (metrics != null) {
-                metrics.updateRpc(md, param, cs);
-              }
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
-              }
-              if (!future.isSuccess()) {
-                Throwable cause = future.cause();
-                if (cause instanceof IOException) {
-                  pcrc.setFailed((IOException) cause);
-                } else {
-                  pcrc.setFailed(new IOException(cause));
-                }
-              } else {
-                try {
-                  done.run(future.get());
-                } catch (ExecutionException e) {
-                  Throwable cause = e.getCause();
-                  if (cause instanceof IOException) {
-                    pcrc.setFailed((IOException) cause);
-                  } else {
-                    pcrc.setFailed(new IOException(cause));
-                  }
-                } catch (InterruptedException e) {
-                  pcrc.setFailed(new IOException(e));
-                }
-              }
-            }
-          };
-      cs.setStartTime(EnvironmentEdgeManager.currentTime());
-      connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
-    } catch (StoppedRpcClientException|FailedServerException e) {
-      pcrc.setFailed(e);
-    }
-  }
-
-  private boolean closed = false;
-
-  /**
-   * Close netty
-   */
-  public void close() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stopping async HBase RPC client");
-    }
-
-    synchronized (connections) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      for (AsyncRpcChannel conn : connections.values()) {
-        conn.close(null);
-      }
-    }
-    // do not close global EventLoopGroup.
-    if (!useGlobalEventLoopGroup) {
-      bootstrap.group().shutdownGracefully();
-    }
-  }
-
-  /**
-   * Create a cell scanner
-   *
-   * @param cellBlock to create scanner for
-   * @return CellScanner
-   * @throws java.io.IOException on error on creation cell scanner
-   */
-  public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
-    return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
-  }
-
-  /**
-   * Build cell block
-   *
-   * @param cells to create block with
-   * @return ByteBuffer with cells
-   * @throws java.io.IOException if block creation fails
-   */
-  public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
-    return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
-  }
-
-  /**
-   * Creates an RPC client
-   *
-   * @param serviceName    name of servicce
-   * @param location       to connect to
-   * @param ticket         for current user
-   * @return new RpcChannel
-   * @throws StoppedRpcClientException when Rpc client is stopped
-   * @throws FailedServerException if server failed
-   */
-  private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
-      User ticket) throws StoppedRpcClientException, FailedServerException {
-    // Check if server is failed
-    if (this.failedServers.isFailedServer(location)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not trying to connect to " + location +
-            " this server is in the failed servers list");
-      }
-      throw new FailedServerException(
-          "This server is in the failed servers list: " + location);
-    }
-
-    int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
-
-    AsyncRpcChannel rpcChannel;
-    synchronized (connections) {
-      if (closed) {
-        throw new StoppedRpcClientException();
-      }
-      rpcChannel = connections.get(hashCode);
-      if (rpcChannel == null || !rpcChannel.isAlive()) {
-        rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
-        connections.put(hashCode, rpcChannel);
-      }
-    }
-
-    return rpcChannel;
-  }
-
-  /**
-   * Interrupt the connections to the given ip:port server. This should be called if the server
-   * is known as actually dead. This will not prevent current operation to be retried, and,
-   * depending on their own behavior, they may retry on the same server. This can be a feature,
-   * for example at startup. In any case, they're likely to get connection refused (if the
-   * process died) or no route to host: i.e. there next retries should be faster and with a
-   * safe exception.
-   *
-   * @param sn server to cancel connections for
-   */
-  @Override
-  public void cancelConnections(ServerName sn) {
-    synchronized (connections) {
-      for (AsyncRpcChannel rpcChannel : connections.values()) {
-        if (rpcChannel.isAlive() &&
-            rpcChannel.address.getPort() == sn.getPort() &&
-            rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
-          LOG.info("The server on " + sn.toString() +
-              " is dead - stopping the connection " + rpcChannel.toString());
-          rpcChannel.close(null);
-        }
-      }
-    }
-  }
-
-  /**
-   * Remove connection from pool
-   */
-  public void removeConnection(AsyncRpcChannel connection) {
-    int connectionHashCode = connection.hashCode();
-    synchronized (connections) {
-      // we use address as cache key, so we should check here to prevent removing the
-      // wrong connection
-      AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
-      if (connectionInPool != null && connectionInPool.equals(connection)) {
-        this.connections.remove(connectionHashCode);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
-          connection.toString(), System.identityHashCode(connection),
-          System.identityHashCode(connectionInPool)));
-      }
-    }
-  }
-
-  /**
-   * Creates a "channel" that can be used by a protobuf service.  Useful setting up
-   * protobuf stubs.
-   *
-   * @param sn server name describing location of server
-   * @param user which is to use the connection
-   * @param rpcTimeout default rpc operation timeout
-   *
-   * @return A rpc channel that goes via this rpc client instance.
-   */
-  public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
-    return new RpcChannelImplementation(this, sn, user, rpcTimeout);
-  }
-
-  /**
-   * Blocking rpc channel that goes via hbase rpc.
-   */
-  @VisibleForTesting
-  public static class RpcChannelImplementation implements RpcChannel {
-    private final InetSocketAddress isa;
-    private final AsyncRpcClient rpcClient;
-    private final User ticket;
-    private final int channelOperationTimeout;
-
-    /**
-     * @param channelOperationTimeout - the default timeout when no timeout is given
-     */
-    protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
-        final ServerName sn, final User ticket, int channelOperationTimeout) {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      this.rpcClient = rpcClient;
-      this.ticket = ticket;
-      this.channelOperationTimeout = channelOperationTimeout;
-    }
-
-    @Override
-    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
-        Message param, Message returnType, RpcCallback<Message> done) {
-      PayloadCarryingRpcController pcrc;
-      if (controller != null) {
-        pcrc = (PayloadCarryingRpcController) controller;
-        if (!pcrc.hasCallTimeout()) {
-          pcrc.setCallTimeout(channelOperationTimeout);
-        }
-      } else {
-        pcrc = new PayloadCarryingRpcController();
-        pcrc.setCallTimeout(channelOperationTimeout);
-      }
-
-      this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
-    }
-  }
-
-  Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
-    return WHEEL_TIMER.newTimeout(task, delay, unit);
-  }
-}