You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/09/08 12:33:22 UTC

[6/6] hbase git commit: HBASE-16445 Refactor and reimplement RpcClient

HBASE-16445 Refactor and reimplement RpcClient


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c04b3891
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c04b3891
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c04b3891

Branch: refs/heads/master
Commit: c04b389181b6a9299f05f1ad8f8b1ec62448331a
Parents: fc224ed
Author: zhangduo <zh...@apache.org>
Authored: Thu Sep 8 17:46:33 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Sep 8 20:32:56 2016 +0800

----------------------------------------------------------------------
 .../RpcRetryingCallerWithReadReplicas.java      |   21 +-
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  467 ++++--
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |  204 ---
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  768 ----------
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |  510 -------
 .../hbase/ipc/AsyncServerResponseHandler.java   |  126 --
 .../hadoop/hbase/ipc/BlockingRpcCallback.java   |    2 +-
 .../hadoop/hbase/ipc/BlockingRpcClient.java     |   77 +
 .../hadoop/hbase/ipc/BlockingRpcConnection.java |  725 ++++++++++
 .../hbase/ipc/BufferCallBeforeInitHandler.java  |  103 ++
 .../java/org/apache/hadoop/hbase/ipc/Call.java  |  122 +-
 .../hbase/ipc/CallCancelledException.java       |   37 +
 .../org/apache/hadoop/hbase/ipc/CallEvent.java  |   40 +
 .../hadoop/hbase/ipc/CallTimeoutException.java  |    6 +-
 .../hadoop/hbase/ipc/CellBlockBuilder.java      |  111 +-
 .../apache/hadoop/hbase/ipc/ConnectionId.java   |    2 +-
 .../hbase/ipc/DefaultNettyEventLoopConfig.java  |   37 +
 .../hbase/ipc/FallbackDisallowedException.java  |   38 +
 .../hadoop/hbase/ipc/IOExceptionConverter.java  |   34 -
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |   55 +-
 .../hadoop/hbase/ipc/MessageConverter.java      |   47 -
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java |   80 ++
 .../hbase/ipc/NettyRpcClientConfigHelper.java   |   83 ++
 .../hadoop/hbase/ipc/NettyRpcConnection.java    |  282 ++++
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java |  245 ++++
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  |    6 +-
 .../hadoop/hbase/ipc/RpcClientFactory.java      |   26 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 1359 ------------------
 .../apache/hadoop/hbase/ipc/RpcConnection.java  |  255 ++++
 .../security/AbstractHBaseSaslRpcClient.java    |  197 +++
 .../hbase/security/AsyncHBaseSaslRpcClient.java |   58 +
 .../AsyncHBaseSaslRpcClientHandler.java         |  135 ++
 .../hbase/security/HBaseSaslRpcClient.java      |  234 +--
 .../hbase/security/SaslChallengeDecoder.java    |  112 ++
 .../hbase/security/SaslClientHandler.java       |  382 -----
 .../hbase/security/SaslUnwrapHandler.java       |   53 +
 .../apache/hadoop/hbase/security/SaslUtil.java  |   14 +-
 .../hadoop/hbase/security/SaslWrapHandler.java  |   80 ++
 .../hadoop/hbase/ipc/TestCellBlockBuilder.java  |   19 +-
 .../ipc/TestRpcClientDeprecatedNameMapping.java |   56 +
 .../hbase/security/TestHBaseSaslRpcClient.java  |    8 +-
 .../hbase/ipc/IntegrationTestRpcClient.java     |   25 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    2 +-
 .../hadoop/hbase/master/ServerManager.java      |    1 +
 .../hadoop/hbase/util/MultiHConnection.java     |    1 -
 .../hadoop/hbase/client/TestClientTimeouts.java |   11 +-
 .../hbase/client/TestRpcControllerFactory.java  |    4 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |  121 +-
 .../apache/hadoop/hbase/ipc/TestAsyncIPC.java   |  113 --
 .../hadoop/hbase/ipc/TestBlockingIPC.java       |   58 +
 .../hbase/ipc/TestGlobalEventLoopGroup.java     |   53 -
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |   74 -
 .../apache/hadoop/hbase/ipc/TestNettyIPC.java   |  128 ++
 .../hbase/ipc/TestProtobufRpcServiceImpl.java   |    2 +-
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java    |   17 +-
 .../hbase/ipc/TestRpcHandlerException.java      |    3 +-
 .../regionserver/TestRegionReplicaFailover.java |    2 +
 .../hbase/security/AbstractTestSecureIPC.java   |  261 ----
 .../hbase/security/TestAsyncSecureIPC.java      |   33 -
 .../hadoop/hbase/security/TestSecureIPC.java    |  250 +++-
 .../TestDelegationTokenWithEncryption.java      |   12 +-
 .../token/TestGenerateDelegationToken.java      |    8 +-
 62 files changed, 3923 insertions(+), 4472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 0ea696e..3d55136 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -97,7 +100,6 @@ public class RpcRetryingCallerWithReadReplicas {
       this.id = id;
       this.location = location;
       this.controller = rpcControllerFactory.newController();
-      controller.setPriority(tableName);
     }
 
     @Override
@@ -134,6 +136,11 @@ public class RpcRetryingCallerWithReadReplicas {
       setStub(cConnection.getClient(dest));
     }
 
