You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/05 20:59:34 UTC

[2/2] kudu git commit: [java] separating Connection

[java] separating Connection

This patch separates lower-level, connection-related functionality
from the TabletClient class into the new Connection class.
The updated TabletClient has been renamed to RpcProxy.
Also, this patch contains other micro-updates on the related code.

In addition, this patch addresses KUDU-1878.

This work is done in the context of KUDU-2013.

Change-Id: Id4ac81d9454631e7501c31576c24f85e968bb871
Reviewed-on: http://gerrit.cloudera.org:8080/7146
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58248841
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58248841
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58248841

Branch: refs/heads/master
Commit: 58248841f213a64683ee217f025f0a38a8450f74
Parents: 7e74527
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Jun 1 18:39:13 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 5 20:58:46 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 277 +++----
 .../apache/kudu/client/ConnectToCluster.java    | 112 ++-
 .../java/org/apache/kudu/client/Connection.java | 634 +++++++++++++++
 .../org/apache/kudu/client/ConnectionCache.java | 269 ++-----
 .../java/org/apache/kudu/client/KuduRpc.java    |   6 +-
 .../java/org/apache/kudu/client/Negotiator.java |   5 +-
 .../org/apache/kudu/client/RemoteTablet.java    |  53 +-
 .../java/org/apache/kudu/client/RpcProxy.java   | 406 ++++++++++
 .../java/org/apache/kudu/client/ServerInfo.java |   4 +-
 .../org/apache/kudu/client/TabletClient.java    | 779 -------------------
 .../org/apache/kudu/client/BaseKuduTest.java    |   6 +-
 .../java/org/apache/kudu/client/ITClient.java   |   7 +-
 .../kudu/client/ITFaultTolerantScanner.java     |  11 +-
 .../kudu/client/ITNonFaultTolerantScanner.java  |   4 +-
 .../kudu/client/ITScannerMultiTablet.java       |  21 +-
 .../org/apache/kudu/client/MiniKuduCluster.java |  73 +-
 .../apache/kudu/client/TestAsyncKuduClient.java |  16 +-
 .../kudu/client/TestAsyncKuduSession.java       |  13 +-
 .../apache/kudu/client/TestConnectionCache.java | 109 ++-
 .../apache/kudu/client/TestRemoteTablet.java    |  31 +-
 .../org/apache/kudu/client/TestTimeouts.java    |   1 +
 21 files changed, 1504 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 019a3f5..a304d05 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,8 +27,10 @@
 package org.apache.kudu.client;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
@@ -43,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.security.auth.Subject;
 
@@ -67,6 +71,7 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
@@ -126,7 +131,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * a lookup of a single partition (e.g. for a write), or re-looking-up a tablet with
    * stale information.
    */
-  static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+  private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+
   /**
    * The number of tablets to fetch from the master when looking up a range of
    * tablets.
@@ -142,16 +148,18 @@ public class AsyncKuduClient implements AutoCloseable {
   private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
       new ConcurrentHashMap<>();
 
+  /** A cache to keep track of already opened connections to Kudu servers. */
   private final ConnectionCache connectionCache;
 
   @GuardedBy("sessions")
   private final Set<AsyncKuduSession> sessions = new HashSet<>();
 
-  // Since the masters also go through TabletClient, we need to treat them as if they were a normal
-  // table. We'll use the following fake table name to identify places where we need special
+  // Since RPCs to the masters also go through RpcProxy, we need to treat them as if they were a
+  // normal table. We'll use the following fake table name to identify places where we need special
   // handling.
+  // TODO(aserbin) clean this up
   static final String MASTER_TABLE_NAME_PLACEHOLDER =  "Kudu Master";
-  final KuduTable masterTable;
+  private final KuduTable masterTable;
   private final List<HostAndPort> masterAddresses;
 
   private final HashedWheelTimer timer;
@@ -212,7 +220,40 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     String clientId = UUID.randomUUID().toString().replace("-", "");
     this.requestTracker = new RequestTracker(clientId);
-    this.connectionCache = new ConnectionCache(this);
+    this.connectionCache = new ConnectionCache(
+        securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+  }
+
+  /**
+   * Get a proxy to send RPC calls to the specified server.
+   *
+   * @param serverInfo server's information
+   * @return the proxy object bound to the target server
+   */
+  @Nonnull
+  RpcProxy newRpcProxy(final ServerInfo serverInfo) {
+    Preconditions.checkNotNull(serverInfo);
+    return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+  }
+
+  /**
+   * Get a proxy to send RPC calls to Kudu master at the specified end-point.
+   *
+   * @param hostPort master end-point
+   * @return the proxy object bound to the target master
+   */
+  @Nullable
+  RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+    // We should have a UUID to construct ServerInfo for the master, but we have a
+    // chicken-and-egg problem: we first need to communicate with the masters to find out about
+    // them, and that's what we're trying to do. The UUID is only used for logging and as a cache
+    // key, so instead we use the concatenation of master host and port, prefixed with "master-".
+    final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+    if (inetAddress == null) {
+      // TODO(todd): should we log the resolution failure? throw an exception?
+      return null;
+    }
+    return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
   }
 
   /**
@@ -374,7 +415,7 @@ public class AsyncKuduClient implements AutoCloseable {
     return sendRpcToTablet(rpc);
   }
 
-  Deferred<GetTableSchemaResponse> getTableSchema(String name) {
+  private Deferred<GetTableSchemaResponse> getTableSchema(String name) {
     GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
     rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
@@ -501,9 +542,9 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * This callback will be repeatadly used when opening a table until it is done being created.
+   * This callback will be repeatedly used when opening a table until it is done being created.
    */