+    private void initRpcController() {
+      controller.reset();
+      controller.setCallTimeout(callTimeout);
+      controller.setPriority(tableName);
+    }
     @Override
     protected Result rpcCall() throws Exception {
       if (controller.isCanceled()) return null;
@@ -141,16 +148,13 @@ public class RpcRetryingCallerWithReadReplicas {
         throw new InterruptedIOException();
       }
       byte[] reg = location.getRegionInfo().getRegionName();
-      ClientProtos.GetRequest request =
-          RequestConverter.buildGetRequest(reg, get);
-      // Presumption that we are passed a PayloadCarryingRpcController here!
-      HBaseRpcController pcrc = (HBaseRpcController)controller;
-      pcrc.setCallTimeout(callTimeout);
+      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
+      initRpcController();
       ClientProtos.GetResponse response = getStub().get(controller, request);
       if (response == null) {
         return null;
       }
-      return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner());
+      return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
     }
 
     @Override
@@ -183,7 +187,7 @@ public class RpcRetryingCallerWithReadReplicas {
 
     RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
         : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
-    ResultBoundedCompletionService<Result> cs =
+   final ResultBoundedCompletionService<Result> cs =
         new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
 
     if(isTargetReplicaSpecified) {
@@ -207,7 +211,6 @@ public class RpcRetryingCallerWithReadReplicas {
       // submit call for the all of the secondaries at once
       addCallsForReplica(cs, rl, 1, rl.size() - 1);
     }
-
     try {
       try {
         long start = EnvironmentEdgeManager.currentTime();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 6cb0786..098ad3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -18,54 +18,106 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
+import io.netty.util.HashedWheelTimer;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
 
 /**
  * Provides the basics for a RpcClient implementation like configuration and Logging.
+ * <p>
+ * Locking schema of the current IPC implementation
+ * <ul>
+ * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
+ * connection.</li>
+ * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
+ * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
+ * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
+ * of how to deal with cancel.</li>
+ * <li>For connection implementation, the construction of a connection should be as fast as possible
+ * because the creation is protected under a lock. Connect to remote side when needed. There is no
+ * forced locking schema for a connection implementation.</li>
+ * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
+ * last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
+ * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations
+ * of the callbacks are free to hold any lock.</li>
+ * </ul>
  */
 @InterfaceAudience.Private
-public abstract class AbstractRpcClient implements RpcClient {
+public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
   // Log level is being changed in tests
   public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
 
+  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
+      Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+
+  private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
+      .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+
+  static {
+    TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
+  }
+
+  protected boolean running = true; // if client runs
+
   protected final Configuration conf;
-  protected String clusterId;
+  protected final String clusterId;
   protected final SocketAddress localAddr;
   protected final MetricsConnection metrics;
 
-  protected UserProvider userProvider;
+  protected final UserProvider userProvider;
   protected final CellBlockBuilder cellBlockBuilder;
 
   protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
   // time (in ms), it will be closed at any moment.
-  protected final int maxRetries; //the max. no. of retries for socket connections
+  protected final int maxRetries; // the max. no. of retries for socket connections
   protected final long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -73,13 +125,20 @@ public abstract class AbstractRpcClient implements RpcClient {
   protected final CompressionCodec compressor;
   protected final boolean fallbackAllowed;
 
+  protected final FailedServers failedServers;
+
   protected final int connectTO;
   protected final int readTO;
   protected final int writeTO;
 
+  protected final PoolMap<ConnectionId, T> connections;
+
+  private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+  private final ScheduledFuture<?> cleanupIdleConnectionTask;
+
   /**
    * Construct an IPC client for the cluster <code>clusterId</code>
-   *
    * @param conf configuration
    * @param clusterId the cluster id
    * @param localAddr client socket bind address.
@@ -92,7 +151,7 @@ public abstract class AbstractRpcClient implements RpcClient {
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
     this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
     this.cellBlockBuilder = new CellBlockBuilder(conf);
@@ -102,31 +161,53 @@ public abstract class AbstractRpcClient implements RpcClient {
     this.codec = getCodec();
     this.compressor = getCompressor(conf);
     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.failedServers = new FailedServers(conf);
     this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
     this.metrics = metrics;
 
-    // login the server principal (if using secure Hadoop)
+    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
+
+    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
+
+      @Override
+      public void run() {
+        cleanupIdleConnections();
+      }
+    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
-          ", tcpKeepAlive=" + this.tcpKeepAlive +
-          ", tcpNoDelay=" + this.tcpNoDelay +
-          ", connectTO=" + this.connectTO +
-          ", readTO=" + this.readTO +
-          ", writeTO=" + this.writeTO +
-          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
-          ", maxRetries=" + this.maxRetries +
-          ", fallbackAllowed=" + this.fallbackAllowed +
-          ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
+          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
+          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
+          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
+          + this.fallbackAllowed + ", bind address="
+          + (this.localAddr != null ? this.localAddr : "null"));
+    }
+  }
+
+  private void cleanupIdleConnections() {
+    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
+    synchronized (connections) {
+      for (T conn : connections.values()) {
+        // remove connection if it has not been chosen by anyone for more than
+        // minIdleTimeBeforeClose, and has already shut down. The latter check is because we may still
+        // have some pending calls on connection so we should not shutdown the connection outside.
+        // The connection itself will disconnect if there is no pending call for maxIdleTime.
+        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
+          LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+          connections.removeValue(conn.remoteId(), conn);
+        }
+      }
     }
   }
 
   @VisibleForTesting
   public static String getDefaultCodec(final Configuration c) {
     // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
-    // Configuration will complain -- then no default codec (and we'll pb everything).  Else
+    // Configuration will complain -- then no default codec (and we'll pb everything). Else
     // default is KeyValueCodec
     return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
   }
@@ -143,7 +224,7 @@ public abstract class AbstractRpcClient implements RpcClient {
       return null;
     }
     try {
-      return (Codec)Class.forName(className).newInstance();
+      return (Codec) Class.forName(className).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed getting codec " + className, e);
     }
@@ -154,6 +235,12 @@ public abstract class AbstractRpcClient implements RpcClient {
     return this.codec != null;
   }
 
+  // for writing tests that want to throw exception when connecting.
+  @VisibleForTesting
+  boolean isTcpNoDelay() {
+    return tcpNoDelay;
+  }
+
   /**
    * Encapsulate the ugly casting and RuntimeException conversion in private method.
    * @param conf configuration
@@ -165,163 +252,297 @@ public abstract class AbstractRpcClient implements RpcClient {
       return null;
     }
     try {
-      return (CompressionCodec)Class.forName(className).newInstance();
+      return (CompressionCodec) Class.forName(className).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed getting compressor " + className, e);
     }
   }
 
   /**
-   * Return the pool type specified in the configuration, which must be set to
-   * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
-   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
-   * otherwise default to the former.
-   *
-   * For applications with many user threads, use a small round-robin pool. For
-   * applications with few user threads, you may want to try using a
-   * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
-   * instances should not exceed the operating system's hard limit on the number of
-   * connections.
-   *
+   * Return the pool type specified in the configuration, which must be set to either
+   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
+   * former. For applications with many user threads, use a small round-robin pool. For applications
+   * with few user threads, you may want to try using a thread-local pool. In any case, the number
+   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
+   * system's hard limit on the number of connections.
    * @param config configuration
    * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
    *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
    */
-  protected static PoolMap.PoolType getPoolType(Configuration config) {
-    return PoolMap.PoolType
-        .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
-            PoolMap.PoolType.ThreadLocal);
+  private static PoolMap.PoolType getPoolType(Configuration config) {
+    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+      PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
   }
 
   /**
-   * Return the pool size specified in the configuration, which is applicable only if
-   * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
-   *
+   * Return the pool size specified in the configuration, which is applicable only if the pool type
+   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
    * @param config configuration
    * @return the maximum pool size
    */
-  protected static int getPoolSize(Configuration config) {
+  private static int getPoolSize(Configuration config) {
     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
   }
 
+  private int nextCallId() {
+    int id, next;
+    do {
+      id = callIdCnt.get();
+      next = id < Integer.MAX_VALUE ? id + 1 : 0;
+    } while (!callIdCnt.compareAndSet(id, next));
+    return id;
+  }
+
   /**
    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
    * threw an exception.
-   *
    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
-   *               will be a
-   *               new Connection each time.
+   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
+   *          new Connection each time.
    * @return A pair with the Message response and the Cell data (if any).
    */
-  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController pcrc,
+  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
       Message param, Message returnType, final User ticket, final InetSocketAddress isa)
       throws ServiceException {
-    if (pcrc == null) {
-      pcrc = new HBaseRpcControllerImpl();
+    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
+    callMethod(md, hrc, param, returnType, ticket, isa, done);
+    Message val;
+    try {
+      val = done.get();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    if (hrc.failed()) {
+      throw new ServiceException(hrc.getFailed());
+    } else {
+      return val;
     }
+  }
 
-    Pair<Message, CellScanner> val;
-    try {
-      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-      cs.setStartTime(EnvironmentEdgeManager.currentTime());
-      val = call(pcrc, md, param, returnType, ticket, isa, cs);
-      // Shove the results into controller so can be carried across the proxy/pb service void.
-      pcrc.setCellScanner(val.getSecond());
-
-      cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
-      if (metrics != null) {
-        metrics.updateRpc(md, param, cs);
+  /**
+   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
+   * given host/port are reused.
+   */
+  private T getConnection(ConnectionId remoteId) throws IOException {
+    if (failedServers.isFailedServer(remoteId.getAddress())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not trying to connect to " + remoteId.address
+            + " this server is in the failed servers list");
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+      throw new FailedServerException(
+          "This server is in the failed servers list: " + remoteId.address);
+    }
+    T conn;
+    synchronized (connections) {
+      if (!running) {
+        throw new StoppedRpcClientException();
       }
-      return val.getFirst();
-    } catch (Throwable e) {
-      throw new ServiceException(e);
+      conn = connections.get(remoteId);
+      if (conn == null) {
+        conn = createConnection(remoteId);
+        connections.put(remoteId, conn);
+      }
+      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
     }
+    return conn;
   }
 
   /**
-   * Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> protocol,
-   * with the <code>ticket</code> credentials, returning the value.
-   * Throws exceptions if there are network problems or if the remote code
-   * threw an exception.
-   *
-   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
-   *               will be a
-   *               new Connection each time.
-   * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException if call is interrupted
-   * @throws java.io.IOException if transport failed
+   * Create a connection for the given remote id. The returned connection is not connected yet.
    */
-  protected abstract Pair<Message, CellScanner> call(HBaseRpcController pcrc,
-      Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress isa, MetricsConnection.CallStats callStats)
-      throws IOException, InterruptedException;
+  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
 
-  @Override
-  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
-      int defaultOperationTimeout) throws UnknownHostException {
-    return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
+  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+      RpcCallback<Message> callback) {
+    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
+    if (metrics != null) {
+      metrics.updateRpc(call.md, call.param, call.callStats);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
+    }
+    if (call.error != null) {
+      if (call.error instanceof RemoteException) {
+        call.error.fillInStackTrace();
+        hrc.setFailed(call.error);
+      } else {
+        hrc.setFailed(wrapException(addr, call.error));
+      }
+      callback.run(null);
+    } else {
+      hrc.setDone(call.cells);
+      callback.run(call.response);
+    }
+  }
+
+  private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
+      final RpcCallback<Message> callback) {
+    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+    cs.setStartTime(EnvironmentEdgeManager.currentTime());
+    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
+        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+
+          @Override
+          public void run(Call call) {
+            onCallFinished(call, hrc, addr, callback);
+          }
+        }, cs);
+    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+    try {
+      T connection = getConnection(remoteId);
+      connection.sendRequest(call, hrc);
+    } catch (Exception e) {
+      call.setException(toIOE(e));
+    }
+  }
+
+  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
+    if (addr.isUnresolved()) {
+      throw new UnknownHostException("can not resolve " + sn.getServerName());
+    }
+    return addr;
   }
 
   /**
-   * Configure a payload carrying controller
-   * @param controller to configure
-   * @param channelOperationTimeout timeout for operation
-   * @return configured payload controller
+   * Interrupt the connections to the given ip:port server. This should be called if the server is
+   * known to be actually dead. This will not prevent current operations from being retried, and,
+   * depending on their own behavior, they may retry on the same server. This can be a feature, for
+   * example at startup. In any case, they're likely to get connection refused (if the process
+   * died) or no route to host: i.e. their next retries should be faster and with a safe exception.
    */
-  static HBaseRpcController configurePayloadCarryingRpcController(
-      RpcController controller, int channelOperationTimeout) {
-    HBaseRpcController pcrc;
-    if (controller != null && controller instanceof HBaseRpcController) {
-      pcrc = (HBaseRpcController) controller;
-      if (!pcrc.hasCallTimeout()) {
-        pcrc.setCallTimeout(channelOperationTimeout);
+  @Override
+  public void cancelConnections(ServerName sn) {
+    synchronized (connections) {
+      for (T connection : connections.values()) {
+        ConnectionId remoteId = connection.remoteId();
+        if (remoteId.address.getPort() == sn.getPort()
+            && remoteId.address.getHostName().equals(sn.getHostname())) {
+          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
+              + connection.remoteId);
+          connection.shutdown();
+        }
       }
-    } else {
-      pcrc = new HBaseRpcControllerImpl();
-      pcrc.setCallTimeout(channelOperationTimeout);
     }
-    return pcrc;
+  }
+
+  protected abstract void closeInternal();
+
+  @Override
+  public void close() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping rpc client");
+    }
+    Collection<T> connToClose;
+    synchronized (connections) {
+      if (!running) {
+        return;
+      }
+      running = false;
+      connToClose = connections.values();
+      connections.clear();
+    }
+    cleanupIdleConnectionTask.cancel(true);
+    for (T conn : connToClose) {
+      conn.shutdown();
+    }
+    closeInternal();
+  }
+
+  @Override
+  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
+      int rpcTimeout) throws UnknownHostException {
+    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
+  }
+
+  @Override
+  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
+      throws UnknownHostException {
+    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
+  }
+
+  private static class AbstractRpcChannel {
+
+    protected final InetSocketAddress addr;
+
+    protected final AbstractRpcClient<?> rpcClient;
+
+    protected final User ticket;
+
+    protected final int rpcTimeout;
+
+    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+        User ticket, int rpcTimeout) {
+      this.addr = addr;
+      this.rpcClient = rpcClient;
+      this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
+    }
+
+    /**
+     * Configure an rpc controller
+     * @param controller to configure
+     * @return configured rpc controller
+     */
+    protected HBaseRpcController configureRpcController(RpcController controller) {
+      HBaseRpcController hrc;
+      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
+      // side, but for now a ServerRpcController may still be passed in.
+      if (controller != null && controller instanceof HBaseRpcController) {
+        hrc = (HBaseRpcController) controller;
+        if (!hrc.hasCallTimeout()) {
+          hrc.setCallTimeout(rpcTimeout);
+        }
+      } else {
+        hrc = new HBaseRpcControllerImpl();
+        hrc.setCallTimeout(rpcTimeout);
+      }
+      return hrc;
+    }
   }
 
   /**
    * Blocking rpc channel that goes via hbase rpc.
    */
   @VisibleForTesting
-  public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
-    private final InetSocketAddress isa;
-    private final AbstractRpcClient rpcClient;
-    private final User ticket;
-    private final int channelOperationTimeout;
+  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
+      implements BlockingRpcChannel {
 
-    /**
-     * @param channelOperationTimeout - the default timeout when no timeout is given
-     */
-    protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
-        final ServerName sn, final User ticket, int channelOperationTimeout)
-        throws UnknownHostException {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      if (this.isa.isUnresolved()) {
-        throw new UnknownHostException(sn.getHostname());
-      }
-      this.rpcClient = rpcClient;
-      this.ticket = ticket;
-      this.channelOperationTimeout = channelOperationTimeout;
+    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
+        InetSocketAddress addr, User ticket, int rpcTimeout) {
+      super(rpcClient, addr, ticket, rpcTimeout);
     }
 
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
         Message param, Message returnType) throws ServiceException {
-      HBaseRpcController pcrc = configurePayloadCarryingRpcController(
-          controller,
-          channelOperationTimeout);
+      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
+        param, returnType, ticket, addr);
+    }
+  }
 
-      return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
+  /**
+   * Async rpc channel that goes via hbase rpc.
+   */
+  public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {
+
+    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+        User ticket, int rpcTimeout) throws UnknownHostException {
+      super(rpcClient, addr, ticket, rpcTimeout);
+    }
+
+    @Override
+    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
+        Message returnType, RpcCallback<Message> done) {
+      // This method does not throw any exceptions, so the caller must provide an
+      // HBaseRpcController, which is used to pass the exceptions back.
+      this.rpcClient.callMethod(md,
+        configureRpcController(Preconditions.checkNotNull(controller,
+          "RpcController can not be null for async rpc call")),
+        param, returnType, ticket, addr, done);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
deleted file mode 100644
index 33536df..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-
-import io.netty.util.concurrent.DefaultPromise;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
-
-/**
- * Represents an Async Hbase call and its response.
- *
- * Responses are passed on to its given doneHandler and failures to the rpcController
- *
- * @param <T> Type of message returned
- * @param <M> Message returned in communication to be converted
- */
-@InterfaceAudience.Private
-public class AsyncCall<M extends Message, T> extends DefaultPromise<T> {
-  private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
-
-  final int id;
-
-  private final AsyncRpcChannel channel;
-
-  final Descriptors.MethodDescriptor method;
-  final Message param;
-  final Message responseDefaultType;
-
-  private final MessageConverter<M,T> messageConverter;
-  private final IOExceptionConverter exceptionConverter;
-
-  final long rpcTimeout;
-
-  // For only the request
-  private final CellScanner cellScanner;
-  private final int priority;
-
-  final MetricsConnection clientMetrics;
-  final MetricsConnection.CallStats callStats;
-
-  /**
-   * Constructor
-   *
-   * @param channel             which initiated call
-   * @param connectId           connection id
-   * @param md                  the method descriptor
-   * @param param               parameters to send to Server
-   * @param cellScanner         cellScanner containing cells to send as request
-   * @param responseDefaultType the default response type
-   * @param messageConverter    converts the messages to what is the expected output
-   * @param exceptionConverter  converts exceptions to expected format. Can be null
-   * @param rpcTimeout          timeout for this call in ms
-   * @param priority            for this request
-   * @param metrics             MetricsConnection to which the metrics are stored for this request
-   */
-  public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor
-        md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
-        messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
-      MetricsConnection metrics) {
-    super(channel.getEventExecutor());
-    this.channel = channel;
-
-    this.id = connectId;
-
-    this.method = md;
-    this.param = param;
-    this.responseDefaultType = responseDefaultType;
-
-    this.messageConverter = messageConverter;
-    this.exceptionConverter = exceptionConverter;
-
-    this.rpcTimeout = rpcTimeout;
-
-    this.priority = priority;
-    this.cellScanner = cellScanner;
-
-    this.callStats = MetricsConnection.newCallStats();
-    callStats.setStartTime(EnvironmentEdgeManager.currentTime());
-
-    this.clientMetrics = metrics;
-  }
-
-  /**
-   * Get the start time
-   *
-   * @return start time for the call
-   */
-  public long getStartTime() {
-    return this.callStats.getStartTime();
-  }
-
-  @Override
-  public String toString() {
-    return "callId=" + this.id + ", method=" + this.method.getName() +
-      ", rpcTimeout=" + this.rpcTimeout + ", param {" +
-      (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
-  }
-
-  /**
-   * Set success with a cellBlockScanner
-   *
-   * @param value            to set
-   * @param cellBlockScanner to set
-   */
-  public void setSuccess(M value, CellScanner cellBlockScanner) {
-    callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - callStats.getStartTime());
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: " + method.getName() + ", callTime: " + callStats.getCallTimeMs() + "ms");
-    }
-
-    if (clientMetrics != null) {
-      clientMetrics.updateRpc(method, param, callStats);
-    }
-
-    try {
-      this.setSuccess(
-          this.messageConverter.convert(value, cellBlockScanner)
-      );
-    } catch (IOException e) {
-      this.setFailed(e);
-    }
-  }
-
-  /**
-   * Set failed
-   *
-   * @param exception to set
-   */
-  public void setFailed(IOException exception) {
-    if (ExceptionUtil.isInterrupt(exception)) {
-      exception = ExceptionUtil.asInterrupt(exception);
-    }
-    if (exception instanceof RemoteException) {
-      exception = ((RemoteException) exception).unwrapRemoteException();
-    }
-
-    if (this.exceptionConverter != null) {
-      exception = this.exceptionConverter.convert(exception);
-    }
-
-    this.setFailure(exception);
-  }
-
-  /**
-   * Get the rpc timeout
-   *
-   * @return current timeout for this call
-   */
-  public long getRpcTimeout() {
-    return rpcTimeout;
-  }
-
-
-  /**
-   * @return Priority for this call
-   */
-  public int getPriority() {
-    return priority;
-  }
-
-  /**
-   * Get the cellScanner for this request.
-   * @return CellScanner
-   */
-  public CellScanner cellScanner() {
-    return cellScanner;
-  }
-
-  @Override
-  public boolean cancel(boolean mayInterupt){
-    this.channel.removePendingCall(this.id);
-    return super.cancel(mayInterupt);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
deleted file mode 100644
index 2ec5adc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannel {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
-
-  private static final int MAX_SASL_RETRIES = 5;
-
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
-    = new HashMap<>();
-
-  static {
-    TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
-      new AuthenticationTokenSelector());
-  }
-
-  final AsyncRpcClient client;
-
-  // Contains the channel to work with.
-  // Only exists when connected
-  private Channel channel;
-
-  String name;
-  final User ticket;
-  final String serviceName;
-  final InetSocketAddress address;
-
-  private int failureCounter = 0;
-
-  boolean useSasl;
-  AuthMethod authMethod;
-  private int reloginMaxBackoff;
-  private Token<? extends TokenIdentifier> token;
-  private String serverPrincipal;
-
-  // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
-  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
-  private boolean connected = false;
-  private boolean closed = false;
-
-  private Timeout cleanupTimer;
-
-  private final TimerTask timeoutTask = new TimerTask() {
-    @Override
-    public void run(Timeout timeout) throws Exception {
-      cleanupCalls();
-    }
-  };
-
-  /**
-   * Constructor for netty RPC channel
-   * @param bootstrap to construct channel on
-   * @param client to connect with
-   * @param ticket of user which uses connection
-   * @param serviceName name of service to connect to
-   * @param address to connect to
-   */
-  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
-      String serviceName, InetSocketAddress address) {
-    this.client = client;
-
-    this.ticket = ticket;
-    this.serviceName = serviceName;
-    this.address = address;
-
-    this.channel = connect(bootstrap).channel();
-
-    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
-        + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
-  }
-
-  /**
-   * Connect to channel
-   * @param bootstrap to connect to
-   * @return future of connection
-   */
-  private ChannelFuture connect(final Bootstrap bootstrap) {
-    return bootstrap.remoteAddress(address).connect()
-        .addListener(new GenericFutureListener<ChannelFuture>() {
-          @Override
-          public void operationComplete(final ChannelFuture f) throws Exception {
-            if (!f.isSuccess()) {
-              retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
-              return;
-            }
-            channel = f.channel();
-
-            setupAuthorization();
-
-            ByteBuf b = channel.alloc().directBuffer(6);
-            createPreamble(b, authMethod);
-            channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            if (useSasl) {
-              UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
-              if (authMethod == AuthMethod.KERBEROS) {
-                if (ticket != null && ticket.getRealUser() != null) {
-                  ticket = ticket.getRealUser();
-                }
-              }
-              SaslClientHandler saslHandler;
-              if (ticket == null) {
-                throw new FatalConnectionException("ticket/user is null");
-              }
-              final UserGroupInformation realTicket = ticket;
-              saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
-                @Override
-                public SaslClientHandler run() throws IOException {
-                  return getSaslHandler(realTicket, bootstrap);
-                }
-              });
-              if (saslHandler != null) {
-                // Sasl connect is successful. Let's set up Sasl channel handler
-                channel.pipeline().addFirst(saslHandler);
-              } else {
-                // fall back to simple auth because server told us so.
-                authMethod = AuthMethod.SIMPLE;
-                useSasl = false;
-              }
-            } else {
-              startHBaseConnection(f.channel());
-            }
-          }
-        });
-  }
-
-  /**
-   * Start HBase connection
-   * @param ch channel to start connection on
-   */
-  private void startHBaseConnection(Channel ch) {
-    ch.pipeline().addLast("frameDecoder",
-      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    try {
-      writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            close(future.cause());
-            return;
-          }
-          List<AsyncCall> callsToWrite;
-          synchronized (pendingCalls) {
-            connected = true;
-            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-          }
-          for (AsyncCall call : callsToWrite) {
-            writeRequest(call);
-          }
-        }
-      });
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  private void startConnectionWithEncryption(Channel ch) {
-    // for rpc encryption, the order of ChannelInboundHandler should be:
-    // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
-    // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
-    // SaslClientHandler will handler this
-    ch.pipeline().addFirst("beforeUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
-    ch.pipeline().addLast("afterUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    List<AsyncCall> callsToWrite;
-    synchronized (pendingCalls) {
-      connected = true;
-      callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-    }
-    for (AsyncCall call : callsToWrite) {
-      writeRequest(call);
-    }
-  }
-
-  /**
-   * Get SASL handler
-   * @param bootstrap to reconnect to
-   * @return new SASL handler
-   * @throws java.io.IOException if handler failed to create
-   */
-  private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
-      final Bootstrap bootstrap) throws IOException {
-    return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
-        client.fallbackAllowed,
-        client.conf.get("hbase.rpc.protection",
-          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
-        getChannelHeaderBytes(authMethod),
-        new SaslClientHandler.SaslExceptionHandler() {
-          @Override
-          public void handle(int retryCount, Random random, Throwable cause) {
-            try {
-              // Handle Sasl failure. Try to potentially get new credentials
-              handleSaslConnectionFailure(retryCount, cause, realTicket);
-
-              retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
-                cause);
-            } catch (IOException | InterruptedException e) {
-              close(e);
-            }
-          }
-        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
-          @Override
-          public void onSuccess(Channel channel) {
-            startHBaseConnection(channel);
-          }
-
-          @Override
-          public void onSaslProtectionSucess(Channel channel) {
-            startConnectionWithEncryption(channel);
-          }
-        });
-  }
-
-  /**
-   * Retry to connect or close
-   * @param bootstrap to connect with
-   * @param failureCount failure count
-   * @param e exception of fail
-   */
-  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
-      Throwable e) {
-    if (failureCount < client.maxRetries) {
-      client.newTimeout(new TimerTask() {
-        @Override
-        public void run(Timeout timeout) throws Exception {
-          connect(bootstrap);
-        }
-      }, timeout, TimeUnit.MILLISECONDS);
-    } else {
-      client.failedServers.addToFailedServers(address);
-      close(e);
-    }
-  }
-
-  /**
-   * Calls method on channel
-   * @param method to call
-   * @param request to send
-   * @param cellScanner with cells to send
-   * @param responsePrototype to construct response with
-   * @param rpcTimeout timeout for request
-   * @param priority for request
-   * @return Promise for the response Message
-   */
-  public <R extends Message, O> io.netty.util.concurrent.Promise<O> callMethod(
-      final Descriptors.MethodDescriptor method,
-      final Message request,final CellScanner cellScanner,
-      R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
-      exceptionConverter, long rpcTimeout, int priority) {
-    final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
-        method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
-        rpcTimeout, priority, client.metrics);
-
-    synchronized (pendingCalls) {
-      if (closed) {
-        call.setFailure(new ConnectException());
-        return call;
-      }
-      pendingCalls.put(call.id, call);
-      // Add timeout for cleanup if none is present
-      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
-      }
-      if (!connected) {
-        return call;
-      }
-    }
-    writeRequest(call);
-    return call;
-  }
-
-  public EventLoop getEventExecutor() {
-    return this.channel.eventLoop();
-  }
-
-  AsyncCall removePendingCall(int id) {
-    synchronized (pendingCalls) {
-      return pendingCalls.remove(id);
-    }
-  }
-
-  /**
-   * Write the channel header
-   * @param channel to write to
-   * @return future of write
-   * @throws java.io.IOException on failure to write
-   */
-  private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-    ByteBuf b = channel.alloc().directBuffer(totalSize);
-
-    b.writeInt(header.getSerializedSize());
-    b.writeBytes(header.toByteArray());
-
-    return channel.writeAndFlush(b);
-  }
-
-  private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
-    b.putInt(header.getSerializedSize());
-    b.put(header.toByteArray());
-    return b.array();
-  }
-
-  private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
-        .setServiceName(serviceName);
-
-    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
-    if (userInfoPB != null) {
-      headerBuilder.setUserInfo(userInfoPB);
-    }
-
-    if (client.codec != null) {
-      headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
-    }
-    if (client.compressor != null) {
-      headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
-    }
-
-    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
-    return headerBuilder.build();
-  }
-
-  /**
-   * Write request to channel
-   * @param call to write
-   */
-  private void writeRequest(final AsyncCall call) {
-    try {
-      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
-          .newBuilder();
-      requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
-          .setRequestParam(call.param != null);
-
-      if (Trace.isTracing()) {
-        Span s = Trace.currentSpan();
-        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
-            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-      }
-
-      ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
-      if (cellBlock != null) {
-        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
-            .newBuilder();
-        cellBlockBuilder.setLength(cellBlock.limit());
-        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
-      }
-      // Only pass priority if there one. Let zero be same as no priority.
-      if (call.getPriority() != HBaseRpcController.PRIORITY_UNSET) {
-        requestHeaderBuilder.setPriority(call.getPriority());
-      }
-      requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
-          Integer.MAX_VALUE : (int)call.rpcTimeout);
-
-      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
-      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
-      if (cellBlock != null) {
-        totalSize += cellBlock.remaining();
-      }
-
-      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
-      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
-        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
-      }
-
-      channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Set up server authorization
-   * @throws java.io.IOException if auth setup failed
-   */
-  private void setupAuthorization() throws IOException {
-    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
-    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
-    this.token = null;
-    if (useSasl && securityInfo != null) {
-      AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
-      if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
-        if (tokenSelector != null) {
-          token = tokenSelector.selectToken(new Text(client.clusterId),
-            ticket.getUGI().getTokens());
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("No token selector found for type " + tokenKind);
-        }
-      }
-      String serverKey = securityInfo.getServerPrincipal();
-      if (serverKey == null) {
-        throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
-      }
-      this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
-        address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
-            + serverPrincipal);
-      }
-    }
-
-    if (!useSasl) {
-      authMethod = AuthMethod.SIMPLE;
-    } else if (token != null) {
-      authMethod = AuthMethod.DIGEST;
-    } else {
-      authMethod = AuthMethod.KERBEROS;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
-    }
-    reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
-  }
-
-  /**
-   * Build the user information
-   * @param ugi User Group Information
-   * @param authMethod Authorization method
-   * @return UserInformation protobuf
-   */
-  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
-    if (ugi == null || authMethod == AuthMethod.DIGEST) {
-      // Don't send user for token auth
-      return null;
-    }
-    RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
-    if (authMethod == AuthMethod.KERBEROS) {
-      // Send effective user for Kerberos auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-    } else if (authMethod == AuthMethod.SIMPLE) {
-      // Send both effective user and real user for simple auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-      if (ugi.getRealUser() != null) {
-        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-      }
-    }
-    return userInfoPB.build();
-  }
-
-  /**
-   * Create connection preamble
-   * @param byteBuf to write to
-   * @param authMethod to write
-   */
-  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
-    // Wire order: RPC header bytes, then the RPC version byte, then the auth method code byte.
-    byteBuf.writeBytes(HConstants.RPC_HEADER)
-        .writeByte(HConstants.RPC_CURRENT_VERSION)
-        .writeByte(authMethod.code);
-  }
-
-  private void close0(Throwable e) {
-    List<AsyncCall> toCleanup;
-    synchronized (pendingCalls) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
-      pendingCalls.clear();
-    }
-    IOException closeException = null;
-    if (e != null) {
-      if (e instanceof IOException) {
-        closeException = (IOException) e;
-      } else {
-        closeException = new IOException(e);
-      }
-    }
-    // log the info
-    if (LOG.isDebugEnabled() && closeException != null) {
-      LOG.debug(name + ": closing ipc connection to " + address, closeException);
-    }
-    if (cleanupTimer != null) {
-      cleanupTimer.cancel();
-      cleanupTimer = null;
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(closeException != null ? closeException
-          : new ConnectionClosingException(
-              "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
-    }
-    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(name + ": closed");
-    }
-  }
-
-  /**
-   * Close connection
-   * @param e exception on close
-   */
-  public void close(final Throwable e) {
-    client.removeConnection(this);
-
-    final Runnable teardown = new Runnable() {
-      @Override
-      public void run() {
-        close0(e);
-      }
-    };
-
-    // The teardown always runs on the channel's event loop: hand it over when the caller is on
-    // some other thread, otherwise run it right here.
-    if (!channel.eventLoop().inEventLoop()) {
-      channel.eventLoop().execute(teardown);
-      return;
-    }
-    teardown.run();
-  }
-
-  /**
-   * Clean up calls.
-   */
-  private void cleanupCalls() {
-    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    // Shortest time left until any surviving call reaches its deadline; -1 means there is none.
-    long nextCleanupTaskDelay = -1L;
-    synchronized (pendingCalls) {
-      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
-        AsyncCall call = iter.next();
-        long timeout = call.getRpcTimeout();
-        if (timeout <= 0) {
-          // Calls without a positive timeout are never expired by this task.
-          continue;
-        }
-        long remaining = timeout - (currentTime - call.getStartTime());
-        if (remaining <= 0) {
-          iter.remove();
-          toCleanup.add(call);
-        } else if (nextCleanupTaskDelay < 0 || remaining < nextCleanupTaskDelay) {
-          // Re-arm for the time still left on this call, not its full timeout; using the full
-          // timeout would let an already-waiting call overshoot its deadline.
-          nextCleanupTaskDelay = remaining;
-        }
-      }
-      if (nextCleanupTaskDelay > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
-      } else {
-        cleanupTimer = null;
-      }
-    }
-    // Fail the expired calls outside the lock.
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
-          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
-    }
-  }
-
-  /**
-   * Check if the connection is alive
-   * @return true if alive
-   */
-  public boolean isAlive() {
-    // Liveness is simply whether the underlying channel is still open.
-    final boolean channelOpen = channel.isOpen();
-    return channelOpen;
-  }
-
-  /**
-   * @return the socket address held by this channel
-   */
-  public InetSocketAddress getAddress() {
-    return address;
-  }
-
-  /**
-   * Check if user should authenticate over Kerberos
-   * @return true if should be authenticated over Kerberos
-   * @throws java.io.IOException on failure of check
-   */
-  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
-      // Make sure user logged in using Kerberos either keytab or TGT
-      loginUser.hasKerberosCredentials() &&
-      // relogin only in case it is the login user (e.g. JT)
-      // or superuser (like oozie).
-      (loginUser.equals(currentUser) || loginUser.equals(realUser));
-  }
-
-  /**
-   * If multiple clients with the same principal try to connect to the same server at the same time,
-   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
-   * work around this, what is done is that the client backs off randomly and tries to initiate the
-   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
-   * attempted.
-   * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. When the user
-   * doesn't have valid credentials (from a keytab or the ticket cache), there is no need to
-   * retry. In such cases, it is prudent to throw a runtime exception when we receive a
-   * SaslException from the underlying authentication implementation, so that there is no retry
-   * from higher levels (e.g. HCM or HBaseAdmin).
-   * </p>
-   * @param currRetries retry count
-   * @param ex exception describing the failure
-   * @param user user that is trying to connect
-   * @throws java.io.IOException if an I/O failure occurs
-   * @throws InterruptedException if thread is interrupted
-   */
-  private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
-      final UserGroupInformation user) throws IOException, InterruptedException {
-    user.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws IOException, InterruptedException {
-        if (shouldAuthenticateOverKrb()) {
-          if (currRetries < MAX_SASL_RETRIES) {
-            LOG.debug("Exception encountered while connecting to the server : " + ex);
-            // try re-login
-            if (UserGroupInformation.isLoginKeytabBased()) {
-              UserGroupInformation.getLoginUser().reloginFromKeytab();
-            } else {
-              UserGroupInformation.getLoginUser().reloginFromTicketCache();
-            }
-
-            // Should reconnect
-            return null;
-          } else {
-            String msg = "Couldn't setup connection for "
-                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
-            LOG.warn(msg, ex);
-            throw (IOException) new IOException(msg).initCause(ex);
-          }
-        } else {
-          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
-        }
-        if (ex instanceof RemoteException) {
-          throw (RemoteException) ex;
-        }
-        if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed."
-              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
-          LOG.fatal(msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-        throw new IOException(ex);
-      }
-    });
-  }
-
-  /**
-   * @return the hash that ConnectionId computes from this channel's ticket, service name and
-   *         address
-   */
-  public int getConnectionHashCode() {
-    final int connectionHash = ConnectionId.hashCode(ticket, serviceName, address);
-    return connectionHash;
-  }
-
-  /**
-   * @return the same value as {@link #getConnectionHashCode()}
-   */
-  @Override
-  public int hashCode() {
-    final int connectionHash = getConnectionHashCode();
-    return connectionHash;
-  }
-
-  /**
-   * Two channels are considered equal when their connection hash codes match.
-   * @param obj object to compare with
-   * @return true if obj is an AsyncRpcChannel whose hash code equals this channel's hash code
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (obj instanceof AsyncRpcChannel) {
-      AsyncRpcChannel other = (AsyncRpcChannel) obj;
-      // Compare against this instance; comparing the argument's hash with itself (as before)
-      // made every AsyncRpcChannel equal to every other one.
-      return this.hashCode() == other.hashCode();
-    }
-    return false;
-  }
-
-  /**
-   * @return a description of the form address/serviceName/ticket
-   */
-  @Override
-  public String toString() {
-    final StringBuilder description = new StringBuilder(this.address.toString());
-    description.append("/").append(this.serviceName);
-    description.append("/").append(this.ticket);
-    return description.toString();
-  }
-
-  /**
-   * Listens to call writes and fails if write failed
-   */
-  private static final class CallWriteListener implements ChannelFutureListener {
-    private final AsyncRpcChannel rpcChannel;
-    private final int id;
-
-    /**
-     * @param asyncRpcChannelImpl channel from which the pending call is removed on write failure
-     * @param id id of the call whose write is being observed
-     */
-    public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) {
-      this.rpcChannel = asyncRpcChannelImpl;
-      this.id = id;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        // Successful writes need no handling here.
-        return;
-      }
-      final AsyncCall failedCall = rpcChannel.removePendingCall(id);
-      if (failedCall == null) {
-        // The call is no longer pending, so there is nothing left to fail.
-        return;
-      }
-      // Fail the call with the write's cause, wrapped when it is not already an IOException.
-      final Throwable cause = future.cause();
-      final IOException failure =
-          cause instanceof IOException ? (IOException) cause : new IOException(cause);
-      failedCall.setFailed(failure);
-    }
-  }
-}