-  Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
+  private Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
       final KuduRpc<KuduTable> rpc, final KuduTable table) {
     return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>() {
       @Override
@@ -635,18 +676,6 @@ public class AsyncKuduClient implements AutoCloseable {
     return requestTracker;
   }
 
-  HashedWheelTimer getTimer() {
-    return timer;
-  }
-
-  ClientSocketChannelFactory getChannelFactory() {
-    return channelFactory;
-  }
-
-  SecurityContext getSecurityContext() {
-    return securityContext;
-  }
-
   /**
    * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
    * @param table the name of the table you intend to scan.
@@ -691,23 +720,20 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
     RemoteTablet tablet = scanner.currentTablet();
-    assert (tablet != null);
+    Preconditions.checkNotNull(tablet);
     KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
-    String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
-    TabletClient client = connectionCache.getClient(uuid);
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
     nextRequest.attempt++;
-    if (client == null || !client.isAlive()) {
-      // A null client means we either don't know about this tablet anymore (unlikely) or we
-      // couldn't find a leader (which could be triggered by a read timeout).
-      // We'll first delay the RPC in case things take some time to settle down, then retry.
-      Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid +
-          " will retry after a delay");
-      return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection());
+    if (info == null) {
+      return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(
+          String.format("No information on servers hosting tablet %s, will retry later",
+              tablet.getTabletId()))));
     }
+
     Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
-    client.sendRpc(nextRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
     return d;
   }
 
@@ -723,55 +749,19 @@ public class AsyncKuduClient implements AutoCloseable {
     if (tablet == null) {
       return Deferred.fromResult(null);
     }
-
-    final KuduRpc<AsyncKuduScanner.Response>  closeRequest = scanner.getCloseRequest();
-    final TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
-    if (client == null || !client.isAlive()) {
-      // Oops, we couldn't find a tablet server that hosts this tablet. Our
-      // cache was probably invalidated while the client was scanning. So
-      // we can't close this scanner properly.
-      LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
+    final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection());
+    if (info == null) {
       return Deferred.fromResult(null);
     }
 
     final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
     closeRequest.attempt++;
-    client.sendRpc(closeRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
     return d;
   }
 
   /**
-   * Forcefully shuts down the RemoteTablet connection and
-   * fails all outstanding RPCs.
-   *
-   * @param tablet the given tablet
-   * @param replicaSelection replica selection mechanism to use
-   */
-  @VisibleForTesting
-  void shutdownConnection(RemoteTablet tablet,
-                          ReplicaSelection replicaSelection) {
-    TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(replicaSelection));
-    client.shutdown();
-  }
-
-  /**
-   * Forcefully disconnects the RemoteTablet connection and
-   * fails all outstanding RPCs.
-   *
-   * @param tablet the given tablet
-   * @param replicaSelection replica selection mechanism to use
-   */
-  @VisibleForTesting
-  void disconnect(RemoteTablet tablet,
-                  ReplicaSelection replicaSelection) {
-    TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(replicaSelection));
-    client.disconnect();
-  }
-
-  /**
    * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
    * of the tablet identified by the RPC's table and partition key.
    *
@@ -812,15 +802,12 @@ public class AsyncKuduClient implements AutoCloseable {
     // If we found a tablet, we'll try to find the TS to talk to.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
-      if (uuid != null) {
+      ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection());
+      if (info != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);
-        TabletClient client = connectionCache.getLiveClient(uuid);
-        if (client != null) {
-          client.sendRpc(request);
-          return d;
-        }
+        RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+        return d;
       }
     }
 
@@ -923,7 +910,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param <R> Request's return type.
    * @return An errback.
    */
-  <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
+  private <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(
+      final KuduRpc<R> request) {
     return new Callback<Exception, Exception>() {
       @Override
       public Exception call(Exception e) throws Exception {
@@ -944,26 +932,26 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param errback the errback to call if something goes wrong when calling IsCreateTableDone
    * @return Deferred used to track the provided KuduRpc
    */
-  <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc,
-                                           final Callback<Deferred<R>,
-                                               Master.IsCreateTableDoneResponsePB> retryCB,
-                                           final Callback<Exception, Exception> errback) {
+  private <R> Deferred<R> delayedIsCreateTableDone(
+      final KuduTable table,
+      final KuduRpc<R> rpc,
+      final Callback<Deferred<R>,
+      Master.IsCreateTableDoneResponsePB> retryCB,
+      final Callback<Exception, Exception> errback) {
 
     final class RetryTimer implements TimerTask {
       public void run(final Timeout timeout) {
         String tableId = table.getTableId();
         final boolean has_permit = acquireMasterLookupPermit();
-        if (!has_permit) {
+        if (!has_permit && !tablesNotServed.contains(tableId)) {
           // If we failed to acquire a permit, it's worth checking if someone
           // looked up the tablet we're interested in.  Every once in a while
           // this will save us a Master lookup.
-          if (!tablesNotServed.contains(tableId)) {
-            try {
-              retryCB.call(null);
-              return;
-            } catch (Exception e) {
-              // we're calling RetryRpcCB which doesn't throw exceptions, ignore
-            }
+          try {
+            retryCB.call(null);
+            return;
+          } catch (Exception e) {
+            // we're calling RetryRpcCB which doesn't throw exceptions, ignore
           }
         }
         IsCreateTableDoneRequest isCreateTableDoneRequest =
@@ -1027,11 +1015,11 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  long getSleepTimeForRpc(KuduRpc<?> rpc) {
-    byte attemptCount = rpc.attempt;
+  private long getSleepTimeForRpc(KuduRpc<?> rpc) {
+    int attemptCount = rpc.attempt;
     assert (attemptCount > 0);
     if (attemptCount == 0) {
-      LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc,
+      LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: {}", rpc,
           new Exception("Exception created to collect stack trace"));
       attemptCount = 1;
     }
@@ -1039,29 +1027,12 @@ public class AsyncKuduClient implements AutoCloseable {
     long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) *
         sleepRandomizer.nextDouble());
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
+      LOG.trace("Going to sleep for {} at retry {}", sleepTime, rpc.attempt);
     }
     return sleepTime;
   }
 
   /**
-   * Modifying the list returned by this method won't change how AsyncKuduClient behaves,
-   * but calling certain methods on the returned TabletClients can. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return copy of the current TabletClients list
-   */
-  @VisibleForTesting
-  List<TabletClient> getTabletClients() {
-    return connectionCache.getImmutableTabletClientsList();
-  }
-
-  @VisibleForTesting
-  TabletClient getTabletClient(String uuid) {
-    return connectionCache.getClient(uuid);
-  }
-
-  /**
    * Clears {@link #tableLocations} of the table's entries.
    *
    * This method makes the maps momentarily inconsistent, and should only be
@@ -1080,7 +1051,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return {@code true} if this RPC already had too many attempts,
    * {@code false} otherwise (in which case it's OK to retry once more)
    */
-  static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
+  private static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
     return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
   }
 
@@ -1091,8 +1062,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param cause What was cause of the last failed attempt, if known.
    * You can pass {@code null} if the cause is unknown.
    */
-  static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
-                                                  final KuduException cause) {
+  private static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
+                                                          final KuduException cause) {
     String message;
     if (request.attempt > MAX_RPC_ATTEMPTS) {
       message = "Too many attempts: ";
@@ -1127,7 +1098,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // this will save us a Master lookup.
       TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
       if (entry != null && !entry.isNonCoveredRange() &&
-          entry.getTablet().getLeaderUUID() != null) {
+          entry.getTablet().getLeaderServerInfo() != null) {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
@@ -1164,9 +1135,8 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
     // TODO(todd): stop using this 'masterTable' hack.
-    return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache,
-        defaultAdminOperationTimeoutMs)
-        .addCallback(
+    return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
+        defaultAdminOperationTimeoutMs).addCallback(
             new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
               @Override
               public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
@@ -1306,8 +1276,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * We're handling a tablet server that's telling us it doesn't have the tablet we're asking for.
    * We're in the context of decode() meaning we need to either callback or retry later.
    */
-  <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    invalidateTabletCache(rpc.getTablet(), server);
+  <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+    invalidateTabletCache(rpc.getTablet(), info);
     handleRetryableError(rpc, ex);
   }
 
@@ -1315,8 +1285,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * A tablet server is letting us know that it isn't the specified tablet's leader in response
    * a RPC, so we need to demote it and retry.
    */
-  <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
+  <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+    rpc.getTablet().demoteLeader(info.getUuid());
     handleRetryableError(rpc, ex);
   }
 
@@ -1370,12 +1340,40 @@ public class AsyncKuduClient implements AutoCloseable {
    * Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing
    * the tablet itself from the caches.
    */
-  private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
-    String uuid = server.getServerInfo().getUuid();
+  private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info) {
+    final String uuid = info.getUuid();
     LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId());
     tablet.removeTabletClient(uuid);
   }
 
+  /**
+   * Translate master-provided information {@link Master.TSInfoPB} on a tablet server into internal
+   * {@link ServerInfo} representation.
+   *
+   * @param tsInfoPB master-provided information for the tablet server
+   * @return an object that contains all the server's information
+   * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+   */
+  private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
+    final List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
+    final String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
+    if (addresses.isEmpty()) {
+      LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
+      return null;
+    }
+
+    // from meta_cache.cc
+    // TODO: if the TS advertises multiple host/ports, pick the right one
+    // based on some kind of policy. For now just use the first always.
+    final HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
+    final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+    if (inetAddress == null) {
+      throw new UnknownHostException(
+          "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
+    }
+    return new ServerInfo(uuid, hostPort, inetAddress);
+  }
+
   /** Callback executed when a master lookup completes.  */
   private final class MasterLookupCB implements Callback<Object,
       Master.GetTableLocationsResponsePB> {
@@ -1418,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  boolean acquireMasterLookupPermit() {
+  private boolean acquireMasterLookupPermit() {
     try {
       // With such a low timeout, the JVM may chose to spin-wait instead of
       // de-scheduling the thread (and causing context switches and whatnot).
@@ -1433,7 +1431,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * Releases a master lookup permit that was acquired.
    * @see #acquireMasterLookupPermit
    */
-  void releaseMasterLookupPermit() {
+  private void releaseMasterLookupPermit() {
     masterLookups.release();
   }
 
@@ -1477,7 +1475,7 @@ public class AsyncKuduClient implements AutoCloseable {
       List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount());
       for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
         try {
-          ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo());
+          ServerInfo serverInfo = resolveTS(replica.getTsInfo());
           if (serverInfo != null) {
             servers.add(serverInfo);
           }
@@ -1508,7 +1506,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
     // sleep before retrying.
     TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
-    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
+    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) {
       throw new NoLeaderFoundException(
           Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
     }
@@ -1588,8 +1586,9 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Invokes {@link #shutdown()} and waits. This method returns
-   * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
+   * Invokes {@link #shutdown()} and waits. This method returns void, so consider invoking
+   * {@link #shutdown()} directly if there's a need to handle dangling RPCs.
+   *
    * @throws Exception if an error happens while closing the connections
    */
   @Override
@@ -1607,7 +1606,8 @@ public class AsyncKuduClient implements AutoCloseable {
    *   <li>Releases all other resources.</li>
    * </ul>
    * <strong>Not calling this method before losing the last reference to this
-   * instance may result in data loss and other unwanted side effects</strong>
+   * instance may result in data loss and other unwanted side effects.</strong>
+   *
    * @return A {@link Deferred}, whose callback chain will be invoked once all
    * of the above have been done. If this callback chain doesn't fail, then
    * the clean shutdown will be successful, and all the data will be safe on
@@ -1628,6 +1628,7 @@ public class AsyncKuduClient implements AutoCloseable {
         super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
       }
 
+      @Override
       public void run() {
         // This terminates the Executor.
         channelFactory.releaseExternalResources();
@@ -1676,7 +1677,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // concurrent modification during the iteration.
     Set<AsyncKuduSession> copyOfSessions;
     synchronized (sessions) {
-      copyOfSessions = new HashSet<AsyncKuduSession>(sessions);
+      copyOfSessions = new HashSet<>(sessions);
     }
     if (sessions.isEmpty()) {
       return Deferred.fromResult(null);
@@ -1690,7 +1691,7 @@ public class AsyncKuduClient implements AutoCloseable {
     return Deferred.group(deferreds);
   }
 
-  private boolean isMasterTable(String tableId) {
+  private static boolean isMasterTable(String tableId) {
     // Checking that it's the same instance so there's absolutely no chance of confusing the master
     // 'table' for a user one.
     return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
@@ -1709,6 +1710,14 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * @return copy of the current list of connections
+   */
+  @VisibleForTesting
+  List<Connection> getConnectionListCopy() {
+    return connectionCache.getConnectionListCopy();
+  }
+
+  /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 22b31f0..dfba55d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Callback;
@@ -80,7 +81,8 @@ final class ConnectToCluster {
 
   private static Deferred<ConnectToMasterResponsePB> connectToMaster(
       final KuduTable masterTable,
-      final TabletClient masterClient, KuduRpc<?> parentRpc,
+      final RpcProxy masterProxy,
+      KuduRpc<?> parentRpc,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
@@ -93,7 +95,7 @@ final class ConnectToCluster {
     }
     Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
     rpc.attempt++;
-    masterClient.sendRpc(rpc);
+    masterProxy.sendRpc(rpc);
 
     // If we are connecting to an older version of Kudu, we'll get an invalid request
     // error. In that case, we resend using the older version of the RPC.
@@ -107,10 +109,10 @@ final class ConnectToCluster {
               rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) {
             AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " +
                 "to server running Kudu < 1.3.");
-            Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred();
-            assert newAttempt != null;
+            final Deferred<ConnectToMasterResponsePB> newAttempt =
+                Preconditions.checkNotNull(rpc.getDeferred());
             rpc.setUseOldMethod();
-            masterClient.sendRpc(rpc);
+            masterProxy.sendRpc(rpc);
             return newAttempt;
           }
         }
@@ -128,8 +130,6 @@ final class ConnectToCluster {
    * @param masterTable the "placeholder" table used by AsyncKuduClient
    * @param masterAddresses the addresses of masters to fetch from
    * @param parentRpc RPC that prompted a master lookup, can be null
-   * @param connCache the client's connection cache, used for creating connections
-   *                  to masters
    * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
    * @return a Deferred object for the cluster connection status
    */
@@ -137,7 +137,6 @@ final class ConnectToCluster {
       KuduTable masterTable,
       List<HostAndPort> masterAddresses,
       KuduRpc<?> parentRpc,
-      ConnectionCache connCache,
       long defaultTimeoutMs) {
     ConnectToCluster connector = new ConnectToCluster(masterAddresses);
 
@@ -146,14 +145,14 @@ final class ConnectToCluster {
     // deferred.
     for (HostAndPort hostAndPort : masterAddresses) {
       Deferred<ConnectToMasterResponsePB> d;
-      TabletClient client = connCache.newMasterClient(hostAndPort);
-      if (client == null) {
+      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+      if (proxy != null) {
+        d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+      } else {
         String message = "Couldn't resolve this master's address " + hostAndPort.toString();
         LOG.warn(message);
         Status statusIOE = Status.IOError(message);
         d = Deferred.fromError(new NonRecoverableException(statusIOE));
-      } else {
-        d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs);
       }
       d.addCallbacks(connector.callbackForNode(hostAndPort),
           connector.errbackForNode(hostAndPort));
@@ -192,56 +191,50 @@ final class ConnectToCluster {
    * to responseD.
    */
   private void incrementCountAndCheckExhausted() {
-    if (countResponsesReceived.incrementAndGet() == numMasters) {
-      if (responseDCalled.compareAndSet(false, true)) {
-
-        // We want `allUnrecoverable` to only be true if all the masters came back with
-        // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
-        // that replies with RecoverableException or with an ok response but is a FOLLOWER is
-        // enough to keep us retrying.
-        boolean allUnrecoverable = true;
-        if (exceptionsReceived.size() == countResponsesReceived.get()) {
-          for (Exception ex : exceptionsReceived) {
-            if (!(ex instanceof NonRecoverableException)) {
-              allUnrecoverable = false;
-              break;
-            }
+    if (countResponsesReceived.incrementAndGet() == numMasters &&
+        responseDCalled.compareAndSet(false, true)) {
+      // We want `allUnrecoverable` to only be true if all the masters came back with
+      // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+      // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+      // enough to keep us retrying.
+      boolean allUnrecoverable = true;
+      if (exceptionsReceived.size() == countResponsesReceived.get()) {
+        for (Exception ex : exceptionsReceived) {
+          if (!(ex instanceof NonRecoverableException)) {
+            allUnrecoverable = false;
+            break;
           }
-        } else {
-          allUnrecoverable = false;
         }
+      } else {
+        allUnrecoverable = false;
+      }
 
-        String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
-        if (allUnrecoverable) {
-          // This will stop retries.
-          String msg = String.format("Couldn't find a valid master in (%s). " +
-              "Exceptions received: %s", allHosts,
-              Joiner.on(",").join(Lists.transform(
-                  exceptionsReceived, Functions.toStringFunction())));
-          Status s = Status.ServiceUnavailable(msg);
-          responseD.callback(new NonRecoverableException(s));
+      String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+      if (allUnrecoverable) {
+        // This will stop retries.
+        String msg = String.format("Couldn't find a valid master in (%s). " +
+            "Exceptions received: %s", allHosts,
+            Joiner.on(",").join(Lists.transform(
+                exceptionsReceived, Functions.toStringFunction())));
+        Status s = Status.ServiceUnavailable(msg);
+        responseD.callback(new NonRecoverableException(s));
+      } else {
+        String message = String.format("Master config (%s) has no leader.",
+            allHosts);
+        Exception ex;
+        if (exceptionsReceived.isEmpty()) {
+          LOG.warn("None of the provided masters {} is a leader; will retry", allHosts);
+          ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
         } else {
-          String message = String.format("Master config (%s) has no leader.",
-              allHosts);
-          Exception ex;
-          if (exceptionsReceived.isEmpty()) {
-            LOG.warn(String.format(
-                "None of the provided masters (%s) is a leader, will retry.",
-                allHosts));
-            ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
-          } else {
-            LOG.warn(String.format(
-                "Unable to find the leader master (%s), will retry",
-                allHosts));
-            String joinedMsg = message + " Exceptions received: " +
-                Joiner.on(",").join(Lists.transform(
-                    exceptionsReceived, Functions.toStringFunction()));
-            Status s = Status.ServiceUnavailable(joinedMsg);
-            ex = new NoLeaderFoundException(s,
-                exceptionsReceived.get(exceptionsReceived.size() - 1));
-          }
-          responseD.callback(ex);
+          LOG.warn("Unable to find the leader master {}; will retry", allHosts);
+          String joinedMsg = message + " Exceptions received: " +
+              Joiner.on(",").join(Lists.transform(
+                  exceptionsReceived, Functions.toStringFunction()));
+          Status s = Status.ServiceUnavailable(joinedMsg);
+          ex = new NoLeaderFoundException(s,
+              exceptionsReceived.get(exceptionsReceived.size() - 1));
         }
+        responseD.callback(ex);
       }
     }
   }
@@ -273,8 +266,7 @@ final class ConnectToCluster {
         // Someone else already found a leader. This is somewhat unexpected
         // because this means two nodes think they're the leader, but it's
         // not impossible. We'll just ignore it.
-        LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
-            hostAndPort.toString());
+        LOG.debug("Callback already invoked, discarding response({}) from {}", r, hostAndPort);
         return null;
       }
 
@@ -304,7 +296,7 @@ final class ConnectToCluster {
 
     @Override
     public Void call(Exception e) throws Exception {
-      LOG.warn("Error receiving a response from: " + hostAndPort, e);
+      LOG.warn("Error receiving response from {}", hostAndPort, e);
       exceptionsReceived.add(e);
       incrementCountAndCheckExhausted();
       return null;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
new file mode 100644
index 0000000..ea1e113
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.net.ssl.SSLException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+
+/**
+ * Class representing a connection from the client to a Kudu server (master or tablet server):
+ * a high-level wrapper for the TCP connection between the client and the server.
+ * <p>
+ * It's a stateful handler that manages a connection to a Kudu server.
+ * <p>
+ * This handler manages the RPC IDs, and keeps track of the RPCs in flight for which
+ * a response is currently awaited, as well as temporarily buffered RPCs that are waiting
+ * to be sent to the server.
+ * <p>
+ * Acquiring the monitor on an object of this class will prevent it from
+ * accepting write requests as well as buffering requests if the underlying
+ * channel isn't connected.
+ * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class Connection extends SimpleChannelUpstreamHandler {
+  /** Information on the target server. */
+  private final ServerInfo serverInfo;
+
+  /** Security context to use for connection negotiation. */
+  private final SecurityContext securityContext;
+
+  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler) */
+  private final long socketReadTimeoutMs;
+
+  /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler) */
+  private final HashedWheelTimer timer;
+
+  /** The underlying Netty socket channel. */
+  private final SocketChannel channel;
+
+  /**
+   * Set to true when disconnect initiated explicitly from the client side. The channelDisconnected
+   * event handler then knows not to log any warning about unexpected disconnection from the peer.
+   */
+  private volatile boolean explicitlyDisconnected = false;
+
+  /** Logger: a sink for the log messages originated from this class. */
+  private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
+
+  private static final byte RPC_CURRENT_VERSION = 9;
+
+  /** Initial header sent by the client upon connection establishment. */
+  private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c',
+      RPC_CURRENT_VERSION,     // RPC version.
+      0,
+      0
+  };
+
+  /** Lock to guard access to some of the fields below. */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /** A state of this object. */
+  @GuardedBy("lock")
+  private State state;
+
+  /**
+   * A hash table to store { callId, statusReportCallback } pairs, representing messages which have
+   * already been sent and are awaiting responses from the server. Once the server responds to a
+   * message, the corresponding entry is removed from the container and the response callback
+   * is invoked with the results represented by {@link CallResponseInfo}.
+   */
+  @GuardedBy("lock")
+  private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<>();
+
+  /** Messages enqueued while the connection was not ready to start sending them over the wire. */
+  @GuardedBy("lock")
+  private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
+
+  /** The result of the connection negotiation. */
+  @GuardedBy("lock")
+  private Result negotiationResult = null;
+
+  /** A monotonically increasing counter for RPC IDs. */
+  @GuardedBy("lock")
+  private int nextCallId = 0;
+
+  Connection(ServerInfo serverInfo,
+             SecurityContext securityContext,
+             long socketReadTimeoutMs,
+             HashedWheelTimer timer,
+             ClientSocketChannelFactory channelFactory) {
+    this.serverInfo = serverInfo;
+    this.securityContext = securityContext;
+    this.state = State.NEW;
+    this.socketReadTimeoutMs = socketReadTimeoutMs;
+    this.timer = timer;
+
+    final ConnectionPipeline pipeline = new ConnectionPipeline();
+    pipeline.init();
+
+    channel = channelFactory.newChannel(pipeline);
+    SocketChannelConfig config = channel.getConfig();
+    config.setConnectTimeoutMillis(60000);
+    config.setTcpNoDelay(true);
+    // Unfortunately there is no way to override the keep-alive timeout in
+    // Java since the JRE doesn't expose any way to call setsockopt() with
+    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+    config.setKeepAlive(true);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void channelConnected(final ChannelHandlerContext ctx,
+                               final ChannelStateEvent e) {
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.CONNECTING);
+      state = State.NEGOTIATING;
+    } finally {
+      lock.unlock();
+    }
+    Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+    Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
+    negotiator.sendHello(channel);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void handleUpstream(final ChannelHandlerContext ctx,
+                             final ChannelEvent e) throws Exception {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} upstream event {}", getLogPrefix(), e);
+    }
+    super.handleUpstream(ctx, e);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void channelDisconnected(final ChannelHandlerContext ctx,
+                                  final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the
+    // upstream pipeline after Connection itself. So, just handle the disconnection event ourselves.
+    cleanup("connection disconnected");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void channelClosed(final ChannelHandlerContext ctx,
+                            final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+    // pipeline after Connection itself. So, just handle the close event ourselves.
+    cleanup("connection closed");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
+    Object m = evt.getMessage();
+    if (m instanceof Negotiator.Result) {
+      lock.lock();
+      try {
+        Preconditions.checkState(state == State.NEGOTIATING);
+        Preconditions.checkNotNull(queuedMessages);
+
+        state = State.READY;
+        negotiationResult = (Negotiator.Result) m;
+        List<QueuedMessage> queued = queuedMessages;
+        // The queuedMessages should not be used anymore once the connection is negotiated.
+        queuedMessages = null;
+        // Send out all the enqueued messages. This is done while holding the lock to preserve
+        // the sequence of the already enqueued and being-enqueued-right-now messages and simplify
+        // the logic which checks for the consistency using Preconditions.checkXxx() methods.
+        for (final QueuedMessage qm : queued) {
+          sendCallToWire(qm.message, qm.cb);
+        }
+      } finally {
+        lock.unlock();
+      }
+      return;
+    }
+
+    // Some other event which the connection does not handle.
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+
+    final CallResponse response = (CallResponse) m;
+    final RpcHeader.ResponseHeader header = response.getHeader();
+    if (!header.hasCallId()) {
+      final int size = response.getTotalResponseSize();
+      final String msg = getLogPrefix() +
+          " RPC response (size: " + size + ") doesn't" + " have callID: " + header;
+      LOG.error(msg);
+      throw new NonRecoverableException(Status.Incomplete(msg));
+    }
+
+    final int callId = header.getCallId();
+    Callback<Void, CallResponseInfo> responseCbk;
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.READY);
+      responseCbk = inflightMessages.remove(callId);
+    } finally {
+      lock.unlock();
+    }
+
+    if (responseCbk == null) {
+      final String msg = getLogPrefix() + " invalid callID: " + callId;
+      LOG.error(msg);
+      // If we get a bad RPC ID back, we are probably somehow misaligned from
+      // the server. So, we disconnect the connection.
+      throw new NonRecoverableException(Status.IllegalState(msg));
+    }
+
+    if (!header.hasIsError() || !header.getIsError()) {
+      // The success case.
+      responseCbk.call(new CallResponseInfo(response, null));
+      return;
+    }
+
+    final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+    KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
+    final RpcHeader.ErrorStatusPB error = errorBuilder.build();
+    if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
+        error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
+      responseCbk.call(new CallResponseInfo(
+          response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
+      return;
+    }
+
+    final String message = getLogPrefix() + " server sent error " + error.getMessage();
+    LOG.error(message); // can be useful
+    responseCbk.call(new CallResponseInfo(
+        response, new RpcRemoteException(Status.RemoteError(message), error)));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void exceptionCaught(final ChannelHandlerContext ctx,
+                              final ExceptionEvent event) {
+    final Throwable e = event.getCause();
+    final Channel c = event.getChannel();
+
+    if (e instanceof RejectedExecutionException) {
+      LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+    } else if (e instanceof ReadTimeoutException) {
+      LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+    } else if (e instanceof ClosedChannelException) {
+      if (!explicitlyDisconnected) {
+        LOG.info("{} lost connection to peer", getLogPrefix());
+      }
+    } else if (e instanceof SSLException && explicitlyDisconnected) {
+      // There's a race in Netty where, when we call Channel.close(), it tries
+      // to send a TLS 'shutdown' message and enters a shutdown state. If another
+      // thread races to send actual data on the channel, then Netty will get a
+      // bit confused that we are trying to send data and misinterpret it as a
+      // renegotiation attempt, and throw an SSLException. So, we just ignore any
+      // SSLException if we've already attempted to close.
+      LOG.debug("{} ignoring SSLException: already disconnected", getLogPrefix());
+    } else {
+      LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+    }
+    if (c.isOpen()) {
+      Channels.close(c);        // Will trigger channelClosed(), which will cleanup()
+    } else {                    // else: presumably a connection timeout.
+      cleanup(e.getMessage());  // => need to cleanup() from here directly.
+    }
+  }
+
+  /** Getter for the peer's end-point information */
+  public ServerInfo getServerInfo() {
+    return serverInfo;
+  }
+
+  /**
+   * @return true iff the connection is in the DISCONNECTED state
+   */
+  boolean isDisconnected() {
+    lock.lock();
+    try {
+      return state == State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * TODO(aserbin) make it possible to avoid calling this when the server features are not known yet
+   *
+   * @return the set of server's features, if known; null otherwise
+   */
+  @Nullable
+  Set<RpcFeatureFlag> getPeerFeatures() {
+    Set<RpcFeatureFlag> features = null;
+    lock.lock();
+    try {
+      if (negotiationResult != null) {
+        features = negotiationResult.serverFeatures;
+      }
+    } finally {
+      lock.unlock();
+    }
+    return features;
+  }
+
+  /**
+   * @return string representation of the peer (i.e. the server) information suitable for logging
+   */
+  String getLogPrefix() {
+    return "[peer " + serverInfo.getUuid() + "]";
+  }
+
+  /**
+   * Enqueue outbound message for sending to the remote server via Kudu RPC. The enqueueMessage()
+   * accepts messages even if the connection hasn't yet been established: the enqueued messages
+   * are sent out as soon as the connection to the server is ready. The connection is initiated upon
+   * enqueuing the very first outbound message.
+   */
+  void enqueueMessage(RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb)
+      throws RecoverableException {
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        // The upper-level caller should handle the exception and retry using a new connection.
+        throw new RecoverableException(Status.IllegalState(
+            "connection in DISCONNECTED state; cannot enqueue a message"));
+      }
+
+      if (state == State.NEW) {
+        // Schedule connecting to the server.
+        connect();
+      }
+
+      // Set the call identifier for the outgoing RPC.
+      final int callId = nextCallId++;
+      RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder();
+      headerBuilder.setCallId(callId);
+
+      // Amend the timeout for the call, if necessary.
+      if (socketReadTimeoutMs > 0) {
+        final int timeoutMs = headerBuilder.getTimeoutMillis();
+        if (timeoutMs > 0) {
+          headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
+        }
+      }
+
+      // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
+      // The elements of the queuedMessages list will be processed when the negotiation either
+      // succeeds or fails.
+      if (state != State.READY) {
+        queuedMessages.add(new QueuedMessage(msg, cb));
+        return;
+      }
+
+      // It's time to initiate sending the message over the wire.
+      sendCallToWire(msg, cb);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Triggers the channel to be disconnected, which will asynchronously cause all
+   * queued and in-flight RPCs to be failed. This method is idempotent.
+   *
+   * @return future object to wait on the disconnect completion, if necessary
+   */
+  ChannelFuture disconnect() {
+    explicitlyDisconnected = true;
+    return Channels.disconnect(channel);
+  }
+
+  /**
+   * If open, forcefully shut down the connection to the server. This is the same as
+   * {@link #disconnect}, but it returns Deferred instead of ChannelFuture.
+   *
+   * @return deferred object for tracking the shutting down of this connection
+   */
+  Deferred<Void> shutdown() {
+    final ChannelFuture disconnectFuture = disconnect();
+    final Deferred<Void> d = new Deferred<>();
+    disconnectFuture.addListener(new ChannelFutureListener() {
+      public void operationComplete(final ChannelFuture future) {
+        if (future.isSuccess()) {
+          d.callback(null);
+          return;
+        }
+        final Throwable t = future.getCause();
+        if (t instanceof Exception) {
+          d.callback(t);
+        } else {
+          d.callback(new NonRecoverableException(
+              Status.IllegalState("failed to shutdown: " + this), t));
+        }
+      }
+    });
+    return d;
+  }
+
+  /**
+   * @return string representation of this object (suitable for printing into the logs, etc.)
+   */
+  public String toString() {
+    final StringBuilder buf = new StringBuilder();
+    buf.append("Connection@")
+        .append(hashCode())
+        .append("(channel=")
+        .append(channel)
+        .append(", uuid=")
+        .append(serverInfo.getUuid());
+    int queuedMessagesNum = 0;
+    int inflightMessagesNum = 0;
+    lock.lock();
+    try {
+      queuedMessagesNum = queuedMessages == null ? 0 : queuedMessages.size();
+      inflightMessagesNum = inflightMessages == null ? 0 : inflightMessages.size();
+    } finally {
+      lock.unlock();
+    }
+    buf.append(", #queued=").append(queuedMessagesNum)
+        .append(", #inflight=").append(inflightMessagesNum)
+        .append(")");
+    return buf.toString();
+  }
+
+  /**
+   * This is a test-only method.
+   *
+   * @return true iff the connection is in the READY state
+   */
+  @VisibleForTesting
+  boolean isReady() {
+    lock.lock();
+    try {
+      return state == State.READY;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /** Start sending the message to the server over the wire. */
+  @GuardedBy("lock")
+  private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.READY);
+    Preconditions.checkNotNull(inflightMessages);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} sending {}", getLogPrefix(), msg);
+    }
+    final int callId = msg.getHeaderBuilder().getCallId();
+    final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
+    Preconditions.checkArgument(empty == null);
+    Channels.write(channel, msg);
+  }
+
+  /**
+   * Process the fact that the connection has been disconnected: update the state of this object and
+   * clean up any outstanding or lingering messages, notifying on the error via their status
+   * callbacks. The callee is supposed to handle the error and retry sending the messages,
+   * if needed.
+   *
+   * @param errorMessage a string to describe the cause of the cleanup
+   */
+  private void cleanup(final String errorMessage) {
+    List<QueuedMessage> queued;
+    Map<Integer, Callback<Void, CallResponseInfo>> inflight;
+
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        Preconditions.checkState(queuedMessages == null);
+        Preconditions.checkState(inflightMessages == null);
+        return;
+      }
+
+      queued = queuedMessages;
+      queuedMessages = null;
+
+      inflight = inflightMessages;
+      inflightMessages = null;
+
+      state = State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+    final Status error = Status.NetworkError(getLogPrefix() + " " +
+        (errorMessage == null ? "connection reset" : errorMessage));
+    final RecoverableException exception = new RecoverableException(error);
+
+    for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
+      try {
+        cb.call(new CallResponseInfo(null, exception));
+      } catch (Exception e) {
+        LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
+      }
+    }
+
+    if (queued != null) {
+      for (QueuedMessage qm : queued) {
+        try {
+          qm.cb.call(new CallResponseInfo(null, exception));
+        } catch (Exception e) {
+          LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
+        }
+      }
+    }
+  }
+
+  /** Initiate opening TCP connection to the server. */
+  private void connect() {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.NEW);
+    state = State.CONNECTING;
+    channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort()));
+  }
+
+  /** State of the Connection object. */
+  private enum State {
+    NEW,          // The object has just been created.
+    CONNECTING,   // The establishment of TCP connection to the server has started.
+    NEGOTIATING,  // The connection negotiation has started.
+    READY,        // The connection to the server has been opened, negotiated, and ready to use.
+    DISCONNECTED, // The TCP connection has been dropped off.
+  }
+
+  /**
+   * Represents an RPC response received from the remote server.
+   * If the {@code exception} is null, then it's a success case and the {@code response} contains
+   * the information on the response. Otherwise it's an error and the {@code exception} provides
+   * information on the error. For the recoverable error case, the {@code exception} is of
+   * {@link RecoverableException} type, otherwise it's of {@link NonRecoverableException} type.
+   */
+  static final class CallResponseInfo {
+    public final CallResponse response;
+    public final KuduException exception;
+
+    CallResponseInfo(CallResponse response, KuduException exception) {
+      this.response = response;
+      this.exception = exception;
+    }
+  }
+
+  /** Internal class representing an enqueued outgoing message. */
+  private static final class QueuedMessage {
+    private final RpcOutboundMessage message;
+    private final Callback<Void, CallResponseInfo> cb;
+
+    QueuedMessage(RpcOutboundMessage message, Callback<Void, CallResponseInfo> cb) {
+      this.message = message;
+      this.cb = cb;
+    }
+  }
+
+  /** Helper class to build the Netty connection pipeline. */
+  private final class ConnectionPipeline extends DefaultChannelPipeline {
+    void init() {
+      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+          KuduRpc.MAX_RPC_SIZE,
+          0, // length comes at offset 0
+          4, // length prefix is 4 bytes long
+          0, // no "length adjustment"
+          4 /* strip the length prefix */));
+      super.addLast("decode-inbound", new CallResponse.Decoder());
+      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+      if (Connection.this.socketReadTimeoutMs > 0) {
+        super.addLast("timeout-handler", new ReadTimeoutHandler(
+            Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
+      }
+      super.addLast("kudu-handler", Connection.this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 83ae8f0..844e79f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -17,263 +17,126 @@
 
 package org.apache.kudu.client;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.Common;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
 
 /**
- * The ConnectionCache is responsible for managing connections to masters and tablet servers.
- * There should only be one instance per Kudu client, and can <strong>not</strong> be shared between
- * clients.
- * <p>
- * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by
- * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues.
- * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes
- * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's
- * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead
- * connection prevents tight looping when hitting "Connection refused"-type of errors.
+ * The ConnectionCache is responsible for managing connections to Kudu masters and tablet servers.
+ * There should only be one instance of ConnectionCache per Kudu client, and it should not be
+ * shared between clients.
  * <p>
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new
+ * instances when the {@link #getConnection(ServerInfo)} method is called with the same
+ * destination. Since the map is keyed by UUID of the server, it would require an ever-growing
+ * set of unique Kudu servers to encounter memory issues.
+ *
  * This class is thread-safe.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class ConnectionCache {
+  /** Security context to use for connection negotiation. */
+  private final SecurityContext securityContext;
 
-  private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
+  /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */
+  private final long socketReadTimeoutMs;
 
-  /**
-   * Cache that maps UUIDs to the clients connected to them.
-   * <p>
-   * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put,
-   * {@code putIfAbsent} isn't a good fit for us since it requires creating
-   * an object that may be "wasted" in case another thread wins the insertion
-   * race, and we don't want to create unnecessary connections.
-   */
-  @GuardedBy("lock")
-  private final HashMap<String, TabletClient> uuid2client = new HashMap<>();
+  /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */
+  private final HashedWheelTimer timer;
 
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  /** Netty's channel factory to be used by connections. */
+  private final ClientSocketChannelFactory channelFactory;
 
-  private final Lock readLock = lock.readLock();
-  private final Lock writeLock = lock.readLock();
+  /** Synchronization primitive to guard access to the fields below. */
+  private final ReentrantLock lock = new ReentrantLock();
 
-  private final AsyncKuduClient kuduClient;
+  @GuardedBy("lock")
+  private final HashMap<String, Connection> uuid2connection = new HashMap<>();
 
   /**
-   * Create a new empty ConnectionCache that will used the passed client to create connections.
-   * @param client a client that contains the information we need to create connections
+   * Create a new empty ConnectionCache given the specified parameters.
    */
-  ConnectionCache(AsyncKuduClient client) {
-    this.kuduClient = client;
+  ConnectionCache(SecurityContext securityContext,
+                  long socketReadTimeoutMs,
+                  HashedWheelTimer timer,
+                  ClientSocketChannelFactory channelFactory) {
+    this.securityContext = securityContext;
+    this.socketReadTimeoutMs = socketReadTimeoutMs;
+    this.timer = timer;
+    this.channelFactory = channelFactory;
   }
 
   /**
-   * Create a connection to a tablet server based on information provided by the master.
-   * @param tsInfoPB master-provided information for the tablet server
-   * @return an object that contains all the server's information
-   * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+   * Get connection to the specified server. If no connection exists or the existing connection
+   * is already disconnected, then create a new connection to the specified server. The newly
+   * created connection is not negotiated until enqueuing the first RPC to the target server.
+   *
+   * @param serverInfo the server end-point to connect to
+   * @return a {@link Connection} instance for the specified server
    */
-  ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
-    List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
-    String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
-    if (addresses.isEmpty()) {
-      LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
-      return null;
-    }
-
-    // from meta_cache.cc
-    // TODO: if the TS advertises multiple host/ports, pick the right one
-    // based on some kind of policy. For now just use the first always.
-    HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
-    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-    if (inetAddress == null) {
-      throw new UnknownHostException(
-          "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
-    }
-    return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo();
-  }
-
-  TabletClient newMasterClient(HostAndPort hostPort) {
-    // We should pass a UUID here but we have a chicken and egg problem, we first need to
-    // communicate with the masters to find out about them, and that's what we're trying to do.
-    // The UUID is just used for logging and cache key, so instead we just use a constructed
-    // string with the master host and port as.
-    return newClient("master-" + hostPort.toString(), hostPort);
-  }
+  public Connection getConnection(final ServerInfo serverInfo) {
+    Connection connection;
 
-  TabletClient newClient(String uuid, HostAndPort hostPort) {
-    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-    if (inetAddress == null) {
-      // TODO(todd): should we log the resolution failure? throw an exception?
-      return null;
-    }
-
-    ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress);
-    return newClient(serverInfo);
-  }
-
-  TabletClient newClient(ServerInfo serverInfo) {
-    TabletClient client;
-    SocketChannel chan;
-
-    writeLock.lock();
+    lock.lock();
     try {
-      client = uuid2client.get(serverInfo.getUuid());
-      if (client != null && client.isAlive()) {
-        return client;
+      // First try to find an existing connection.
+      connection = uuid2connection.get(serverInfo.getUuid());
+      if (connection == null || connection.isDisconnected()) {
+        // If no valid connection is found, create a new one.
+        connection = new Connection(serverInfo, securityContext,
+            socketReadTimeoutMs, timer, channelFactory);
+        uuid2connection.put(serverInfo.getUuid(), connection);
       }
-      final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(serverInfo);
-      chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
-      uuid2client.put(serverInfo.getUuid(), client);
-    } finally {
-      writeLock.unlock();
-    }
-
-    final SocketChannelConfig config = chan.getConfig();
-    config.setConnectTimeoutMillis(5000);
-    config.setTcpNoDelay(true);
-    // Unfortunately there is no way to override the keep-alive timeout in
-    // Java since the JRE doesn't expose any way to call setsockopt() with
-    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
-    config.setKeepAlive(true);
-    chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(),
-                                       serverInfo.getPort())); // Won't block.
-    return client;
-  }
-
-  /**
-   * Get a connection to a server for the given UUID. The returned connection can be down and its
-   * state can be queried via {@link TabletClient#isAlive()}. To automatically get a client that's
-   * gonna be re-connected automatically, use {@link #getLiveClient(String)}.
-   * @param uuid server's identifier
-   * @return a connection to a server, or null if the passed UUID isn't known
-   */
-  TabletClient getClient(String uuid) {
-    readLock.lock();
-    try {
-      return uuid2client.get(uuid);
     } finally {
-      readLock.unlock();
+      lock.unlock();
     }
-  }
 
-  /**
-   * Get a connection to a server for the given UUID. This method will automatically call
-   * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
-   * @param uuid server's identifier
-   * @return a connection to a server, or null if the passed UUID isn't known
-   */
-  TabletClient getLiveClient(String uuid) {
-    TabletClient client = getClient(uuid);
-
-    if (client == null) {
-      return null;
-    } else if (client.isAlive()) {
-      return client;
-    } else {
-      return newClient(client.getServerInfo());
-    }
+    return connection;
   }
 
   /**
-   * Asynchronously closes every socket, which will also cancel all the RPCs in flight.
+   * Asynchronously terminate every connection. This also cancels all the pending and in-flight
+   * RPCs.
    */
   Deferred<ArrayList<Void>> disconnectEverything() {
-    readLock.lock();
+    lock.lock();
     try {
-      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2client.size());
-      for (TabletClient ts : uuid2client.values()) {
-        deferreds.add(ts.shutdown());
+      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+      for (Connection c : uuid2connection.values()) {
+        deferreds.add(c.shutdown());
       }
       return Deferred.group(deferreds);
     } finally {
-      readLock.unlock();
+      lock.unlock();
     }
   }
 
   /**
-   * The list returned by this method can't be modified,
-   * but calling certain methods on the returned TabletClients can have an effect. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return copy of the current TabletClients list
-   */
-  List<TabletClient> getImmutableTabletClientsList() {
-    readLock.lock();
-    try {
-      return ImmutableList.copyOf(uuid2client.values());
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
-   * Queries all the cached connections if they are alive.
-   * @return true if all the connections are down, else false
+   * Return a copy of the list of all connections. This method is exposed only to allow
+   * {@link AsyncKuduClient} to forward it, so tests can get access to the underlying elements
+   * of the cache.
+   *
+   * @return a copy of the list of all connections in the connection cache
    */
   @VisibleForTesting
-  boolean allConnectionsAreDead() {
-    readLock.lock();
+  List<Connection> getConnectionListCopy() {
+    lock.lock();
     try {
-      for (TabletClient tserver : uuid2client.values()) {
-        if (tserver.isAlive()) {
-          return false;
-        }
-      }
+      return ImmutableList.copyOf(uuid2connection.values());
     } finally {
-      readLock.unlock();
-    }
-    return true;
-  }
-
-  private final class TabletClientPipeline extends DefaultChannelPipeline {
-    TabletClient init(ServerInfo serverInfo) {
-      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
-          KuduRpc.MAX_RPC_SIZE,
-          0, // length comes at offset 0
-          4, // length prefix is 4 bytes long
-          0, // no "length adjustment"
-          4 /* strip the length prefix */));
-      super.addLast("decode-inbound", new CallResponse.Decoder());
-      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
-      AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
-      final TabletClient client = new TabletClient(kuduClient, serverInfo);
-      if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
-        super.addLast("timeout-handler",
-            new ReadTimeoutHandler(kuduClient.getTimer(),
-                kuduClient.getDefaultSocketReadTimeoutMs(),
-                TimeUnit.MILLISECONDS));
-      }
-      super.addLast("kudu-handler", client);
-
-      return client;
+      lock.unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 30f1a9c..0de894f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -131,13 +131,13 @@ public abstract class KuduRpc<R> {
    * that access this attribute will have a happens-before relationship with
    * the rest of the code, due to other existing synchronization.
    */
-  byte attempt;  // package-private for TabletClient and AsyncKuduClient only.
+  int attempt;  // package-private for RpcProxy and AsyncKuduClient only.
 
   /**
-   * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+   * Set by RpcProxy when isRequestTracked returns true to identify this RPC in the sequence of
    * RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
    */
-  long sequenceId = RequestTracker.NO_SEQ_NO;
+  private long sequenceId = RequestTracker.NO_SEQ_NO;
 
   KuduRpc(KuduTable table) {
     this.table = table;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 46d99db..a917af2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -147,9 +147,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    */
   private DecoderEmbedder<ChannelBuffer> sslEmbedder;
 
-  /** True if we have negotiated TLS with the server */
-  private boolean negotiatedTls;
-
   /**
    * The nonce sent from the server to the client, or null if negotiation has
    * not yet taken place, or the server does not send a nonce.
@@ -290,7 +287,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     // Store the supported features advertised by the server.
     serverFeatures = getFeatureFlags(response);
     // If the server supports TLS, we will always speak TLS to it.
-    negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+    final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
 
     // Check the negotiated authentication type sent by the server.
     chosenAuthnType = chooseAuthenticationType(response);

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index d4702f8..a6025f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.base.Objects;
@@ -38,7 +39,7 @@ import org.apache.kudu.master.Master;
  * This class encapsulates the information regarding a tablet and its locations.
  * <p>
  * RemoteTablet's main function is to keep track of where the leader for this
- * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
+ * tablet is. For example, an RPC might call {@link #getLeaderServerInfo()}, contact that TS, find
  * it's not the leader anymore, and then call {@link #demoteLeader(String)}.
  * <p>
  * A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
@@ -138,53 +139,61 @@ class RemoteTablet implements Comparable<RemoteTablet> {
   }
 
   /**
-   * Gets the UUID of the tablet server that we think holds the leader replica for this tablet.
-   * @return a UUID of a tablet server that we think has the leader, else null
+   * Get the information on the tablet server that we think holds the leader replica for this
+   * tablet.
+   *
+   * @return information on a tablet server that we think has the leader, else null
    */
-  String getLeaderUUID() {
+  @Nullable
+  ServerInfo getLeaderServerInfo() {
     synchronized (tabletServers) {
-      return leaderUuid;
+      return tabletServers.get(leaderUuid);
     }
   }
 
   /**
-   * Gets the UUID of the closest server. If none is closer than the others, returns a random
-   * server UUID.
-   * @return the UUID of the closest server, which might be any if none is closer, or null if this
-   *         cache doesn't know of any servers
+   * Get the information on the closest server. If none is closer than the others,
+   * return the information on a randomly picked server.
+   *
+   * @return the information on the closest server, which might be any if none is closer, or null
+   *   if this cache doesn't know any servers.
    */
-  String getClosestUUID() {
+  @Nullable
+  ServerInfo getClosestServerInfo() {
     synchronized (tabletServers) {
-      String lastUuid = null;
-      for (ServerInfo serverInfo : tabletServers.values()) {
-        lastUuid = serverInfo.getUuid();
-        if (serverInfo.isLocal()) {
-          return serverInfo.getUuid();
+      ServerInfo last = null;
+      for (ServerInfo e : tabletServers.values()) {
+        last = e;
+        if (e.isLocal()) {
+          return e;
         }
       }
-      return lastUuid;
+      return last;
     }
   }
 
   /**
    * Helper function to centralize the calling of methods based on the passed replica selection
    * mechanism.
+   *
    * @param replicaSelection replica selection mechanism to use
-   * @return a UUID for the server that matches the selection, can be null
+   * @return information on the server that matches the selection, can be null
    */
-  String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+  @Nullable
+  ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) {
     switch (replicaSelection) {
       case LEADER_ONLY:
-        return getLeaderUUID();
+        return getLeaderServerInfo();
       case CLOSEST_REPLICA:
-        return getClosestUUID();
+        return getClosestServerInfo();
       default:
-        throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+        throw new RuntimeException("unknown replica selection mechanism " + replicaSelection);
     }
   }
 
   /**
-   * Gets the replicas of this tablet. The returned list may not be mutated.
+   * Get replicas of this tablet. The returned list may not be mutated.
+   *
    * @return the replicas of the tablet
    */
   List<LocatedTablet.Replica> getReplicas() {