You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/05 20:59:33 UTC

[1/2] kudu git commit: [java] separating Connection

Repository: kudu
Updated Branches:
  refs/heads/master 7e7452704 -> 58248841f


http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
new file mode 100644
index 0000000..3f8de9b
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -0,0 +1,406 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.apache.kudu.client;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import com.stumbleupon.async.Callback;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.tserver.Tserver;
+import org.apache.kudu.util.Pair;
+
+
+/**
+ * This is a 'stateless' helper to send RPCs to a Kudu server ('stateless' in the sense that it
+ * does not keep any state itself besides the references to the {@link AsyncKuduClient} and
+ * {@link Connection} objects).
+ * <p>
+ * This helper serializes and de-serializes RPC requests and responses and provides handy
+ * methods to send the serialized RPC to the underlying {@link Connection} and to handle the
+ * response from it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RpcProxy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
+
+  /** The reference to the top-level Kudu client object. */
+  private final AsyncKuduClient client;
+
+  /** The reference to the object representing connection to the target server. */
+  private final Connection connection;
+
+  /**
+   * Construct RpcProxy object.
+   *
+   * @param client top-level Kudu client object
+   * @param connection the connection associated with the target Kudu server
+   * @throws NullPointerException if {@code client} or {@code connection} is null
+   */
+  RpcProxy(AsyncKuduClient client, Connection connection) {
+    Preconditions.checkNotNull(client);
+    Preconditions.checkNotNull(connection);
+    this.client = client;
+    this.connection = connection;
+  }
+
+  /**
+   * Send the specified RPC using the connection to the Kudu server.
+   *
+   * @param <R> type of the RPC
+   * @param rpc the RPC to send over the connection
+   */
+  <R> void sendRpc(final KuduRpc<R> rpc) {
+    sendRpc(client, connection, rpc);
+  }
+
+  /**
+   * Send the specified RPC using the connection to the Kudu server.
+   * <p>
+   * Errors are never thrown to the caller: recoverable ones are handed to
+   * {@link AsyncKuduClient#handleRetryableError}, everything else fails the RPC via
+   * {@code rpc.errback()}.
+   *
+   * @param <R> type of the RPC
+   * @param client client object to handle response and sending retries, if needed
+   * @param connection connection to send the request over
+   * @param rpc the RPC to send over the connection
+   */
+  static <R> void sendRpc(final AsyncKuduClient client,
+                          final Connection connection,
+                          final KuduRpc<R> rpc) {
+    try {
+      if (!rpc.getRequiredFeatures().isEmpty()) {
+        // An extra optimization: when the peer's features are already known, check that the server
+        // supports feature flags, if those are required.
+        Set<RpcFeatureFlag> features = connection.getPeerFeatures();
+        if (features != null &&
+            !features.contains(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
+          throw new NonRecoverableException(Status.NotSupported(
+              "the server does not support the APPLICATION_FEATURE_FLAGS RPC feature"));
+        }
+      }
+
+      Preconditions.checkArgument(rpc.hasDeferred());
+      rpc.addTrace(
+          new RpcTraceFrame.RpcTraceFrameBuilder(
+              rpc.method(),
+              RpcTraceFrame.Action.SEND_TO_SERVER)
+              .serverInfo(connection.getServerInfo())
+              .build());
+
+      if (!rpc.deadlineTracker.hasDeadline()) {
+        LOG.warn("{} sending RPC with no timeout {}", connection.getLogPrefix(), rpc);
+      }
+      connection.enqueueMessage(rpcToMessage(client, rpc),
+          new Callback<Void, Connection.CallResponseInfo>() {
+            @Override
+            public Void call(Connection.CallResponseInfo callResponseInfo) throws Exception {
+              try {
+                responseReceived(client, connection, rpc,
+                    callResponseInfo.response, callResponseInfo.exception);
+              } catch (Exception e) {
+                rpc.errback(e);
+              }
+              return null;
+            }
+          });
+    } catch (RecoverableException e) {
+      // This is to handle RecoverableException(Status.IllegalState()) from
+      // Connection.enqueueMessage() if the connection turned into the DISCONNECTED state.
+      client.handleRetryableError(rpc, e);
+    } catch (Exception e) {
+      rpc.errback(e);
+    }
+  }
+
+  /**
+   * Build an {@link RpcOutboundMessage} from a {@link KuduRpc}.
+   *
+   * @param <R> type of the RPC
+   * @param client client object providing the request tracker for tracked RPCs
+   * @param rpc the RPC to convert into outbound message
+   * @return the resulting {@link RpcOutboundMessage}
+   */
+  private static <R> RpcOutboundMessage rpcToMessage(
+      final AsyncKuduClient client,
+      final KuduRpc<R> rpc) {
+    // The callId is set by Connection.enqueueMessage().
+    final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
+        .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
+        .setRemoteMethod(
+            RpcHeader.RemoteMethodPB.newBuilder()
+                .setServiceName(rpc.serviceName())
+                .setMethodName(rpc.method()));
+    final Message reqPB = rpc.createRequestPB();
+
+    if (rpc.deadlineTracker.hasDeadline()) {
+      // setTimeoutMillis() takes an int: clamp rather than letting a very distant deadline
+      // overflow into a negative/garbage timeout when narrowing from long.
+      final long millisBeforeDeadline = rpc.deadlineTracker.getMillisBeforeDeadline();
+      headerBuilder.setTimeoutMillis((int) Math.min(millisBeforeDeadline, Integer.MAX_VALUE));
+    }
+
+    if (rpc.isRequestTracked()) {
+      RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
+      final RequestTracker requestTracker = client.getRequestTracker();
+      if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
+        rpc.setSequenceId(requestTracker.newSeqNo());
+      }
+      requestIdBuilder.setClientId(requestTracker.getClientId());
+      requestIdBuilder.setSeqNo(rpc.getSequenceId());
+      requestIdBuilder.setAttemptNo(rpc.attempt);
+      requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
+      headerBuilder.setRequestId(requestIdBuilder);
+    }
+
+    return new RpcOutboundMessage(headerBuilder, reqPB);
+  }
+
+  /**
+   * Handle the outcome of an RPC call reported by the {@link Connection}. The RPC is
+   * completed via {@code rpc.callback()}/{@code rpc.errback()}, or handed to the client for
+   * a retry when the error is recoverable.
+   *
+   * @param <R> type of the RPC
+   * @param client client object to handle response and sending retries, if needed
+   * @param connection connection the request was sent over
+   * @param rpc the RPC the outcome belongs to
+   * @param response the received response; may be null (the trace logging below handles it)
+   * @param ex the error reported by the connection, or null if a response was received
+   */
+  private static <R> void responseReceived(AsyncKuduClient client,
+                                           Connection connection,
+                                           final KuduRpc<R> rpc,
+                                           CallResponse response,
+                                           KuduException ex) {
+    Preconditions.checkNotNull(rpc);
+
+    final long start = System.nanoTime();
+    if (LOG.isTraceEnabled()) {
+      if (response == null) {
+        LOG.trace("{} received null response for RPC {}",
+            connection.getLogPrefix(), rpc);
+      } else {
+        RpcHeader.ResponseHeader header = response.getHeader();
+        Preconditions.checkNotNull(header);
+        LOG.trace("{} received response with rpcId {}, size {} for RPC {}",
+            connection.getLogPrefix(), header.getCallId(),
+            response.getTotalResponseSize(), rpc);
+      }
+    }
+
+    RpcTraceFrame.RpcTraceFrameBuilder traceBuilder = new RpcTraceFrame.RpcTraceFrameBuilder(
+        rpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(
+            connection.getServerInfo());
+    if (ex != null) {
+      if (ex instanceof RecoverableException) {
+        // This check is specifically for the ERROR_SERVER_TOO_BUSY, ERROR_UNAVAILABLE and alike.
+        failOrRetryRpc(client, connection, rpc, (RecoverableException) ex);
+        return;
+      }
+      rpc.addTrace(traceBuilder.callStatus(ex.getStatus()).build());
+      rpc.errback(ex);
+      return;
+    }
+
+    Pair<R, Object> decoded = null;
+    KuduException exception = null;
+    try {
+      decoded = rpc.deserialize(response, connection.getServerInfo().getUuid());
+    } catch (KuduException e) {
+      exception = e;
+    } catch (Exception e) {
+      rpc.addTrace(traceBuilder.build());
+      rpc.errback(e);
+      return;
+    }
+
+    // We can get this Message from within the RPC's expected type,
+    // so convert it into an exception and nullify decoded so that we use the errback route.
+    // Have to do it for both TS and Master errors.
+    if (decoded != null) {
+      if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
+        Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) decoded.getSecond();
+        exception = dispatchTSError(client, connection, rpc, error, traceBuilder);
+        if (exception == null) {
+          // It was taken care of.
+          return;
+        } else {
+          // We're going to errback.
+          decoded = null;
+        }
+      } else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
+        Master.MasterErrorPB error = (Master.MasterErrorPB) decoded.getSecond();
+        exception = dispatchMasterError(client, connection, rpc, error, traceBuilder);
+        if (exception == null) {
+          // Exception was taken care of.
+          return;
+        } else {
+          decoded = null;
+        }
+      }
+    }
+
+    if (decoded == null && exception == null) {
+      // Defensive: deserialize() yielded neither a result nor an error. Without this the
+      // errback branch below would NPE on exception.getStatus(), and the catch block only
+      // logs, which would leave the RPC never completed.
+      exception = new NonRecoverableException(Status.IllegalState(
+          "no result or error could be decoded for RPC " + rpc));
+    }
+
+    try {
+      if (decoded != null) {
+        Preconditions.checkState(!(decoded.getFirst() instanceof Exception));
+        if (client.isStatisticsEnabled()) {
+          rpc.updateStatistics(client.getStatistics(), decoded.getFirst());
+        }
+        rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
+        rpc.callback(decoded.getFirst());
+      } else {
+        if (client.isStatisticsEnabled()) {
+          rpc.updateStatistics(client.getStatistics(), null);
+        }
+        rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
+        rpc.errback(exception);
+      }
+    } catch (Exception e) {
+      RpcHeader.ResponseHeader header = response.getHeader();
+      Preconditions.checkNotNull(header);
+      // The exception goes last, without a placeholder, so SLF4J logs its stack trace.
+      LOG.debug("{} unexpected exception while handling call: callId {}, RPC {}",
+          connection.getLogPrefix(), header.getCallId(), rpc, e);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("------------------<< LEAVING  DECODE <<------------------" +
+          " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
+    }
+  }
+
+  /**
+   * Takes care of a few kinds of TS errors that we handle differently, like tablets or leaders
+   * moving. Builds and returns an exception if we don't know what to do with it.
+   *
+   * @param client client object to handle response and sending retries, if needed
+   * @param connection connection the request was sent over
+   * @param rpc   the original RPC call that triggered the error
+   * @param error the error the TS sent
+   * @param tracer RPC trace builder to add a record on the error into the call history
+   * @return an exception if we couldn't dispatch the error, or null
+   */
+  private static KuduException dispatchTSError(AsyncKuduClient client,
+                                               Connection connection,
+                                               KuduRpc<?> rpc,
+                                               Tserver.TabletServerErrorPB error,
+                                               RpcTraceFrame.RpcTraceFrameBuilder tracer) {
+    Tserver.TabletServerErrorPB.Code errCode = error.getCode();
+    WireProtocol.AppStatusPB.ErrorCode errStatusCode = error.getStatus().getCode();
+    Status status = Status.fromTabletServerErrorPB(error);
+    if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
+      // We're not calling rpc.callback() so we rely on the client to retry that RPC.
+      client.handleTabletNotFound(
+          rpc, new RecoverableException(status), connection.getServerInfo());
+    } else if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_RUNNING ||
+        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+      client.handleRetryableError(rpc, new RecoverableException(status));
+    } else if (errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
+        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
+      // These two error codes are an indication that the tablet isn't a leader.
+      client.handleNotLeader(rpc, new RecoverableException(status), connection.getServerInfo());
+    } else {
+      return new NonRecoverableException(status);
+    }
+    rpc.addTrace(tracer.callStatus(status).build());
+    return null;
+  }
+
+  /**
+   * Provides different handling for various kinds of master errors: re-uses the
+   * mechanisms already in place for handling tablet server errors as much as possible.
+   *
+   * @param client client object to handle response and sending retries, if needed
+   * @param connection connection the request was sent over
+   * @param rpc   the original RPC call that triggered the error
+   * @param error the error the master sent
+   * @param tracer RPC trace builder to add a record on the error into the call history
+   * @return an exception if we couldn't dispatch the error, or null
+   */
+  private static KuduException dispatchMasterError(AsyncKuduClient client,
+                                                   Connection connection,
+                                                   KuduRpc<?> rpc,
+                                                   Master.MasterErrorPB error,
+                                                   RpcTraceFrame.RpcTraceFrameBuilder tracer) {
+
+    WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
+    Status status = Status.fromMasterErrorPB(error);
+    if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
+      client.handleNotLeader(rpc, new RecoverableException(status), connection.getServerInfo());
+    } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+      if (rpc instanceof ConnectToMasterRequest) {
+        // Special case:
+        // We never want to retry this RPC, we only use it to poke masters to learn where the leader
+        // is. If the error is truly non recoverable, it'll be handled later.
+        return new RecoverableException(status);
+      } else {
+        // TODO: This is a crutch until we either don't have to retry RPCs going to the
+        // same server or use retry policies.
+        client.handleRetryableError(rpc, new RecoverableException(status));
+      }
+    } else {
+      return new NonRecoverableException(status);
+    }
+    rpc.addTrace(tracer.callStatus(status).build());
+    return null;
+  }
+
+  /**
+   * Fail or retry the given RPC after a recoverable error: the RPC is failed with
+   * {@code exception} if it has no target tablet, otherwise it is handed to the client via
+   * {@link AsyncKuduClient#handleTabletNotFound} to be retried.
+   *
+   * @param client client object to handle response and sending retries, if needed
+   * @param connection connection the request was sent over
+   * @param rpc       an RPC to retry or fail
+   * @param exception an exception to propagate with the RPC
+   */
+  private static void failOrRetryRpc(AsyncKuduClient client,
+                                     Connection connection,
+                                     final KuduRpc<?> rpc,
+                                     final RecoverableException exception) {
+    rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(rpc.method(),
+        RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+        .serverInfo(connection.getServerInfo())
+        .callStatus(exception.getStatus())
+        .build());
+
+    RemoteTablet tablet = rpc.getTablet();
+    // Note: as of the time of writing (03/11/16), a null tablet doesn't make sense; if we see a
+    // null tablet it's because we didn't set it properly before calling sendRpc().
+    if (tablet == null) {  // Can't retry, dunno where this RPC should go.
+      rpc.errback(exception);
+    } else {
+      client.handleTabletNotFound(rpc, exception, connection.getServerInfo());
+    }
+  }
+
+  /**
+   * @return string representation of the object suitable for printing into logs, etc.
+   */
+  @Override
+  public String toString() {
+    return "RpcProxy@" + hashCode() + ", connection=" + connection;
+  }
+
+  /**
+   * @return underlying {@link Connection} object representing TCP connection to the server
+   */
+  @VisibleForTesting
+  Connection getConnection() {
+    return connection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
index 84e4292..6dc86e9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import java.net.InetAddress;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -35,7 +36,7 @@ public class ServerInfo {
   private final InetAddress resolvedAddr;
   private final boolean local;
   private static final ConcurrentHashMap<InetAddress, Boolean> isLocalAddressCache =
-          new ConcurrentHashMap<>();
+      new ConcurrentHashMap<>();
 
   /**
    * Constructor for all the fields. The intent is that there should only be one ServerInfo
@@ -45,6 +46,7 @@ public class ServerInfo {
    * @param resolvedAddr resolved address used to check if the server is local
    */
   public ServerInfo(String uuid, HostAndPort hostPort, InetAddress resolvedAddr) {
+    Preconditions.checkNotNull(uuid);
     this.uuid = uuid;
     this.hostPort = hostPort;
     this.resolvedAddr = resolvedAddr;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
deleted file mode 100644
index 9345408..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ /dev/null
@@ -1,779 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the above copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.apache.kudu.client;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.concurrent.GuardedBy;
-import javax.net.ssl.SSLException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.Message;
-import com.stumbleupon.async.Deferred;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.client.Negotiator.Result;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.rpc.RpcHeader;
-import org.apache.kudu.tserver.Tserver;
-import org.apache.kudu.util.Pair;
-
-/**
- * Stateful handler that manages a connection to a specific TabletServer.
- * <p>
- * This handler manages the RPC IDs, the serialization and de-serialization of
- * RPC requests and responses, and keeps track of the RPC in flights for which
- * a response is currently awaited, as well as temporarily buffered RPCs that
- * are awaiting to be sent to the network.
- * <p>
- * This class needs careful synchronization. It's a non-sharable handler,
- * meaning there is one instance of it per Netty {@link Channel} and each
- * instance is only used by one Netty IO thread at a time.  At the same time,
- * {@link AsyncKuduClient} calls methods of this class from random threads at
- * random times. The bottom line is that any data only used in the Netty IO
- * threads doesn't require synchronization, everything else does.
- * <p>
- * Acquiring the monitor on an object of this class will prevent it from
- * accepting write requests as well as buffering requests if the underlying
- * channel isn't connected.
- */
-@InterfaceAudience.Private
-public class TabletClient extends SimpleChannelUpstreamHandler {
-
-  /** Logger for this class. */
-  public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
-
-  /** RPC protocol version byte embedded in {@link #CONNECTION_HEADER}. */
-  public static final byte RPC_CURRENT_VERSION = 9;
-  /**
-   * Initial header sent by the client upon connection establishment: the magic "hrpc",
-   * the protocol version, then two zero bytes (their meaning is not shown here — TODO confirm).
-   */
-  private static final byte[] CONNECTION_HEADER = new byte[] { 'h', 'r', 'p', 'c',
-      RPC_CURRENT_VERSION,     // RPC version.
-      0,
-      0
-  };
-
-  /**
-   * The channel we're connected to. This is set very soon after construction
-   * before any other events can arrive.
-   */
-  private Channel chan;
-
-  /** Lock for several of the below fields. */
-  private final ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * RPCs which are waiting to be sent once a connection has been
-   * established.
-   *
-   * This is non-null until the connection is connected, at which point
-   * any pending RPCs will be sent, and the variable set to null.
-   */
-  @GuardedBy("lock")
-  ArrayList<KuduRpc<?>> pendingRpcs = Lists.newArrayList();
-
-  /**
-   * A monotonically increasing counter for RPC IDs; each call to sendCallToWire()
-   * takes the current value as the call ID and increments it.
-   */
-  @GuardedBy("lock")
-  private int nextCallId = 0;
-
-  /**
-   * Connection lifecycle as seen by sendRpc(): while NEGOTIATING, RPCs are queued in
-   * pendingRpcs; when ALIVE, they are written to the channel; once DISCONNECTED, they are
-   * failed or retried via failOrRetryRpc().
-   */
-  private static enum State {
-    NEGOTIATING,
-    ALIVE,
-    DISCONNECTED
-  }
-
-  /** Current lifecycle state of this connection; starts out NEGOTIATING. */
-  @GuardedBy("lock")
-  private State state = State.NEGOTIATING;
-
-  /** RPCs written to the wire and awaiting a response, keyed by their call ID. */
-  @GuardedBy("lock")
-  private HashMap<Integer, KuduRpc<?>> rpcsInflight = new HashMap<>();
-
-  /** Result of connection negotiation; sendRpc() consults its server feature set. */
-  @GuardedBy("lock")
-  private Result negotiationResult;
-
-  /** The owning client. */
-  private final AsyncKuduClient kuduClient;
-
-  /** Client-wide socket read timeout; a positive value caps the per-call timeout sent to the server. */
-  private final long socketReadTimeoutMs;
-
-  /** Source of client ID and sequence numbers for request-tracked RPCs. */
-  private final RequestTracker requestTracker;
-
-  /** Identity of the server this connection talks to; recorded in RPC traces. */
-  private final ServerInfo serverInfo;
-
-  /**
-   * Set to true when the client initiates a disconnect. The channelDisconnected
-   * event handler then knows not to log any warning about unexpected disconnection
-   * from the peer.
-   */
-  private volatile boolean closedByClient;
-
-  /**
-   * Create a handler for a connection to the given server.
-   *
-   * @param client the owning client; its default socket read timeout and request tracker are
-   *               captured here
-   * @param serverInfo the server this connection targets
-   */
-  public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
-    this.kuduClient = client;
-    this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
-    this.requestTracker = client.getRequestTracker();
-    this.serverInfo = serverInfo;
-  }
-
-  /**
-   * Send an RPC to the server, or queue it if negotiation is still in progress.
-   * <p>
-   * The request PB is built outside the lock. Under the lock the RPC is failed/retried when
-   * DISCONNECTED, queued in pendingRpcs when NEGOTIATING, or written to the wire when ALIVE.
-   * Callbacks out of this class are made only after the lock is released.
-   *
-   * @param rpc the RPC to send; must have a Deferred attached
-   */
-  <R> void sendRpc(KuduRpc<R> rpc) {
-    Preconditions.checkArgument(rpc.hasDeferred());
-    rpc.addTrace(
-        new RpcTraceFrame.RpcTraceFrameBuilder(
-            rpc.method(),
-            RpcTraceFrame.Action.SEND_TO_SERVER)
-            .serverInfo(serverInfo)
-            .build());
-
-    if (!rpc.deadlineTracker.hasDeadline()) {
-      LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
-    }
-
-    // Serialize the request outside the lock.
-    Message req;
-    try {
-      req = rpc.createRequestPB();
-    } catch (Exception e) {
-      LOG.error("Uncaught exception while constructing RPC request: " + rpc, e);
-      rpc.errback(e);  // Make the RPC fail with the exception.
-      return;
-    }
-
-    lock.lock();
-    // Tracks whether the finally block still owns the lock; early-exit paths that call out
-    // of this class release it themselves first.
-    boolean needsUnlock = true;
-    try {
-      // If we are disconnected, immediately fail the RPC
-      if (state == State.DISCONNECTED) {
-        lock.unlock();
-        needsUnlock = false;
-        Status statusNetworkError =
-            Status.NetworkError(getPeerUuidLoggingString() + "Connection reset");
-        failOrRetryRpc(rpc, new RecoverableException(statusNetworkError));
-        return;
-      }
-
-      // We're still negotiating -- just add it to the pending list
-      // which will be sent when the negotiation either completes or
-      // fails.
-      if (state == State.NEGOTIATING) {
-        pendingRpcs.add(rpc);
-        return;
-      }
-
-      // We are alive, in which case we must have a channel.
-      assert state == State.ALIVE;
-      assert chan != null;
-
-      // Check that the server supports feature flags, if our RPC uses them.
-      if (!rpc.getRequiredFeatures().isEmpty() &&
-          !negotiationResult.serverFeatures.contains(
-              RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
-        // We don't want to call out of this class while holding the lock.
-        lock.unlock();
-        needsUnlock = false;
-
-        // NOTE(review): the concatenation below lacks a space, producing
-        // "...support theAPPLICATION_FEATURE_FLAGS..." in the message.
-        Status statusNotSupported = Status.NotSupported("the server does not support the" +
-            "APPLICATION_FEATURE_FLAGS RPC feature");
-        rpc.errback(new NonRecoverableException(statusNotSupported));
-        return;
-      }
-
-      // Assign the call ID and write it to the wire.
-      sendCallToWire(rpc, req);
-    } finally {
-      if (needsUnlock) {
-        lock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Assign the next call ID, build the request header, register the RPC as in flight and
-   * write it to the channel. Must be called with {@code lock} held and the state ALIVE.
-   *
-   * @param rpc the RPC being sent
-   * @param reqPB the already-serialized request body
-   */
-  @GuardedBy("lock")
-  private <R> void sendCallToWire(final KuduRpc<R> rpc, Message reqPB) {
-    assert lock.isHeldByCurrentThread();
-    assert state == State.ALIVE;
-    assert chan != null;
-
-    int callId = nextCallId++;
-
-    final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
-        .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
-        .setCallId(callId)
-        .setRemoteMethod(
-            RpcHeader.RemoteMethodPB.newBuilder()
-            .setServiceName(rpc.serviceName())
-            .setMethodName(rpc.method()));
-
-    // If any timeout is set, find the lowest non-zero one, since this will be the deadline that
-    // the server must respect.
-    if (rpc.deadlineTracker.hasDeadline() || socketReadTimeoutMs > 0) {
-      long millisBeforeDeadline = Long.MAX_VALUE;
-      if (rpc.deadlineTracker.hasDeadline()) {
-        millisBeforeDeadline = rpc.deadlineTracker.getMillisBeforeDeadline();
-      }
-
-      long localRpcTimeoutMs = Long.MAX_VALUE;
-      if (socketReadTimeoutMs > 0) {
-        localRpcTimeoutMs = socketReadTimeoutMs;
-      }
-
-      headerBuilder.setTimeoutMillis((int) Math.min(millisBeforeDeadline, localRpcTimeoutMs));
-    }
-
-    if (rpc.isRequestTracked()) {
-      RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
-      if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
-        rpc.setSequenceId(requestTracker.newSeqNo());
-      }
-      requestIdBuilder.setClientId(requestTracker.getClientId());
-      requestIdBuilder.setSeqNo(rpc.getSequenceId());
-      requestIdBuilder.setAttemptNo(rpc.attempt);
-      requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
-      headerBuilder.setRequestId(requestIdBuilder);
-    }
-
-    final KuduRpc<?> oldrpc = rpcsInflight.put(callId, rpc);
-    if (oldrpc != null) {
-      // NOTE(review): on this path the new rpc stays registered in rpcsInflight but is never
-      // written to the channel; only the old rpc is failed.
-      final String wtf = getPeerUuidLoggingString() +
-          "Unexpected state: there was already an RPC in flight with" +
-          " callId=" + callId + ": " + oldrpc +
-          ".  This happened when sending out: " + rpc;
-      LOG.error(wtf);
-      Status statusIllegalState = Status.IllegalState(wtf);
-      // Make it fail. This isn't an expected failure mode.
-      oldrpc.errback(new NonRecoverableException(statusIllegalState));
-      return;
-    }
-
-    RpcOutboundMessage outbound = new RpcOutboundMessage(headerBuilder, reqPB);
-    Channels.write(chan, outbound);
-  }
-
-  /**
-   * Triggers the channel to be disconnected, which will asynchronously cause all
-   * pending and in-flight RPCs to be failed. This method is idempotent.
-   */
-  @VisibleForTesting
-  ChannelFuture disconnect() {
-    // 'chan' should never be null, because as soon as this object is created, it's
-    // added to a ChannelPipeline, which synchronously fires the channelOpen()
-    // event.
-    Preconditions.checkNotNull(chan);
-    closedByClient = true;
-    return Channels.disconnect(chan);
-  }
-
-  /**
-   * Forcefully shuts down the connection to this tablet server and fails all the outstanding RPCs.
-   * Only use when shutting down a client.
-   * @return deferred object to use to track the shutting down of this connection
-   */
-  public Deferred<Void> shutdown() {
-    ChannelFuture disconnectFuture = disconnect();
-    final Deferred<Void> d = new Deferred<Void>();
-    disconnectFuture.addListener(new ChannelFutureListener() {
-      public void operationComplete(final ChannelFuture future) {
-        if (future.isSuccess()) {
-          d.callback(null);
-          return;
-        }
-        final Throwable t = future.getCause();
-        if (t instanceof Exception) {
-          d.callback(t);
-        } else {
-          // Wrap the Throwable because Deferred doesn't handle Throwables,
-          // it only uses Exception.
-          Status statusIllegalState = Status.IllegalState("Failed to shutdown: " +
-              TabletClient.this);
-          d.callback(new NonRecoverableException(statusIllegalState, t));
-        }
-      }
-    });
-    return d;
-  }
-
-  /**
-   * The reason we are suppressing the unchecked conversions is because the KuduRpc is coming
-   * from a collection that has RPCs with different generics, and there's no way to get "decoded"
-   * casted correctly. The best we can do is to rely on the RPC to decode correctly,
-   * and to not pass an Exception in the callback.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
-    Object m = evt.getMessage();
-    if (m instanceof Negotiator.Result) {
-      ArrayList<KuduRpc<?>> queuedRpcs;
-      lock.lock();
-      try {
-        assert chan != null;
-        this.negotiationResult = (Result) m;
-        state = State.ALIVE;
-        queuedRpcs = pendingRpcs;
-        pendingRpcs = null;
-      } finally {
-        lock.unlock();
-      }
-      // Send the queued RPCs after dropping the lock, so we don't end up calling
-      // their callbacks/errbacks with the lock held.
-      //
-      // queuedRpcs may be null in the case that we disconnected the client just
-      // at the same time as the Negotiator was finishing up.
-      if (queuedRpcs != null) {
-        sendQueuedRpcs(queuedRpcs);
-      }
-      return;
-    }
-    if (!(m instanceof CallResponse)) {
-      ctx.sendUpstream(evt);
-      return;
-    }
-    CallResponse response = (CallResponse)m;
-    final long start = System.nanoTime();
-
-    RpcHeader.ResponseHeader header = response.getHeader();
-    if (!header.hasCallId()) {
-      final int size = response.getTotalResponseSize();
-      final String msg = getPeerUuidLoggingString() + "RPC response (size: " + size + ") doesn't" +
-          " have a call ID: " + header;
-      LOG.error(msg);
-      Status statusIncomplete = Status.Incomplete(msg);
-      throw new NonRecoverableException(statusIncomplete);
-    }
-    final int rpcid = header.getCallId();
-
-    KuduRpc<Object> rpc;
-    lock.lock();
-    try {
-      rpc = (KuduRpc<Object>) rpcsInflight.remove(rpcid);
-    } finally {
-      lock.unlock();
-    }
-
-    if (rpc == null) {
-      final String msg = getPeerUuidLoggingString() + "Invalid rpcid: " + rpcid;
-      LOG.error(msg);
-      // If we get a bad RPC ID back, we are probably somehow misaligned from
-      // the server. So, we disconnect the connection.
-      throw new NonRecoverableException(Status.IllegalState(msg));
-    }
-
-    // Start building the trace, we'll finish it as we parse the response.
-    RpcTraceFrame.RpcTraceFrameBuilder traceBuilder =
-        new RpcTraceFrame.RpcTraceFrameBuilder(
-            rpc.method(),
-            RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
-            .serverInfo(serverInfo);
-
-    Pair<Object, Object> decoded = null;
-    KuduException exception = null;
-    Status retryableHeaderError = Status.OK();
-    if (header.hasIsError() && header.getIsError()) {
-      RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
-      KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
-      RpcHeader.ErrorStatusPB error = errorBuilder.build();
-      if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY)) {
-        // We can't return right away, we still need to remove ourselves from 'rpcsInflight', so we
-        // populate 'retryableHeaderError'.
-        retryableHeaderError = Status.ServiceUnavailable(error.getMessage());
-      } else {
-        String message = getPeerUuidLoggingString() +
-            "Tablet server sent error " + error.getMessage();
-        Status status = Status.RemoteError(message);
-        exception = new RpcRemoteException(status, error);
-        LOG.error(message); // can be useful
-      }
-    } else {
-      try {
-        decoded = rpc.deserialize(response, this.serverInfo.getUuid());
-      } catch (KuduException ex) {
-        exception = ex;
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(getPeerUuidLoggingString() + " received RPC response: " +
-          "rpcId=" + rpcid +
-          ", response size=" + response.getTotalResponseSize() +
-          ", rpc=" + rpc);
-    }
-
-    // This check is specifically for the ERROR_SERVER_TOO_BUSY case above.
-    if (!retryableHeaderError.ok()) {
-      rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
-      kuduClient.handleRetryableError(rpc, new RecoverableException(retryableHeaderError));
-      return;
-    }
-
-    // We can get this Message from within the RPC's expected type,
-    // so convert it into an exception and nullify decoded so that we use the errback route.
-    // Have to do it for both TS and Master errors.
-    if (decoded != null) {
-      if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
-        Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) decoded.getSecond();
-        exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
-        if (exception == null) {
-          // It was taken care of.
-          return;
-        } else {
-          // We're going to errback.
-          decoded = null;
-        }
-
-      } else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
-        Master.MasterErrorPB error = (Master.MasterErrorPB) decoded.getSecond();
-        exception = dispatchMasterErrorOrReturnException(rpc, error, traceBuilder);
-        if (exception == null) {
-          // Exception was taken care of.
-          return;
-        } else {
-          decoded = null;
-        }
-      }
-    }
-
-    try {
-      if (decoded != null) {
-        assert !(decoded.getFirst() instanceof Exception);
-        if (kuduClient.isStatisticsEnabled()) {
-          rpc.updateStatistics(kuduClient.getStatistics(), decoded.getFirst());
-        }
-        rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
-        rpc.callback(decoded.getFirst());
-      } else {
-        if (kuduClient.isStatisticsEnabled()) {
-          rpc.updateStatistics(kuduClient.getStatistics(), null);
-        }
-        rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
-        rpc.errback(exception);
-      }
-    } catch (Exception e) {
-      LOG.debug(getPeerUuidLoggingString() + "Unexpected exception while handling RPC #" + rpcid +
-          ", rpc=" + rpc, e);
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("------------------<< LEAVING  DECODE <<------------------" +
-          " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
-    }
-  }
-
-  /**
-   * Takes care of a few kinds of TS errors that we handle differently, like tablets or leaders
-   * moving. Builds and returns an exception if we don't know what to do with it.
-   * @param rpc the original RPC call that triggered the error
-   * @param error the error the TS sent
-   * @return an exception if we couldn't dispatch the error, or null
-   */
-  private KuduException dispatchTSErrorOrReturnException(
-      KuduRpc<?> rpc, Tserver.TabletServerErrorPB error,
-      RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
-    Tserver.TabletServerErrorPB.Code errCode = error.getCode();
-    WireProtocol.AppStatusPB.ErrorCode errStatusCode = error.getStatus().getCode();
-    Status status = Status.fromTabletServerErrorPB(error);
-    if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
-      kuduClient.handleTabletNotFound(rpc, new RecoverableException(status), this);
-      // we're not calling rpc.callback() so we rely on the client to retry that RPC
-    } else if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_RUNNING ||
-        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
-      kuduClient.handleRetryableError(rpc, new RecoverableException(status));
-      // The following two error codes are an indication that the tablet isn't a leader.
-    } else if (errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
-        errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
-      kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
-    } else {
-      return new NonRecoverableException(status);
-    }
-    rpc.addTrace(traceBuilder.callStatus(status).build());
-    return null;
-  }
-
-  /**
-   * Provides different handling for various kinds of master errors: re-uses the
-   * mechanisms already in place for handling tablet server errors as much as possible.
-   * @param rpc the original RPC call that triggered the error
-   * @param error the error the master sent
-   * @return an exception if we couldn't dispatch the error, or null
-   */
-  private KuduException dispatchMasterErrorOrReturnException(
-      KuduRpc<?> rpc, Master.MasterErrorPB error, RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
-    WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
-    Status status = Status.fromMasterErrorPB(error);
-    if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
-      kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
-    } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
-      if (rpc instanceof ConnectToMasterRequest) {
-        // Special case:
-        // We never want to retry this RPC, we only use it to poke masters to learn where the leader
-        // is. If the error is truly non recoverable, it'll be handled later.
-        return new RecoverableException(status);
-      } else {
-        // TODO: This is a crutch until we either don't have to retry RPCs going to the
-        // same server or use retry policies.
-        kuduClient.handleRetryableError(rpc, new RecoverableException(status));
-      }
-    } else {
-      return new NonRecoverableException(status);
-    }
-    rpc.addTrace(traceBuilder.callStatus(status).build());
-    return null;
-  }
-
-  /**
-   * Tells whether or not this handler should be used.
-   * <p>
-   * @return true if this instance can be used, else false if this handler is known to have been
-   * disconnected from the server and sending an RPC (via {@link #sendRpc(KuduRpc)}) will be
-   * retried in the client right away
-   */
-  public boolean isAlive() {
-    lock.lock();
-    try {
-      return state == State.ALIVE || state == State.NEGOTIATING;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
-      throws Exception {
-    this.chan = e.getChannel();
-    super.channelOpen(ctx, e);
-  }
-
-  @Override
-  public void channelConnected(final ChannelHandlerContext ctx,
-                               final ChannelStateEvent e) {
-    assert chan != null;
-    Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
-    Negotiator negotiator = new Negotiator(serverInfo.getHostname(),
-        kuduClient.getSecurityContext());
-    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
-    negotiator.sendHello(chan);
-  }
-
-  @Override
-  public void handleUpstream(final ChannelHandlerContext ctx,
-                             final ChannelEvent e) throws Exception {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(e.toString());
-    }
-    super.handleUpstream(ctx, e);
-  }
-
-  @Override
-  public void channelDisconnected(final ChannelHandlerContext ctx,
-                                  final ChannelStateEvent e) throws Exception {
-    super.channelDisconnected(ctx, e);  // Let the ReplayingDecoder cleanup.
-    cleanup("Connection disconnected");
-  }
-
-  @Override
-  public void channelClosed(final ChannelHandlerContext ctx,
-                            final ChannelStateEvent e) throws Exception {
-    super.channelClosed(ctx, e);
-    cleanup("Connection closed");
-  }
-
-  /**
-   * Cleans up any outstanding or lingering RPC (used when shutting down).
-   * <p>
-   * All RPCs in flight will fail with a {@link RecoverableException} and
-   * all edits buffered will be re-scheduled.
-   *
-   * @param errorMessage string to describe the cause of cleanup
-   */
-  private void cleanup(final String errorMessage) {
-    final ArrayList<KuduRpc<?>> rpcsToFail = Lists.newArrayList();
-
-    lock.lock();
-    try {
-      // Cleanup can be called multiple times, but we only want to run it once.
-      if (state == State.DISCONNECTED) {
-        assert pendingRpcs == null;
-        return;
-      }
-      state = State.DISCONNECTED;
-
-      // In case we were negotiating, we need to fail any that were waiting
-      // for negotiation to complete.
-      if (pendingRpcs != null) {
-        rpcsToFail.addAll(pendingRpcs);
-        pendingRpcs = null;
-      }
-
-      // Similarly, we need to fail any that were already sent and in-flight.
-      if (rpcsInflight != null) {
-        rpcsToFail.addAll(rpcsInflight.values());
-        rpcsInflight = null;
-      }
-    } finally {
-      lock.unlock();
-    }
-    Status statusNetworkError = Status.NetworkError(getPeerUuidLoggingString() +
-        (errorMessage == null ? "Connection reset" : errorMessage));
-    RecoverableException exception = new RecoverableException(statusNetworkError);
-
-    failOrRetryRpcs(rpcsToFail, exception);
-  }
-
-  /**
-   * Retry all the given RPCs.
-   * @param rpcs a possibly empty but non-{@code null} collection of RPCs to retry or fail
-   * @param exception an exception to propagate with the RPCs
-   */
-  private void failOrRetryRpcs(final Collection<KuduRpc<?>> rpcs,
-                               final RecoverableException exception) {
-    for (final KuduRpc<?> rpc : rpcs) {
-      failOrRetryRpc(rpc, exception);
-    }
-  }
-
-  /**
-   * Retry the given RPC.
-   * @param rpc an RPC to retry or fail
-   * @param exception an exception to propagate with the RPC
-   */
-  private void failOrRetryRpc(final KuduRpc<?> rpc,
-                              final RecoverableException exception) {
-    rpc.addTrace(
-        new RpcTraceFrame.RpcTraceFrameBuilder(
-            rpc.method(),
-            RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
-            .serverInfo(serverInfo)
-            .callStatus(exception.getStatus())
-            .build());
-
-    RemoteTablet tablet = rpc.getTablet();
-    // Note As of the time of writing (03/11/16), a null tablet doesn't make sense, if we see a null
-    // tablet it's because we didn't set it properly before calling sendRpc().
-    if (tablet == null) {  // Can't retry, dunno where this RPC should go.
-      rpc.errback(exception);
-    } else {
-      kuduClient.handleTabletNotFound(rpc, exception, this);
-    }
-  }
-
-
-  @Override
-  public void exceptionCaught(final ChannelHandlerContext ctx,
-                              final ExceptionEvent event) {
-    final Throwable e = event.getCause();
-    final Channel c = event.getChannel();
-
-    if (e instanceof RejectedExecutionException) {
-      LOG.warn(getPeerUuidLoggingString() + "RPC rejected by the executor," +
-          " ignore this if we're shutting down", e);
-    } else if (e instanceof ReadTimeoutException) {
-      LOG.debug(getPeerUuidLoggingString() + "Encountered a read timeout, will close the channel");
-    } else if (e instanceof ClosedChannelException) {
-      if (!closedByClient) {
-        LOG.info(getPeerUuidLoggingString() + "Lost connection to peer");
-      }
-    } else if (e instanceof SSLException && closedByClient) {
-      // There's a race in Netty where, when we call Channel.close(), it tries
-      // to send a TLS 'shutdown' message and enters a shutdown state. If another
-      // thread races to send actual data on the channel, then Netty will get a
-      // bit confused that we are trying to send data and misinterpret it as a
-      // renegotiation attempt, and throw an SSLException. So, we just ignore any
-      // SSLException if we've already attempted to close.
-    } else {
-      LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
-    }
-    if (c.isOpen()) {
-      Channels.close(c);        // Will trigger channelClosed(), which will cleanup()
-    } else {                    // else: presumably a connection timeout.
-      cleanup(e.getMessage());  // => need to cleanup() from here directly.
-    }
-  }
-
-
-  /**
-   * Sends the queued RPCs to the server, once we're connected to it.
-   * This gets called after {@link #channelConnected}, once we were able to
-   * handshake with the server
-   *
-   * Must *not* be called with 'lock' held.
-   */
-  private void sendQueuedRpcs(List<KuduRpc<?>> rpcs) {
-    assert !lock.isHeldByCurrentThread();
-    for (final KuduRpc<?> rpc : rpcs) {
-      LOG.debug(getPeerUuidLoggingString() + "Executing RPC queued: " + rpc);
-      sendRpc(rpc);
-    }
-  }
-
-  private String getPeerUuidLoggingString() {
-    return "[Peer " + serverInfo.getUuid() + "] ";
-  }
-
-  ServerInfo getServerInfo() {
-    return serverInfo;
-  }
-
-  public String toString() {
-    final StringBuilder buf = new StringBuilder();
-    buf.append("TabletClient@")
-        .append(hashCode())
-        .append("(chan=")
-        .append(chan)
-        .append(", uuid=")
-        .append(serverInfo.getUuid())
-        .append(", #pending_rpcs=");
-    int npendingRpcs;
-    int nInFlight;
-    lock.lock();
-    try {
-      npendingRpcs = pendingRpcs == null ? 0 : pendingRpcs.size();
-      nInFlight = rpcsInflight == null ? 0 : rpcsInflight.size();
-    } finally {
-      lock.unlock();
-    }
-    buf.append(npendingRpcs);
-    buf.append(", #rpcs_inflight=")
-        .append(nInFlight)
-        .append(')');
-    return buf.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 379fadf..1caa21c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -315,13 +315,15 @@ public class BaseKuduTest {
   static final Callback<Object, Object> defaultErrorCB = new Callback<Object, Object>() {
     @Override
     public Object call(Object arg) throws Exception {
-      if (arg == null) return null;
+      if (arg == null) {
+        return null;
+      }
       if (arg instanceof Exception) {
         LOG.warn("Got exception", (Exception) arg);
       } else {
         LOG.warn("Got an error response back {}", arg);
       }
-      return new Exception("Can't recover from error, see previous WARN");
+      return new Exception("cannot recover from error: " + arg);
     }
   };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 97d5928..a9501a7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -180,12 +180,11 @@ public class ITClient extends BaseKuduTest {
      */
     private boolean disconnectNode() {
       try {
-        if (localAsyncClient.getTabletClients().size() == 0) {
+        final List<Connection> connections = localAsyncClient.getConnectionListCopy();
+        if (connections.isEmpty()) {
           return true;
         }
-
-        int tsToDisconnect = random.nextInt(localAsyncClient.getTabletClients().size());
-        localAsyncClient.getTabletClients().get(tsToDisconnect).disconnect();
+        connections.get(random.nextInt(connections.size())).disconnect();
 
       } catch (Exception e) {
         if (KEEP_RUNNING_LATCH.getCount() == 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
index 1e243a6..bb9791b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
@@ -26,20 +26,11 @@ import org.junit.Test;
 public class ITFaultTolerantScanner extends ITScannerMultiTablet {
   /**
    * Verifies for fault tolerant scanner, it can proceed
-   * properly even if shuts down client connection.
-   */
-  @Test(timeout = 100000)
-  public void testFaultTolerantShutDown() throws KuduException {
-    clientFaultInjection(true, true);
-  }
-
-  /**
-   * Verifies for fault tolerant scanner, it can proceed
    * properly even if disconnects client connection.
    */
   @Test(timeout = 100000)
   public void testFaultTolerantDisconnect() throws KuduException {
-    clientFaultInjection(false, true);
+    clientFaultInjection(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
index 0908da4..0f196fb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
@@ -54,8 +54,8 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
    * properly even if shuts down client connection.
    */
   @Test(timeout = 100000)
-  public void testNonFaultTolerantShutDown() throws KuduException {
-    clientFaultInjection(true, false);
+  public void testNonFaultTolerantDisconnect() throws KuduException {
+    clientFaultInjection(false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index c9b930e..6622d5d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -155,15 +155,13 @@ public class ITScannerMultiTablet extends BaseKuduTest {
   }
 
   /**
-   * Injecting failures (disconnect or shutdown client connection) while scanning, to verify:
+   * Injecting failures (i.e. drop client connection) while scanning, to verify:
    * both non-fault tolerant scanner and fault tolerant scanner will continue scan as expected.
    *
-   * @param shutDown if true shutdown client connection, otherwise disconnect
-   * @param isFaultTolerant if true uses fault tolerant scanner, otherwise
-   *                        uses non fault-tolerant one
+   * @param isFaultTolerant if true use fault-tolerant scanner, otherwise use non-fault-tolerant one
    * @throws Exception
    */
-  void clientFaultInjection(boolean shutDown, boolean isFaultTolerant) throws KuduException {
+  void clientFaultInjection(boolean isFaultTolerant) throws KuduException {
     KuduScanner scanner = syncClient.newScannerBuilder(table)
         .setFaultTolerant(isFaultTolerant)
         .batchSizeBytes(1)
@@ -178,15 +176,10 @@ public class ITScannerMultiTablet extends BaseKuduTest {
         rowCount += rri.getNumRows();
       }
 
-      // Forcefully shutdowns/disconnects the current connection and
-      // fails all outstanding RPCs in the middle of scanning.
-      if (shutDown) {
-        client.shutdownConnection(scanner.currentTablet(),
-                scanner.getReplicaSelection());
-      } else {
-        client.disconnect(scanner.currentTablet(),
-                scanner.getReplicaSelection());
-      }
+      // Forcefully disconnects the current connection and fails all outstanding RPCs
+      // in the middle of scanning.
+      client.newRpcProxy(scanner.currentTablet().getReplicaSelectedServerInfo(
+          scanner.getReplicaSelection())).getConnection().disconnect();
 
       while (scanner.hasMoreRows()) {
         loopCount++;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 16f9b79..eab429b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -22,7 +22,6 @@ import java.io.InputStreamReader;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -56,7 +55,7 @@ public class MiniKuduCluster implements AutoCloseable {
   private static final int PORT_START = 64030;
 
   // List of threads that print
-  private final List<Thread> PROCESS_INPUT_PRINTERS = new ArrayList<>();
+  private final List<Thread> processInputPrinters = new ArrayList<>();
 
   // Map of ports to master servers.
   private final Map<Integer, Process> masterProcesses = new ConcurrentHashMap<>();
@@ -337,7 +336,7 @@ public class MiniKuduCluster implements AutoCloseable {
     Thread thread = new Thread(printer);
     thread.setDaemon(true);
     thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command.get(0))) + ":" + port);
-    PROCESS_INPUT_PRINTERS.add(thread);
+    processInputPrinters.add(thread);
     thread.start();
 
     Thread.sleep(300);
@@ -409,7 +408,7 @@ public class MiniKuduCluster implements AutoCloseable {
       return;
     }
     LOG.info("Killing server at port " + port);
-    destroyAndWaitForProcess(ts);
+    terminateAndWait(ts);
   }
 
   /**
@@ -418,7 +417,7 @@ public class MiniKuduCluster implements AutoCloseable {
    */
   public void killTabletServers() throws InterruptedException {
     for (Process tserver : tserverProcesses.values()) {
-      destroyAndWaitForProcess(tserver);
+      terminateAndWait(tserver);
     }
     tserverProcesses.clear();
   }
@@ -446,7 +445,7 @@ public class MiniKuduCluster implements AutoCloseable {
       return;
     }
     LOG.info("Killing master at port " + port);
-    destroyAndWaitForProcess(master);
+    terminateAndWait(master);
   }
 
   /** {@override} */
@@ -456,36 +455,25 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
-   * Stops all the processes and deletes the folders used to store data and the flagfile.
+   * Stops all the test processes; deletes the folders used to store data; deletes the flag file.
    */
   public void shutdown() {
-    for (Iterator<Process> masterIter = masterProcesses.values().iterator(); masterIter.hasNext(); ) {
-      try {
-        destroyAndWaitForProcess(masterIter.next());
-      } catch (InterruptedException e) {
-        // Need to continue cleaning up.
-      }
-      masterIter.remove();
-    }
-
-    for (Iterator<Process> tsIter = tserverProcesses.values().iterator(); tsIter.hasNext(); ) {
-      try {
-        destroyAndWaitForProcess(tsIter.next());
-      } catch (InterruptedException e) {
-        // Need to continue cleaning up.
-      }
-      tsIter.remove();
-    }
+    boolean wasInterrupted = false;
+    wasInterrupted |= terminateAndWait(masterProcesses);
+    wasInterrupted |= terminateAndWait(tserverProcesses);
 
     // Whether we were interrupted or not above we still destroyed all the processes, so the input
     // printers will hit EOFs and stop.
-    for (Thread thread : PROCESS_INPUT_PRINTERS) {
+    for (Thread thread : processInputPrinters) {
       try {
         thread.join();
       } catch (InterruptedException e) {
+        wasInterrupted = true;
         // Need to continue cleaning up.
+        LOG.info("ignoring request to interrupt; waiting for input printer {} to exit", thread);
       }
     }
+    processInputPrinters.clear();
 
     for (String path : pathsToDelete) {
       try {
@@ -507,14 +495,44 @@ public class MiniKuduCluster implements AutoCloseable {
         LOG.warn("Unable to close MiniKdc", e);
       }
     }
+
+    if (wasInterrupted) {
+      Thread.currentThread().interrupt();
+    }
   }
 
-  private void destroyAndWaitForProcess(Process process) throws InterruptedException {
+  private static void terminateAndWait(Process process) throws InterruptedException {
     process.destroy();
     process.waitFor();
   }
 
   /**
+   * Terminate and wait for exit of every process in the specified container.
+   *
+   * @param processes map of processes to terminate
+   * @return true if {@link InterruptedException} was received while waiting for processes'
+   *         termination, false otherwise
+   */
+  private static boolean terminateAndWait(Map<Integer, Process> processes) {
+    boolean wasInterrupted = false;
+    for (Process p : processes.values()) {
+      while (true) {
+        try {
+          terminateAndWait(p);
+          break;
+        } catch (InterruptedException e) {
+          wasInterrupted = true;
+          // Not being polite here: ignore the request to interrupt and continue cleaning up.
+          LOG.info("ignoring request to interrupt; waiting process {} to exit", p);
+        }
+      }
+    }
+    processes.clear();
+
+    return wasInterrupted;
+  }
+
+  /**
    * Returns the comma-separated list of master addresses.
    * @return master addresses
    */
@@ -568,8 +586,7 @@ public class MiniKuduCluster implements AutoCloseable {
           LOG.info(line);
         }
         in.close();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         if (!e.getMessage().contains("Stream closed")) {
           LOG.error("Caught error while reading a process' output", e);
         }

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 8153328..5e33203 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -93,16 +93,16 @@ public class TestAsyncKuduClient extends BaseKuduTest {
   }
 
   private void disconnectAndWait() throws InterruptedException {
-    for (TabletClient tabletClient : client.getTabletClients()) {
-      tabletClient.disconnect();
+    for (Connection c : client.getConnectionListCopy()) {
+      c.disconnect();
     }
     Stopwatch sw = Stopwatch.createStarted();
-    boolean allDead = false;
+    boolean disconnected = false;
     while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
       boolean sleep = false;
-      if (!client.getTabletClients().isEmpty()) {
-        for (TabletClient tserver : client.getTabletClients()) {
-          if (tserver.isAlive()) {
+      if (!client.getConnectionListCopy().isEmpty()) {
+        for (Connection c : client.getConnectionListCopy()) {
+          if (!c.isDisconnected()) {
             sleep = true;
             break;
           }
@@ -112,11 +112,11 @@ public class TestAsyncKuduClient extends BaseKuduTest {
       if (sleep) {
         Thread.sleep(50);
       } else {
-        allDead = true;
+        disconnected = true;
         break;
       }
     }
-    assertTrue(allDead);
+    assertTrue(disconnected);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 41c7c27..80fe206 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -143,7 +143,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     RemoteTablet rt =
         client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
     String tabletId = rt.getTabletId();
-    TabletClient tc = client.getTabletClient(rt.getLeaderUUID());
+    RpcProxy proxy = client.newRpcProxy(rt.getLeaderServerInfo());
     try {
       // Delete table so we get table not found error.
       client.deleteTable(TABLE_NAME).join();
@@ -151,7 +151,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       while (true) {
         ListTabletsRequest req = new ListTabletsRequest();
         Deferred<ListTabletsResponse> d = req.getDeferred();
-        tc.sendRpc(req);
+        proxy.sendRpc(req);
         ListTabletsResponse resp = d.join();
         if (!resp.getTabletsList().contains(tabletId)) {
           break;
@@ -191,7 +191,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
 
   /**
    * Regression test for a bug in which, when a tablet client is disconnected
-   * and we reconnect, we were previously leaking the old TabletClient
+   * and we reconnect, we were previously leaking the old RpcProxy
    * object in the client2tablets map.
    */
   @Test(timeout = 100000)
@@ -212,7 +212,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
 
-      int numClientsBefore = client.getTabletClients().size();
+      int numClientsBefore = client.getConnectionListCopy().size();
 
       // Restart all the tablet servers.
       killTabletServers();
@@ -223,7 +223,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
 
       // We should not have leaked an entry in the client2tablets map.
-      int numClientsAfter = client.getTabletClients().size();
+      int numClientsAfter = client.getConnectionListCopy().size();
       assertEquals(numClientsBefore, numClientsAfter);
     } finally {
       restartTabletServers();
@@ -395,7 +395,8 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     assertEquals(20, countRowsInScan(scanner));
 
     // Test removing the connection and then do a rapid set of inserts
-    client.getTabletClients().get(0).shutdown().join(DEFAULT_SLEEP);
+    client.getConnectionListCopy().get(0).disconnect()
+        .awaitUninterruptibly(DEFAULT_SLEEP);
     session.setMutationBufferSpace(1);
     for (int i = 91; i < 101; i++) {
       try {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 22010ca..2404005 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -18,11 +18,13 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetAddress;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
 import org.junit.Test;
@@ -33,69 +35,98 @@ public class TestConnectionCache {
 
   @Test(timeout = 50000)
   public void test() throws Exception {
-    try (MiniKuduCluster cluster =
-             new MiniKuduCluster.MiniKuduClusterBuilder().numMasters(3).build()) {
+    MiniKuduCluster cluster = null;
+    try {
+      cluster = new MiniKuduCluster.MiniKuduClusterBuilder().numMasters(3).build();
 
-      AsyncKuduClient client =
+      final AsyncKuduClient client =
           new AsyncKuduClient.AsyncKuduClientBuilder(cluster.getMasterAddresses()).build();
+      final List<HostAndPort> addresses = cluster.getMasterHostPorts();
 
-      List<HostAndPort> addresses = cluster.getMasterHostPorts();
-
-      // Below we ping the masters directly using TabletClient, so if they aren't ready to process
+      // Below we ping the masters directly using RpcProxy, so if they aren't ready to process
       // RPCs we'll get an error. Here by listing the tables we make sure this won't happen since
       // it won't return until a master leader is found.
       client.getTablesList().join();
 
-      ConnectionCache cache = new ConnectionCache(client);
+      final List<ServerInfo> serverInfos = Lists.newArrayList();
       int i = 0;
       for (HostAndPort hp : addresses) {
-        // Ping the process so we go through the whole connection process.
-        InetAddress addr = NetUtil.getInetAddress(hp.getHost());
-        TabletClient conn =
-            cache.newClient(new ServerInfo(i + "", hp, addr));
-        pingConnection(conn);
-        i++;
+        serverInfos.add(new ServerInfo("" + i, hp, NetUtil.getInetAddress(hp.getHost())));
+        ++i;
       }
-      assertEquals(3, cache.getImmutableTabletClientsList().size());
-      assertFalse(cache.allConnectionsAreDead());
-
-      TabletClient conn = cache.getClient("0");
-
-      // Kill the connection.
-      conn.shutdown().join();
-      waitForConnectionToDie(conn);
-      assertFalse(conn.isAlive());
 
-      // Make sure the cache also knows it's dead, but that not all the connections are.
-      assertFalse(cache.getClient("0").isAlive());
-      assertFalse(cache.allConnectionsAreDead());
+      // Ping the process so we go through the whole connection process.
+      for (ServerInfo si : serverInfos) {
+        final RpcProxy h = client.newRpcProxy(si);
+        assertNotNull(h.getConnection());
+        pingConnection(h);
+      }
 
-      // Test reconnecting with only the UUID.
-      TabletClient newConn = cache.getLiveClient("0");
-      assertFalse(conn == newConn);
-      pingConnection(newConn);
+      // 1 tserver and 3 masters and 3 connections from the newRpcProxy() in the loop above.
+      assertEquals(1 + 3 + 3, client.getConnectionListCopy().size());
+      assertFalse(allConnectionsDisconnected(client));
+
+      final RpcProxy proxy = client.newRpcProxy(serverInfos.get(0));
+
+      // Disconnect from the server.
+      proxy.getConnection().disconnect().awaitUninterruptibly();
+      waitForConnectionToClose(proxy.getConnection());
+      assertTrue(proxy.getConnection().isDisconnected());
+
+      // Make sure not all the connections in the connection cache are disconnected yet. Actually,
+      // only the connection to server '0' should be disconnected.
+      assertFalse(allConnectionsDisconnected(client));
+
+      // For a new RpcProxy instance, a new connection to the same destination is established.
+      final RpcProxy newHelper = client.newRpcProxy(serverInfos.get(0));
+      final Connection newConnection = newHelper.getConnection();
+      assertNotNull(newConnection);
+      assertNotSame(proxy.getConnection(), newConnection);
+
+      // The client-->server connection should not be established at this point yet. Wait a little
+      // before checking the state of the connection: this is to check for the status of the
+      // underlying connection _after_ the negotiation is run, if a regression happens. The
+      // negotiation on the underlying connection should be run upon submitting the very first
+      // RPC via the proxy object, not upon creating RpcProxy instance (see KUDU-1878).
+      Thread.sleep(500);
+      assertFalse(newConnection.isReady());
+      pingConnection(newHelper);
+      assertTrue(newConnection.isReady());
 
       // Test disconnecting and make sure we cleaned up all the connections.
-      cache.disconnectEverything().join();
-      waitForConnectionToDie(cache.getClient("0"));
-      waitForConnectionToDie(cache.getClient("1"));
-      waitForConnectionToDie(cache.getClient("2"));
-      assertTrue(cache.allConnectionsAreDead());
+      for (Connection c : client.getConnectionListCopy()) {
+        c.disconnect().awaitUninterruptibly();
+        waitForConnectionToClose(c);
+      }
+      assertTrue(allConnectionsDisconnected(client));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private boolean allConnectionsDisconnected(AsyncKuduClient client) {
+    for (Connection c : client.getConnectionListCopy()) {
+      if (!c.isDisconnected()) {
+        return false;
+      }
     }
+    return true;
   }
 
-  private void waitForConnectionToDie(TabletClient conn) throws InterruptedException {
+  private void waitForConnectionToClose(Connection c) throws InterruptedException {
     DeadlineTracker deadlineTracker = new DeadlineTracker();
     deadlineTracker.setDeadline(5000);
-    while (conn.isAlive() && !deadlineTracker.timedOut()) {
+    while (!c.isDisconnected() && !deadlineTracker.timedOut()) {
       Thread.sleep(250);
     }
   }
 
-  private void pingConnection(TabletClient conn) throws Exception {
+  private void pingConnection(RpcProxy proxy) throws Exception {
     PingRequest ping = PingRequest.makeMasterPingRequest();
     Deferred<PingResponse> d = ping.getDeferred();
-    conn.sendRpc(ping);
+    proxy.sendRpc(ping);
     d.join();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index bed7e4b..5516648 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.net.InetAddress;
@@ -40,28 +41,28 @@ public class TestRemoteTablet {
     RemoteTablet tablet = getTablet(2);
 
     // Demote the wrong leader, no-op.
-    assertEquals("2", tablet.getLeaderUUID());
+    assertEquals("2", tablet.getLeaderServerInfo().getUuid());
     tablet.demoteLeader("1");
-    assertEquals("2", tablet.getLeaderUUID());
+    assertEquals("2", tablet.getLeaderServerInfo().getUuid());
 
     // Tablet at server 1 was deleted.
     assertTrue(tablet.removeTabletClient("1"));
-    assertEquals("2", tablet.getLeaderUUID());
+    assertEquals("2", tablet.getLeaderServerInfo().getUuid());
 
     // Simulate another thread trying to remove 1.
     assertFalse(tablet.removeTabletClient("1"));
 
     // Tablet at server 0 was deleted.
     assertTrue(tablet.removeTabletClient("0"));
-    assertEquals("2", tablet.getLeaderUUID());
+    assertEquals("2", tablet.getLeaderServerInfo().getUuid());
 
     // Leader was demoted.
     tablet.demoteLeader("2");
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
 
     // Simulate another thread doing the same.
     tablet.demoteLeader("2");
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
   }
 
   @Test
@@ -70,11 +71,11 @@ public class TestRemoteTablet {
 
     // Test we can remove it.
     assertTrue(tablet.removeTabletClient("2"));
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
 
     // Test demoting it doesn't break anything.
     tablet.demoteLeader("2");
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
   }
 
   @Test
@@ -83,11 +84,11 @@ public class TestRemoteTablet {
 
     // Test we can remove it.
     assertTrue(tablet.removeTabletClient("0"));
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
 
     // Test demoting it doesn't break anything.
     tablet.demoteLeader("0");
-    assertEquals(null, tablet.getLeaderUUID());
+    assertNull(tablet.getLeaderServerInfo());
 
     // Test removing a server with no leader doesn't break.
     assertTrue(tablet.removeTabletClient("2"));
@@ -97,7 +98,7 @@ public class TestRemoteTablet {
   public void testLocalReplica() {
     RemoteTablet tablet = getTablet(0, 0);
 
-    assertEquals("0", tablet.getClosestUUID());
+    assertEquals("0", tablet.getClosestServerInfo().getUuid());
   }
 
   @Test
@@ -105,15 +106,17 @@ public class TestRemoteTablet {
     RemoteTablet tablet = getTablet(0, -1);
 
     // We just care about getting one back.
-    assertNotNull(tablet.getClosestUUID());
+    assertNotNull(tablet.getClosestServerInfo().getUuid());
   }
 
   @Test
   public void testReplicaSelection() {
     RemoteTablet tablet = getTablet(0, 1);
 
-    assertEquals("0", tablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY));
-    assertEquals("1", tablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA));
+    assertEquals("0",
+        tablet.getReplicaSelectedServerInfo(ReplicaSelection.LEADER_ONLY).getUuid());
+    assertEquals("1",
+        tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid());
   }
 
   private RemoteTablet getTablet(int leaderIndex) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 2a22ecd..5eca5d2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -29,6 +29,7 @@ public class TestTimeouts extends BaseKuduTest {
   /**
    * This test case tries different methods that should all timeout, while relying on the client to
    * pass down the timeouts to the session and scanner.
+   * TODO(aserbin) this test is flaky; add delays on the server side to make it stable
    */
   @Test(timeout = 100000)
   public void testLowTimeouts() throws Exception {


[2/2] kudu git commit: [java] separating Connection

Posted by al...@apache.org.
[java] separating Connection

This patch separates lower-level, connection-related functionality
from the TabletClient class into the new Connection class.
The remaining TabletClient class has been renamed to RpcProxy.
This patch also contains other minor updates to the related code.

In addition, this patch addresses KUDU-1878.

This work is done in the context of KUDU-2013.

Change-Id: Id4ac81d9454631e7501c31576c24f85e968bb871
Reviewed-on: http://gerrit.cloudera.org:8080/7146
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58248841
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58248841
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58248841

Branch: refs/heads/master
Commit: 58248841f213a64683ee217f025f0a38a8450f74
Parents: 7e74527
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Jun 1 18:39:13 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 5 20:58:46 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 277 +++----
 .../apache/kudu/client/ConnectToCluster.java    | 112 ++-
 .../java/org/apache/kudu/client/Connection.java | 634 +++++++++++++++
 .../org/apache/kudu/client/ConnectionCache.java | 269 ++-----
 .../java/org/apache/kudu/client/KuduRpc.java    |   6 +-
 .../java/org/apache/kudu/client/Negotiator.java |   5 +-
 .../org/apache/kudu/client/RemoteTablet.java    |  53 +-
 .../java/org/apache/kudu/client/RpcProxy.java   | 406 ++++++++++
 .../java/org/apache/kudu/client/ServerInfo.java |   4 +-
 .../org/apache/kudu/client/TabletClient.java    | 779 -------------------
 .../org/apache/kudu/client/BaseKuduTest.java    |   6 +-
 .../java/org/apache/kudu/client/ITClient.java   |   7 +-
 .../kudu/client/ITFaultTolerantScanner.java     |  11 +-
 .../kudu/client/ITNonFaultTolerantScanner.java  |   4 +-
 .../kudu/client/ITScannerMultiTablet.java       |  21 +-
 .../org/apache/kudu/client/MiniKuduCluster.java |  73 +-
 .../apache/kudu/client/TestAsyncKuduClient.java |  16 +-
 .../kudu/client/TestAsyncKuduSession.java       |  13 +-
 .../apache/kudu/client/TestConnectionCache.java | 109 ++-
 .../apache/kudu/client/TestRemoteTablet.java    |  31 +-
 .../org/apache/kudu/client/TestTimeouts.java    |   1 +
 21 files changed, 1504 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 019a3f5..a304d05 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,8 +27,10 @@
 package org.apache.kudu.client;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
@@ -43,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.security.auth.Subject;
 
@@ -67,6 +71,7 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
@@ -126,7 +131,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * a lookup of a single partition (e.g. for a write), or re-looking-up a tablet with
    * stale information.
    */
-  static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+  private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+
   /**
    * The number of tablets to fetch from the master when looking up a range of
    * tablets.
@@ -142,16 +148,18 @@ public class AsyncKuduClient implements AutoCloseable {
   private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
       new ConcurrentHashMap<>();
 
+  /** A cache to keep track of already opened connections to Kudu servers. */
   private final ConnectionCache connectionCache;
 
   @GuardedBy("sessions")
   private final Set<AsyncKuduSession> sessions = new HashSet<>();
 
-  // Since the masters also go through TabletClient, we need to treat them as if they were a normal
-  // table. We'll use the following fake table name to identify places where we need special
+  // Since RPCs to the masters also go through RpcProxy, we need to treat them as if they were a
+  // normal table. We'll use the following fake table name to identify places where we need special
   // handling.
+  // TODO(aserbin) clean this up
   static final String MASTER_TABLE_NAME_PLACEHOLDER =  "Kudu Master";
-  final KuduTable masterTable;
+  private final KuduTable masterTable;
   private final List<HostAndPort> masterAddresses;
 
   private final HashedWheelTimer timer;
@@ -212,7 +220,40 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     String clientId = UUID.randomUUID().toString().replace("-", "");
     this.requestTracker = new RequestTracker(clientId);
-    this.connectionCache = new ConnectionCache(this);
+    this.connectionCache = new ConnectionCache(
+        securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+  }
+
+  /**
+   * Get a proxy to send RPC calls to the specified server.
+   *
+   * @param serverInfo server's information
+   * @return the proxy object bound to the target server
+   */
+  @Nonnull
+  RpcProxy newRpcProxy(final ServerInfo serverInfo) {
+    Preconditions.checkNotNull(serverInfo);
+    return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+  }
+
+  /**
+   * Get a proxy to send RPC calls to Kudu master at the specified end-point.
+   *
+   * @param hostPort master end-point
+   * @return the proxy object bound to the target master
+   */
+  @Nullable
+  RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+    // We should have a UUID to construct ServerInfo for the master, but we have a chicken
+    // and egg problem, we first need to communicate with the masters to find out about them,
+    // and that's what we're trying to do. The UUID is just used for logging and cache key,
+    // so instead we just use concatenation of master host and port, prefixed with "master-".
+    final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+    if (inetAddress == null) {
+      // TODO(todd): should we log the resolution failure? throw an exception?
+      return null;
+    }
+    return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
   }
 
   /**
@@ -374,7 +415,7 @@ public class AsyncKuduClient implements AutoCloseable {
     return sendRpcToTablet(rpc);
   }
 
-  Deferred<GetTableSchemaResponse> getTableSchema(String name) {
+  private Deferred<GetTableSchemaResponse> getTableSchema(String name) {
     GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
     rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
     return sendRpcToTablet(rpc);
@@ -501,9 +542,9 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * This callback will be repeatadly used when opening a table until it is done being created.
+   * This callback will be repeatedly used when opening a table until it is done being created.
    */
-  Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
+  private Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
       final KuduRpc<KuduTable> rpc, final KuduTable table) {
     return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>() {
       @Override
@@ -635,18 +676,6 @@ public class AsyncKuduClient implements AutoCloseable {
     return requestTracker;
   }
 
-  HashedWheelTimer getTimer() {
-    return timer;
-  }
-
-  ClientSocketChannelFactory getChannelFactory() {
-    return channelFactory;
-  }
-
-  SecurityContext getSecurityContext() {
-    return securityContext;
-  }
-
   /**
    * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
    * @param table the name of the table you intend to scan.
@@ -691,23 +720,20 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
     RemoteTablet tablet = scanner.currentTablet();
-    assert (tablet != null);
+    Preconditions.checkNotNull(tablet);
     KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
-    String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
-    TabletClient client = connectionCache.getClient(uuid);
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
     nextRequest.attempt++;
-    if (client == null || !client.isAlive()) {
-      // A null client means we either don't know about this tablet anymore (unlikely) or we
-      // couldn't find a leader (which could be triggered by a read timeout).
-      // We'll first delay the RPC in case things take some time to settle down, then retry.
-      Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid +
-          " will retry after a delay");
-      return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection());
+    if (info == null) {
+      return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(
+          String.format("No information on servers hosting tablet %s, will retry later",
+              tablet.getTabletId()))));
     }
+
     Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
-    client.sendRpc(nextRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
     return d;
   }
 
@@ -723,55 +749,19 @@ public class AsyncKuduClient implements AutoCloseable {
     if (tablet == null) {
       return Deferred.fromResult(null);
     }
-
-    final KuduRpc<AsyncKuduScanner.Response>  closeRequest = scanner.getCloseRequest();
-    final TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
-    if (client == null || !client.isAlive()) {
-      // Oops, we couldn't find a tablet server that hosts this tablet. Our
-      // cache was probably invalidated while the client was scanning. So
-      // we can't close this scanner properly.
-      LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
+    final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection());
+    if (info == null) {
       return Deferred.fromResult(null);
     }
 
     final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
     closeRequest.attempt++;
-    client.sendRpc(closeRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
     return d;
   }
 
   /**
-   * Forcefully shuts down the RemoteTablet connection and
-   * fails all outstanding RPCs.
-   *
-   * @param tablet the given tablet
-   * @param replicaSelection replica selection mechanism to use
-   */
-  @VisibleForTesting
-  void shutdownConnection(RemoteTablet tablet,
-                          ReplicaSelection replicaSelection) {
-    TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(replicaSelection));
-    client.shutdown();
-  }
-
-  /**
-   * Forcefully disconnects the RemoteTablet connection and
-   * fails all outstanding RPCs.
-   *
-   * @param tablet the given tablet
-   * @param replicaSelection replica selection mechanism to use
-   */
-  @VisibleForTesting
-  void disconnect(RemoteTablet tablet,
-                  ReplicaSelection replicaSelection) {
-    TabletClient client = connectionCache.getClient(
-        tablet.getReplicaSelectedUUID(replicaSelection));
-    client.disconnect();
-  }
-
-  /**
    * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
    * of the tablet identified by the RPC's table and partition key.
    *
@@ -812,15 +802,12 @@ public class AsyncKuduClient implements AutoCloseable {
     // If we found a tablet, we'll try to find the TS to talk to.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
-      if (uuid != null) {
+      ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection());
+      if (info != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);
-        TabletClient client = connectionCache.getLiveClient(uuid);
-        if (client != null) {
-          client.sendRpc(request);
-          return d;
-        }
+        RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+        return d;
       }
     }
 
@@ -923,7 +910,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param <R> Request's return type.
    * @return An errback.
    */
-  <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
+  private <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(
+      final KuduRpc<R> request) {
     return new Callback<Exception, Exception>() {
       @Override
       public Exception call(Exception e) throws Exception {
@@ -944,26 +932,26 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param errback the errback to call if something goes wrong when calling IsCreateTableDone
    * @return Deferred used to track the provided KuduRpc
    */
-  <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc,
-                                           final Callback<Deferred<R>,
-                                               Master.IsCreateTableDoneResponsePB> retryCB,
-                                           final Callback<Exception, Exception> errback) {
+  private <R> Deferred<R> delayedIsCreateTableDone(
+      final KuduTable table,
+      final KuduRpc<R> rpc,
+      final Callback<Deferred<R>,
+      Master.IsCreateTableDoneResponsePB> retryCB,
+      final Callback<Exception, Exception> errback) {
 
     final class RetryTimer implements TimerTask {
       public void run(final Timeout timeout) {
         String tableId = table.getTableId();
         final boolean has_permit = acquireMasterLookupPermit();
-        if (!has_permit) {
+        if (!has_permit && !tablesNotServed.contains(tableId)) {
           // If we failed to acquire a permit, it's worth checking if someone
           // looked up the tablet we're interested in.  Every once in a while
           // this will save us a Master lookup.
-          if (!tablesNotServed.contains(tableId)) {
-            try {
-              retryCB.call(null);
-              return;
-            } catch (Exception e) {
-              // we're calling RetryRpcCB which doesn't throw exceptions, ignore
-            }
+          try {
+            retryCB.call(null);
+            return;
+          } catch (Exception e) {
+            // we're calling RetryRpcCB which doesn't throw exceptions, ignore
           }
         }
         IsCreateTableDoneRequest isCreateTableDoneRequest =
@@ -1027,11 +1015,11 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  long getSleepTimeForRpc(KuduRpc<?> rpc) {
-    byte attemptCount = rpc.attempt;
+  private long getSleepTimeForRpc(KuduRpc<?> rpc) {
+    int attemptCount = rpc.attempt;
     assert (attemptCount > 0);
     if (attemptCount == 0) {
-      LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc,
+      LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: {}", rpc,
           new Exception("Exception created to collect stack trace"));
       attemptCount = 1;
     }
@@ -1039,29 +1027,12 @@ public class AsyncKuduClient implements AutoCloseable {
     long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) *
         sleepRandomizer.nextDouble());
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
+      LOG.trace("Going to sleep for {} at retry {}", sleepTime, rpc.attempt);
     }
     return sleepTime;
   }
 
   /**
-   * Modifying the list returned by this method won't change how AsyncKuduClient behaves,
-   * but calling certain methods on the returned TabletClients can. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return copy of the current TabletClients list
-   */
-  @VisibleForTesting
-  List<TabletClient> getTabletClients() {
-    return connectionCache.getImmutableTabletClientsList();
-  }
-
-  @VisibleForTesting
-  TabletClient getTabletClient(String uuid) {
-    return connectionCache.getClient(uuid);
-  }
-
-  /**
    * Clears {@link #tableLocations} of the table's entries.
    *
    * This method makes the maps momentarily inconsistent, and should only be
@@ -1080,7 +1051,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return {@code true} if this RPC already had too many attempts,
    * {@code false} otherwise (in which case it's OK to retry once more)
    */
-  static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
+  private static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
     return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
   }
 
@@ -1091,8 +1062,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param cause What was cause of the last failed attempt, if known.
    * You can pass {@code null} if the cause is unknown.
    */
-  static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
-                                                  final KuduException cause) {
+  private static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
+                                                          final KuduException cause) {
     String message;
     if (request.attempt > MAX_RPC_ATTEMPTS) {
       message = "Too many attempts: ";
@@ -1127,7 +1098,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // this will save us a Master lookup.
       TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
       if (entry != null && !entry.isNonCoveredRange() &&
-          entry.getTablet().getLeaderUUID() != null) {
+          entry.getTablet().getLeaderServerInfo() != null) {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
@@ -1164,9 +1135,8 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
     // TODO(todd): stop using this 'masterTable' hack.
-    return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache,
-        defaultAdminOperationTimeoutMs)
-        .addCallback(
+    return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
+        defaultAdminOperationTimeoutMs).addCallback(
             new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
               @Override
               public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
@@ -1306,8 +1276,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * We're handling a tablet server that's telling us it doesn't have the tablet we're asking for.
    * We're in the context of decode() meaning we need to either callback or retry later.
    */
-  <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    invalidateTabletCache(rpc.getTablet(), server);
+  <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+    invalidateTabletCache(rpc.getTablet(), info);
     handleRetryableError(rpc, ex);
   }
 
@@ -1315,8 +1285,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * A tablet server is letting us know that it isn't the specified tablet's leader in response
    * a RPC, so we need to demote it and retry.
    */
-  <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
+  <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+    rpc.getTablet().demoteLeader(info.getUuid());
     handleRetryableError(rpc, ex);
   }
 
@@ -1370,12 +1340,40 @@ public class AsyncKuduClient implements AutoCloseable {
    * Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing
    * the tablet itself from the caches.
    */
-  private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
-    String uuid = server.getServerInfo().getUuid();
+  private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info) {
+    final String uuid = info.getUuid();
     LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId());
     tablet.removeTabletClient(uuid);
   }
 
+  /**
+   * Translates master-provided {@link Master.TSInfoPB} for a tablet server into the internal
+   * {@link ServerInfo} representation, resolving the first advertised RPC address to an IP.
+   *
+   * @param tsInfoPB master-provided information for the tablet server
+   * @return the server's information, or {@code null} if the master reported no RPC addresses
+   * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+   */
+  private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
+    final List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
+    final String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
+    if (addresses.isEmpty()) {
+      LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
+      return null;
+    }
+
+    // Address-selection policy follows meta_cache.cc.
+    // TODO: if the TS advertises multiple host/ports, pick the right one
+    // based on some kind of policy. For now just use the first always.
+    final HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
+    final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+    if (inetAddress == null) {
+      throw new UnknownHostException(
+          "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
+    }
+    return new ServerInfo(uuid, hostPort, inetAddress);
+  }
+
   /** Callback executed when a master lookup completes.  */
   private final class MasterLookupCB implements Callback<Object,
       Master.GetTableLocationsResponsePB> {
@@ -1418,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  boolean acquireMasterLookupPermit() {
+  private boolean acquireMasterLookupPermit() {
     try {
       // With such a low timeout, the JVM may chose to spin-wait instead of
       // de-scheduling the thread (and causing context switches and whatnot).
@@ -1433,7 +1431,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * Releases a master lookup permit that was acquired.
    * @see #acquireMasterLookupPermit
    */
-  void releaseMasterLookupPermit() {
+  private void releaseMasterLookupPermit() {
     masterLookups.release();
   }
 
@@ -1477,7 +1475,7 @@ public class AsyncKuduClient implements AutoCloseable {
       List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount());
       for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
         try {
-          ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo());
+          ServerInfo serverInfo = resolveTS(replica.getTsInfo());
           if (serverInfo != null) {
             servers.add(serverInfo);
           }
@@ -1508,7 +1506,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
     // sleep before retrying.
     TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
-    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
+    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) {
       throw new NoLeaderFoundException(
           Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
     }
@@ -1588,8 +1586,9 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Invokes {@link #shutdown()} and waits. This method returns
-   * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
+   * Invokes {@link #shutdown()} and waits. This method returns void, so consider invoking
+   * {@link #shutdown()} directly if there's a need to handle dangling RPCs.
+   *
    * @throws Exception if an error happens while closing the connections
    */
   @Override
@@ -1607,7 +1606,8 @@ public class AsyncKuduClient implements AutoCloseable {
    *   <li>Releases all other resources.</li>
    * </ul>
    * <strong>Not calling this method before losing the last reference to this
-   * instance may result in data loss and other unwanted side effects</strong>
+   * instance may result in data loss and other unwanted side effects.</strong>
+   *
    * @return A {@link Deferred}, whose callback chain will be invoked once all
    * of the above have been done. If this callback chain doesn't fail, then
    * the clean shutdown will be successful, and all the data will be safe on
@@ -1628,6 +1628,7 @@ public class AsyncKuduClient implements AutoCloseable {
         super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
       }
 
+      @Override
       public void run() {
         // This terminates the Executor.
         channelFactory.releaseExternalResources();
@@ -1676,7 +1677,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // concurrent modification during the iteration.
     Set<AsyncKuduSession> copyOfSessions;
     synchronized (sessions) {
-      copyOfSessions = new HashSet<AsyncKuduSession>(sessions);
+      copyOfSessions = new HashSet<>(sessions);
     }
     if (sessions.isEmpty()) {
       return Deferred.fromResult(null);
@@ -1690,7 +1691,7 @@ public class AsyncKuduClient implements AutoCloseable {
     return Deferred.group(deferreds);
   }
 
-  private boolean isMasterTable(String tableId) {
+  private static boolean isMasterTable(String tableId) {
     // Checking that it's the same instance so there's absolutely no chance of confusing the master
     // 'table' for a user one.
     return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
@@ -1709,6 +1710,14 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * @return a copy of the client's current list of {@link Connection} objects
+   */
+  @VisibleForTesting
+  List<Connection> getConnectionListCopy() {
+    return connectionCache.getConnectionListCopy();
+  }
+
+  /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 22b31f0..dfba55d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Callback;
@@ -80,7 +81,8 @@ final class ConnectToCluster {
 
   private static Deferred<ConnectToMasterResponsePB> connectToMaster(
       final KuduTable masterTable,
-      final TabletClient masterClient, KuduRpc<?> parentRpc,
+      final RpcProxy masterProxy,
+      KuduRpc<?> parentRpc,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
@@ -93,7 +95,7 @@ final class ConnectToCluster {
     }
     Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
     rpc.attempt++;
-    masterClient.sendRpc(rpc);
+    masterProxy.sendRpc(rpc);
 
     // If we are connecting to an older version of Kudu, we'll get an invalid request
     // error. In that case, we resend using the older version of the RPC.
@@ -107,10 +109,10 @@ final class ConnectToCluster {
               rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) {
             AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " +
                 "to server running Kudu < 1.3.");
-            Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred();
-            assert newAttempt != null;
+            final Deferred<ConnectToMasterResponsePB> newAttempt =
+                Preconditions.checkNotNull(rpc.getDeferred());
             rpc.setUseOldMethod();
-            masterClient.sendRpc(rpc);
+            masterProxy.sendRpc(rpc);
             return newAttempt;
           }
         }
@@ -128,8 +130,6 @@ final class ConnectToCluster {
    * @param masterTable the "placeholder" table used by AsyncKuduClient
    * @param masterAddresses the addresses of masters to fetch from
    * @param parentRpc RPC that prompted a master lookup, can be null
-   * @param connCache the client's connection cache, used for creating connections
-   *                  to masters
    * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
    * @return a Deferred object for the cluster connection status
    */
@@ -137,7 +137,6 @@ final class ConnectToCluster {
       KuduTable masterTable,
       List<HostAndPort> masterAddresses,
       KuduRpc<?> parentRpc,
-      ConnectionCache connCache,
       long defaultTimeoutMs) {
     ConnectToCluster connector = new ConnectToCluster(masterAddresses);
 
@@ -146,14 +145,14 @@ final class ConnectToCluster {
     // deferred.
     for (HostAndPort hostAndPort : masterAddresses) {
       Deferred<ConnectToMasterResponsePB> d;
-      TabletClient client = connCache.newMasterClient(hostAndPort);
-      if (client == null) {
+      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+      if (proxy != null) {
+        d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+      } else {
         String message = "Couldn't resolve this master's address " + hostAndPort.toString();
         LOG.warn(message);
         Status statusIOE = Status.IOError(message);
         d = Deferred.fromError(new NonRecoverableException(statusIOE));
-      } else {
-        d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs);
       }
       d.addCallbacks(connector.callbackForNode(hostAndPort),
           connector.errbackForNode(hostAndPort));
@@ -192,56 +191,50 @@ final class ConnectToCluster {
    * to responseD.
    */
   private void incrementCountAndCheckExhausted() {
-    if (countResponsesReceived.incrementAndGet() == numMasters) {
-      if (responseDCalled.compareAndSet(false, true)) {
-
-        // We want `allUnrecoverable` to only be true if all the masters came back with
-        // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
-        // that replies with RecoverableException or with an ok response but is a FOLLOWER is
-        // enough to keep us retrying.
-        boolean allUnrecoverable = true;
-        if (exceptionsReceived.size() == countResponsesReceived.get()) {
-          for (Exception ex : exceptionsReceived) {
-            if (!(ex instanceof NonRecoverableException)) {
-              allUnrecoverable = false;
-              break;
-            }
+    if (countResponsesReceived.incrementAndGet() == numMasters &&
+        responseDCalled.compareAndSet(false, true)) {
+      // We want `allUnrecoverable` to only be true if all the masters came back with
+      // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+      // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+      // enough to keep us retrying.
+      boolean allUnrecoverable = true;
+      if (exceptionsReceived.size() == countResponsesReceived.get()) {
+        for (Exception ex : exceptionsReceived) {
+          if (!(ex instanceof NonRecoverableException)) {
+            allUnrecoverable = false;
+            break;
           }
-        } else {
-          allUnrecoverable = false;
         }
+      } else {
+        allUnrecoverable = false;
+      }
 
-        String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
-        if (allUnrecoverable) {
-          // This will stop retries.
-          String msg = String.format("Couldn't find a valid master in (%s). " +
-              "Exceptions received: %s", allHosts,
-              Joiner.on(",").join(Lists.transform(
-                  exceptionsReceived, Functions.toStringFunction())));
-          Status s = Status.ServiceUnavailable(msg);
-          responseD.callback(new NonRecoverableException(s));
+      String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+      if (allUnrecoverable) {
+        // This will stop retries.
+        String msg = String.format("Couldn't find a valid master in (%s). " +
+            "Exceptions received: %s", allHosts,
+            Joiner.on(",").join(Lists.transform(
+                exceptionsReceived, Functions.toStringFunction())));
+        Status s = Status.ServiceUnavailable(msg);
+        responseD.callback(new NonRecoverableException(s));
+      } else {
+        String message = String.format("Master config (%s) has no leader.",
+            allHosts);
+        Exception ex;
+        if (exceptionsReceived.isEmpty()) {
+          LOG.warn("None of the provided masters {} is a leader; will retry", allHosts);
+          ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
         } else {
-          String message = String.format("Master config (%s) has no leader.",
-              allHosts);
-          Exception ex;
-          if (exceptionsReceived.isEmpty()) {
-            LOG.warn(String.format(
-                "None of the provided masters (%s) is a leader, will retry.",
-                allHosts));
-            ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
-          } else {
-            LOG.warn(String.format(
-                "Unable to find the leader master (%s), will retry",
-                allHosts));
-            String joinedMsg = message + " Exceptions received: " +
-                Joiner.on(",").join(Lists.transform(
-                    exceptionsReceived, Functions.toStringFunction()));
-            Status s = Status.ServiceUnavailable(joinedMsg);
-            ex = new NoLeaderFoundException(s,
-                exceptionsReceived.get(exceptionsReceived.size() - 1));
-          }
-          responseD.callback(ex);
+          LOG.warn("Unable to find the leader master {}; will retry", allHosts);
+          String joinedMsg = message + " Exceptions received: " +
+              Joiner.on(",").join(Lists.transform(
+                  exceptionsReceived, Functions.toStringFunction()));
+          Status s = Status.ServiceUnavailable(joinedMsg);
+          ex = new NoLeaderFoundException(s,
+              exceptionsReceived.get(exceptionsReceived.size() - 1));
         }
+        responseD.callback(ex);
       }
     }
   }
@@ -273,8 +266,7 @@ final class ConnectToCluster {
         // Someone else already found a leader. This is somewhat unexpected
         // because this means two nodes think they're the leader, but it's
         // not impossible. We'll just ignore it.
-        LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
-            hostAndPort.toString());
+        LOG.debug("Callback already invoked, discarding response({}) from {}", r, hostAndPort);
         return null;
       }
 
@@ -304,7 +296,7 @@ final class ConnectToCluster {
 
     @Override
     public Void call(Exception e) throws Exception {
-      LOG.warn("Error receiving a response from: " + hostAndPort, e);
+      LOG.warn("Error receiving response from {}", hostAndPort, e);
       exceptionsReceived.add(e);
       incrementCountAndCheckExhausted();
       return null;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
new file mode 100644
index 0000000..ea1e113
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.net.ssl.SSLException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+
+/**
+ * Class representing a connection from the client to a Kudu server (master or tablet server):
+ * a high-level wrapper for the TCP connection between the client and the server.
+ * <p>
+ * It's a stateful handler that manages a connection to a Kudu server.
+ * <p>
+ * This handler manages the RPC IDs, and keeps track of the RPCs in flight for which
+ * a response is currently awaited, as well as temporarily buffered RPCs that are waiting
+ * to be sent to the server.
+ * <p>
+ * Acquiring the monitor on an object of this class will prevent it from
+ * accepting write requests as well as buffering requests if the underlying
+ * channel isn't connected.
+ * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class Connection extends SimpleChannelUpstreamHandler {
+  /** Information on the target server. */
+  private final ServerInfo serverInfo;
+
+  /** Security context to use for connection negotiation. */
+  private final SecurityContext securityContext;
+
+  /** Read timeout for the connection, in milliseconds (used by Netty's ReadTimeoutHandler). */
+  private final long socketReadTimeoutMs;
+
+  /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
+  private final HashedWheelTimer timer;
+
+  /** The underlying Netty socket channel. */
+  private final SocketChannel channel;
+
+  /**
+   * Set to true when the client side explicitly initiates the disconnect, so that the
+   * channelDisconnected event handler knows not to warn about an unexpected peer disconnection.
+   */
+  private volatile boolean explicitlyDisconnected = false;
+
+  /** Logger for messages originating from this class. */
+  private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
+
+  private static final byte RPC_CURRENT_VERSION = 9;
+
+  /** Initial header sent by the client upon connection establishment. */
+  private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c',
+      RPC_CURRENT_VERSION,     // RPC version.
+      0,
+      0
+  };
+
+  /** Guards the fields below that are annotated with {@code @GuardedBy("lock")}. */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /** Current state of this connection, e.g. NEW, CONNECTING, NEGOTIATING or READY. */
+  @GuardedBy("lock")
+  private State state;
+
+  /**
+   * Maps call IDs to response callbacks for messages that have already been sent and are
+   * awaiting a response from the server. Once the server responds to a message, the
+   * corresponding entry is removed from the map and its callback is invoked with the results
+   * represented by {@link CallResponseInfo}.
+   */
+  @GuardedBy("lock")
+  private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<>();
+
+  /** Messages queued until negotiation completes; set to null once the connection is READY. */
+  @GuardedBy("lock")
+  private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
+
+  /** The result of connection negotiation; null until negotiation has completed. */
+  @GuardedBy("lock")
+  private Result negotiationResult = null;
+
+  /** A monotonically increasing counter for RPC IDs. */
+  @GuardedBy("lock")
+  private int nextCallId = 0;
+
+  Connection(ServerInfo serverInfo,
+             SecurityContext securityContext,
+             long socketReadTimeoutMs,
+             HashedWheelTimer timer,
+             ClientSocketChannelFactory channelFactory) {
+    this.serverInfo = serverInfo;
+    this.securityContext = securityContext;
+    this.state = State.NEW;
+    this.socketReadTimeoutMs = socketReadTimeoutMs;
+    this.timer = timer;
+    // Build the handler pipeline first: the channel below is created on top of it.
+    final ConnectionPipeline pipeline = new ConnectionPipeline();
+    pipeline.init();
+    // Create the socket channel and set its TCP options (60 s connect timeout, no Nagle delay).
+    channel = channelFactory.newChannel(pipeline);
+    SocketChannelConfig config = channel.getConfig();
+    config.setConnectTimeoutMillis(60000);
+    config.setTcpNoDelay(true);
+    // Unfortunately there is no way to override the keep-alive timeout in
+    // Java since the JRE doesn't expose any way to call setsockopt() with
+    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+    config.setKeepAlive(true);
+  }
+
+  /** Enters NEGOTIATING, sends the connection header and starts a {@link Negotiator}. */
+  @Override
+  public void channelConnected(final ChannelHandlerContext ctx,
+                               final ChannelStateEvent e) {
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.CONNECTING);
+      state = State.NEGOTIATING;
+    } finally {
+      lock.unlock();
+    }
+    Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+    Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
+    negotiator.sendHello(channel);
+  }
+
+  /** Logs each upstream event at TRACE level, then delegates to the default dispatch. */
+  @Override
+  public void handleUpstream(final ChannelHandlerContext ctx,
+                             final ChannelEvent e) throws Exception {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} upstream event {}", getLogPrefix(), e);
+    }
+    super.handleUpstream(ctx, e);
+  }
+
+  /**
+   * Handles the channel becoming disconnected: fails all queued and in-flight RPCs via
+   * cleanup().
+   */
+  @Override
+  public void channelDisconnected(final ChannelHandlerContext ctx,
+                                  final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the
+    // upstream pipeline after Connection itself. So, just handle the disconnection event
+    // ourselves.
+    cleanup("connection disconnected");
+  }
+
+  /**
+   * Handles the channel being closed: fails all queued and in-flight RPCs via cleanup().
+   * cleanup() only acts once, so receiving both the disconnect and the close events is fine.
+   */
+  @Override
+  public void channelClosed(final ChannelHandlerContext ctx,
+                            final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+    // pipeline after Connection itself. So, just handle the close event ourselves.
+    cleanup("connection closed");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
+    Object m = evt.getMessage();
+    if (m instanceof Negotiator.Result) {
+      lock.lock();
+      try {
+        Preconditions.checkState(state == State.NEGOTIATING);
+        Preconditions.checkNotNull(queuedMessages);
+
+        state = State.READY;
+        negotiationResult = (Negotiator.Result) m;
+        List<QueuedMessage> queued = queuedMessages;
+        // The queuedMessages should not be used anymore once the connection is negotiated.
+        queuedMessages = null;
+        // Send out all the enqueued messages. This is done while holding the lock to preserve
+        // the sequence of the already enqueued and being-enqueued-right-now messages and simplify
+        // the logic which checks for the consistency using Preconditions.checkXxx() methods.
+        for (final QueuedMessage qm : queued) {
+          sendCallToWire(qm.message, qm.cb);
+        }
+      } finally {
+        lock.unlock();
+      }
+      return;
+    }
+
+    // Some other event which the connection does not handle.
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+
+    final CallResponse response = (CallResponse) m;
+    final RpcHeader.ResponseHeader header = response.getHeader();
+    if (!header.hasCallId()) {
+      final int size = response.getTotalResponseSize();
+      final String msg = getLogPrefix() +
+          " RPC response (size: " + size + ") doesn't" + " have callID: " + header;
+      LOG.error(msg);
+      throw new NonRecoverableException(Status.Incomplete(msg));
+    }
+
+    final int callId = header.getCallId();
+    Callback<Void, CallResponseInfo> responseCbk;
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.READY);
+      responseCbk = inflightMessages.remove(callId);
+    } finally {
+      lock.unlock();
+    }
+
+    if (responseCbk == null) {
+      final String msg = getLogPrefix() + " invalid callID: " + callId;
+      LOG.error(msg);
+      // If we get a bad RPC ID back, we are probably somehow misaligned from
+      // the server. So, we disconnect the connection.
+      throw new NonRecoverableException(Status.IllegalState(msg));
+    }
+
+    if (!header.hasIsError() || !header.getIsError()) {
+      // The success case.
+      responseCbk.call(new CallResponseInfo(response, null));
+      return;
+    }
+
+    final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+    KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
+    final RpcHeader.ErrorStatusPB error = errorBuilder.build();
+    if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
+        error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
+      responseCbk.call(new CallResponseInfo(
+          response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
+      return;
+    }
+
+    final String message = getLogPrefix() + " server sent error " + error.getMessage();
+    LOG.error(message); // can be useful
+    responseCbk.call(new CallResponseInfo(
+        response, new RpcRemoteException(Status.RemoteError(message), error)));
+  }
+
+  /**
+   * Handles an exception raised in the channel pipeline. The exception is logged at a level
+   * chosen by its type, and then the connection is torn down. If the channel is still open it
+   * is closed, which triggers channelClosed() and thus cleanup(); otherwise cleanup() is
+   * invoked directly from here.
+   */
+  @Override
+  public void exceptionCaught(final ChannelHandlerContext ctx,
+                              final ExceptionEvent event) {
+    final Throwable e = event.getCause();
+    final Channel c = event.getChannel();
+
+    if (e instanceof RejectedExecutionException) {
+      LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+    } else if (e instanceof ReadTimeoutException) {
+      LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+    } else if (e instanceof ClosedChannelException) {
+      // Only worth reporting when the close was not requested via disconnect().
+      if (!explicitlyDisconnected) {
+        LOG.info("{} lost connection to peer", getLogPrefix());
+      }
+    } else if (e instanceof SSLException && explicitlyDisconnected) {
+      // There's a race in Netty where, when we call Channel.close(), it tries
+      // to send a TLS 'shutdown' message and enters a shutdown state. If another
+      // thread races to send actual data on the channel, then Netty will get a
+      // bit confused that we are trying to send data and misinterpret it as a
+      // renegotiation attempt, and throw an SSLException. So, we just ignore any
+      // SSLException if we've already attempted to close.
+      LOG.debug("{} ignoring SSLException: already disconnected", getLogPrefix());
+    } else {
+      // NOTE(review): 'e' is consumed by the last '{}' placeholder, so SLF4J renders it via
+      // toString() and does not log its stack trace.
+      LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+    }
+    if (c.isOpen()) {
+      Channels.close(c);        // Will trigger channelClosed(), which will cleanup()
+    } else {                    // else: presumably a connection timeout.
+      cleanup(e.getMessage());  // => need to cleanup() from here directly.
+    }
+  }
+
+  /**
+   * @return the end-point information of the peer server this connection was created for
+   */
+  public ServerInfo getServerInfo() {
+    return this.serverInfo;
+  }
+
+  /**
+   * @return true iff the connection is in the DISCONNECTED state
+   */
+  boolean isDisconnected() {
+    lock.lock();
+    try {
+      return state == State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the features the server advertised during connection negotiation.
+   * TODO(aserbin) make it possible to avoid calling this when the server features are not known yet
+   *
+   * @return the set of server's features, if known; null otherwise
+   */
+  @Nullable
+  Set<RpcFeatureFlag> getPeerFeatures() {
+    lock.lock();
+    try {
+      return negotiationResult == null ? null : negotiationResult.serverFeatures;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Builds the tag that prefixes this connection's log and error messages.
+   *
+   * @return string representation of the peer (i.e. the server) information suitable for logging
+   */
+  String getLogPrefix() {
+    return new StringBuilder("[peer ")
+        .append(serverInfo.getUuid())
+        .append(']')
+        .toString();
+  }
+
+  /**
+   * Enqueue outbound message for sending to the remote server via Kudu RPC. The enqueueMessage()
+   * accepts messages even if the connection hasn't yet been established: the enqueued messages
+   * are sent out as soon as the connection to the server is ready. The connection is initiated upon
+   * enqueuing the very first outbound message.
+   * <p>
+   * Each message gets the next call ID of this connection. The call ID counter wraps around to 0
+   * after {@link Integer#MAX_VALUE} instead of overflowing into negative values, since negative
+   * call IDs are reserved in Kudu RPC (e.g. for negotiation messages). If a socket read timeout is
+   * configured, a positive per-call timeout is capped by it.
+   *
+   * @param msg the outbound message; its header builder receives the call ID (and possibly an
+   *            adjusted timeout)
+   * @param cb the callback to invoke with the response or the error
+   * @throws RecoverableException if the connection is already DISCONNECTED; the caller should
+   *         retry using a new connection
+   */
+  void enqueueMessage(RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb)
+      throws RecoverableException {
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        // The upper-level caller should handle the exception and retry using a new connection.
+        throw new RecoverableException(Status.IllegalState(
+            "connection in DISCONNECTED state; cannot enqueue a message"));
+      }
+
+      if (state == State.NEW) {
+        // Schedule connecting to the server.
+        connect();
+      }
+
+      // Set the call identifier for the outgoing RPC. Wrap around to 0 rather than letting the
+      // int counter overflow into negative (reserved) call IDs on a long-lived connection.
+      final int callId = nextCallId;
+      nextCallId = (callId == Integer.MAX_VALUE) ? 0 : callId + 1;
+      RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder();
+      headerBuilder.setCallId(callId);
+
+      // Amend the timeout for the call, if necessary.
+      if (socketReadTimeoutMs > 0) {
+        final int timeoutMs = headerBuilder.getTimeoutMillis();
+        if (timeoutMs > 0) {
+          headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
+        }
+      }
+
+      // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
+      // The elements of the queuedMessages list will be processed when the negotiation either
+      // succeeds or fails.
+      if (state != State.READY) {
+        queuedMessages.add(new QueuedMessage(msg, cb));
+        return;
+      }
+
+      // It's time to initiate sending the message over the wire.
+      sendCallToWire(msg, cb);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Triggers the channel to be disconnected, which will asynchronously cause all
+   * queued and in-flight RPCs to be failed. This method is idempotent.
+   *
+   * @return future object to wait on the disconnect completion, if necessary
+   */
+  ChannelFuture disconnect() {
+    // Set the flag before disconnecting so that exceptionCaught() can tell an expected
+    // shutdown (ClosedChannelException, SSLException) from an unexpected failure.
+    explicitlyDisconnected = true;
+    return Channels.disconnect(channel);
+  }
+
+  /**
+   * If open, forcefully shut down the connection to the server. This is the same as
+   * {@link #disconnect}, but it returns Deferred instead of ChannelFuture.
+   * <p>
+   * On success the Deferred is called back with null. On failure it is called back with the
+   * failure cause if that is an {@link Exception}; otherwise it receives a
+   * {@link NonRecoverableException} that wraps the cause.
+   *
+   * @return deferred object for tracking the shutting down of this connection
+   */
+  Deferred<Void> shutdown() {
+    final ChannelFuture disconnectFuture = disconnect();
+    final Deferred<Void> d = new Deferred<>();
+    disconnectFuture.addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(final ChannelFuture future) {
+        if (future.isSuccess()) {
+          d.callback(null);
+          return;
+        }
+        final Throwable t = future.getCause();
+        if (t instanceof Exception) {
+          d.callback(t);
+        } else {
+          // Connection.this is required: a bare 'this' would be the anonymous listener.
+          d.callback(new NonRecoverableException(
+              Status.IllegalState("failed to shutdown: " + Connection.this), t));
+        }
+      }
+    });
+    return d;
+  }
+
+  /**
+   * @return string representation of this object (suitable for printing into the logs, etc.)
+   */
+  @Override
+  public String toString() {
+    final StringBuilder buf = new StringBuilder();
+    buf.append("Connection@")
+        .append(hashCode())
+        .append("(channel=")
+        .append(channel)
+        .append(", uuid=")
+        .append(serverInfo.getUuid());
+    final int queuedMessagesNum;
+    final int inflightMessagesNum;
+    lock.lock();
+    try {
+      // A collection is null once retired: queuedMessages after negotiation succeeds,
+      // and both of them after cleanup().
+      queuedMessagesNum = queuedMessages == null ? 0 : queuedMessages.size();
+      inflightMessagesNum = inflightMessages == null ? 0 : inflightMessages.size();
+    } finally {
+      lock.unlock();
+    }
+    buf.append(", #queued=").append(queuedMessagesNum)
+        .append(", #inflight=").append(inflightMessagesNum)
+        .append(")");
+    return buf.toString();
+  }
+
+  /**
+   * This is test-only method.
+   *
+   * @return true iff the connection is in the READY state
+   */
+  @VisibleForTesting
+  boolean isReady() {
+    lock.lock();
+    try {
+      return state == State.READY;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Start sending the message to the server over the wire.
+   * <p>
+   * Registers {@code cb} as the in-flight callback for the message's call ID, then writes the
+   * message into the channel. Must be called with {@code lock} held while in the READY state.
+   *
+   * @throws IllegalArgumentException if a call with the same ID is already in flight; the
+   *         already-registered callback is left untouched in that case
+   */
+  @GuardedBy("lock")
+  private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.READY);
+    Preconditions.checkNotNull(inflightMessages);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} sending {}", getLogPrefix(), msg);
+    }
+    final int callId = msg.getHeaderBuilder().getCallId();
+    // Check before inserting: a blind put() would silently replace (and so orphan) the callback
+    // of the call that already owns this ID, which then could never be completed.
+    Preconditions.checkArgument(!inflightMessages.containsKey(callId),
+        "[peer %s] duplicate in-flight call ID %s", serverInfo.getUuid(), callId);
+    inflightMessages.put(callId, cb);
+    Channels.write(channel, msg);
+  }
+
+  /**
+   * Process the fact that the connection has been disconnected: update the state of this object and
+   * clean up any outstanding or lingering messages, notifying on the error via their status
+   * callbacks. Every callback receives a {@link RecoverableException} wrapping a network error,
+   * so the owner of the callback can handle the error and retry sending the message, if needed.
+   * <p>
+   * Only the first invocation has an effect; later ones find the DISCONNECTED state and return.
+   * The callbacks are invoked after {@code lock} has been released.
+   *
+   * @param errorMessage a string to describe the cause of the cleanup; "connection reset" is
+   *                     used if it is null
+   */
+  private void cleanup(final String errorMessage) {
+    List<QueuedMessage> queued;
+    Map<Integer, Callback<Void, CallResponseInfo>> inflight;
+
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        Preconditions.checkState(queuedMessages == null);
+        Preconditions.checkState(inflightMessages == null);
+        return;
+      }
+
+      // Detach both collections under the lock; they are drained below without holding it.
+      queued = queuedMessages;
+      queuedMessages = null;
+
+      inflight = inflightMessages;
+      inflightMessages = null;
+
+      state = State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+    final Status error = Status.NetworkError(getLogPrefix() + " " +
+        (errorMessage == null ? "connection reset" : errorMessage));
+    final RecoverableException exception = new RecoverableException(error);
+
+    // NOTE(review): 'inflight' is assumed non-null here: inflightMessages is only nulled above,
+    // together with the DISCONNECTED transition. Its initialization is outside this view.
+    for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
+      try {
+        cb.call(new CallResponseInfo(null, exception));
+      } catch (Exception e) {
+        LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
+      }
+    }
+
+    // queuedMessages is already null if negotiation completed (see messageReceived()).
+    if (queued != null) {
+      for (QueuedMessage qm : queued) {
+        try {
+          qm.cb.call(new CallResponseInfo(null, exception));
+        } catch (Exception e) {
+          LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Initiate opening TCP connection to the server. Must be called with {@code lock} held while
+   * in the NEW state; moves the connection to CONNECTING before starting the connect attempt.
+   * The outcome is reported later through the channel handler callbacks (channelConnected()
+   * on success).
+   */
+  private void connect() {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.NEW);
+    // Set CONNECTING before connecting: channelConnected() checks for exactly this state.
+    state = State.CONNECTING;
+    channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort()));
+  }
+
+  /** State of the Connection object. */
+  private enum State {
+    NEW,          // The object has just been created.
+    CONNECTING,   // The establishment of TCP connection to the server has started.
+    NEGOTIATING,  // The connection negotiation has started.
+    READY,        // The connection to the server has been opened, negotiated, and ready to use.
+    DISCONNECTED, // The TCP connection has been dropped off.
+  }
+
+  /**
+   * The class to represent RPC response received from the remote server.
+   * If the {@code exception} is null, then it's a success case and the {@code response} contains
+   * the information on the response. Otherwise it's an error and the {@code exception} provides
+   * information on the error. For the recoverable error case, the {@code exception} is of
+   * {@link RecoverableException} type, otherwise it's of {@link NonRecoverableException} type.
+   */
+  static final class CallResponseInfo {
+    public final CallResponse response;
+    public final KuduException exception;
+
+    CallResponseInfo(CallResponse response, KuduException exception) {
+      this.response = response;
+      this.exception = exception;
+    }
+  }
+
+  /**
+   * An outbound message waiting for the connection to become READY, paired with the callback
+   * that receives its result.
+   */
+  private static final class QueuedMessage {
+    private final RpcOutboundMessage message;
+    private final Callback<Void, CallResponseInfo> cb;
+
+    QueuedMessage(RpcOutboundMessage outbound, Callback<Void, CallResponseInfo> callback) {
+      message = outbound;
+      cb = callback;
+    }
+  }
+
+  /** The helper class to build the Netty's connection pipeline. */
+  private final class ConnectionPipeline extends DefaultChannelPipeline {
+    /**
+     * Installs the handlers in this order: frame decoder, response decoder, message encoder,
+     * the optional read-timeout handler, and finally the enclosing Connection itself.
+     */
+    void init() {
+      final Connection conn = Connection.this;
+      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+          KuduRpc.MAX_RPC_SIZE, // maximum frame length
+          0,                    // the length field starts at offset 0
+          4,                    // the length field is 4 bytes long
+          0,                    // no length adjustment
+          4));                  // strip the 4-byte length prefix from each frame
+      super.addLast("decode-inbound", new CallResponse.Decoder());
+      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+      // A non-positive read timeout disables the read-timeout handler altogether.
+      if (conn.socketReadTimeoutMs > 0) {
+        super.addLast("timeout-handler", new ReadTimeoutHandler(
+            conn.timer, conn.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
+      }
+      super.addLast("kudu-handler", conn);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 83ae8f0..844e79f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -17,263 +17,126 @@
 
 package org.apache.kudu.client;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.Common;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
 
 /**
- * The ConnectionCache is responsible for managing connections to masters and tablet servers.
- * There should only be one instance per Kudu client, and can <strong>not</strong> be shared between
- * clients.
- * <p>
- * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by
- * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues.
- * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes
- * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's
- * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead
- * connection prevents tight looping when hitting "Connection refused"-type of errors.
+ * The ConnectionCache is responsible for managing connections to Kudu masters and tablet servers.
+ * There should only be one instance of ConnectionCache per Kudu client, and it should not be
+ * shared between clients.
  * <p>
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new
+ * instances when the {@link #getConnection(ServerInfo)} method is called with the same
+ * destination. Since the map is keyed by UUID of the server, it would require an ever-growing
+ * set of unique Kudu servers to encounter memory issues.
+ * <p>
  * This class is thread-safe.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class ConnectionCache {
+  /** Security context to use for connection negotiation. */
+  private final SecurityContext securityContext;
 
-  private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
+  /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */
+  private final long socketReadTimeoutMs;
 
-  /**
-   * Cache that maps UUIDs to the clients connected to them.
-   * <p>
-   * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put,
-   * {@code putIfAbsent} isn't a good fit for us since it requires creating
-   * an object that may be "wasted" in case another thread wins the insertion
-   * race, and we don't want to create unnecessary connections.
-   */
-  @GuardedBy("lock")
-  private final HashMap<String, TabletClient> uuid2client = new HashMap<>();
+  /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */
+  private final HashedWheelTimer timer;
 
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  /** Netty's channel factory to use by connections. */
+  private final ClientSocketChannelFactory channelFactory;
 
-  private final Lock readLock = lock.readLock();
-  private final Lock writeLock = lock.readLock();
+  /** Synchronization primitive to guard access to the fields below. */
+  private final ReentrantLock lock = new ReentrantLock();
 
-  private final AsyncKuduClient kuduClient;
+  @GuardedBy("lock")
+  private final HashMap<String, Connection> uuid2connection = new HashMap<>();
 
   /**
-   * Create a new empty ConnectionCache that will used the passed client to create connections.
-   * @param client a client that contains the information we need to create connections
+   * Create a new empty ConnectionCache given the specified parameters.
    */
-  ConnectionCache(AsyncKuduClient client) {
-    this.kuduClient = client;
+  ConnectionCache(SecurityContext securityContext,
+                  long socketReadTimeoutMs,
+                  HashedWheelTimer timer,
+                  ClientSocketChannelFactory channelFactory) {
+    this.securityContext = securityContext;
+    this.socketReadTimeoutMs = socketReadTimeoutMs;
+    this.timer = timer;
+    this.channelFactory = channelFactory;
   }
 
   /**
-   * Create a connection to a tablet server based on information provided by the master.
-   * @param tsInfoPB master-provided information for the tablet server
-   * @return an object that contains all the server's information
-   * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+   * Get connection to the specified server. If no connection exists or the existing connection
+   * is already disconnected, then create a new connection to the specified server. The newly
+   * created connection is not negotiated until enqueuing the first RPC to the target server.
+   *
+   * @param serverInfo the server end-point to connect to
+   * @return the cached or newly created {@link Connection} to the specified server
    */
-  ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
-    List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
-    String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
-    if (addresses.isEmpty()) {
-      LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
-      return null;
-    }
-
-    // from meta_cache.cc
-    // TODO: if the TS advertises multiple host/ports, pick the right one
-    // based on some kind of policy. For now just use the first always.
-    HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
-    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-    if (inetAddress == null) {
-      throw new UnknownHostException(
-          "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
-    }
-    return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo();
-  }
-
-  TabletClient newMasterClient(HostAndPort hostPort) {
-    // We should pass a UUID here but we have a chicken and egg problem, we first need to
-    // communicate with the masters to find out about them, and that's what we're trying to do.
-    // The UUID is just used for logging and cache key, so instead we just use a constructed
-    // string with the master host and port as.
-    return newClient("master-" + hostPort.toString(), hostPort);
-  }
+  public Connection getConnection(final ServerInfo serverInfo) {
+    Connection connection;
 
-  TabletClient newClient(String uuid, HostAndPort hostPort) {
-    InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-    if (inetAddress == null) {
-      // TODO(todd): should we log the resolution failure? throw an exception?
-      return null;
-    }
-
-    ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress);
-    return newClient(serverInfo);
-  }
-
-  TabletClient newClient(ServerInfo serverInfo) {
-    TabletClient client;
-    SocketChannel chan;
-
-    writeLock.lock();
+    lock.lock();
     try {
-      client = uuid2client.get(serverInfo.getUuid());
-      if (client != null && client.isAlive()) {
-        return client;
+      // First try to find an existing connection.
+      connection = uuid2connection.get(serverInfo.getUuid());
+      if (connection == null || connection.isDisconnected()) {
+        // If no valid connection is found, create a new one.
+        connection = new Connection(serverInfo, securityContext,
+            socketReadTimeoutMs, timer, channelFactory);
+        uuid2connection.put(serverInfo.getUuid(), connection);
       }
-      final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(serverInfo);
-      chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
-      uuid2client.put(serverInfo.getUuid(), client);
-    } finally {
-      writeLock.unlock();
-    }
-
-    final SocketChannelConfig config = chan.getConfig();
-    config.setConnectTimeoutMillis(5000);
-    config.setTcpNoDelay(true);
-    // Unfortunately there is no way to override the keep-alive timeout in
-    // Java since the JRE doesn't expose any way to call setsockopt() with
-    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
-    config.setKeepAlive(true);
-    chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(),
-                                       serverInfo.getPort())); // Won't block.
-    return client;
-  }
-
-  /**
-   * Get a connection to a server for the given UUID. The returned connection can be down and its
-   * state can be queried via {@link TabletClient#isAlive()}. To automatically get a client that's
-   * gonna be re-connected automatically, use {@link #getLiveClient(String)}.
-   * @param uuid server's identifier
-   * @return a connection to a server, or null if the passed UUID isn't known
-   */
-  TabletClient getClient(String uuid) {
-    readLock.lock();
-    try {
-      return uuid2client.get(uuid);
     } finally {
-      readLock.unlock();
+      lock.unlock();
     }
-  }
 
-  /**
-   * Get a connection to a server for the given UUID. This method will automatically call
-   * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
-   * @param uuid server's identifier
-   * @return a connection to a server, or null if the passed UUID isn't known
-   */
-  TabletClient getLiveClient(String uuid) {
-    TabletClient client = getClient(uuid);
-
-    if (client == null) {
-      return null;
-    } else if (client.isAlive()) {
-      return client;
-    } else {
-      return newClient(client.getServerInfo());
-    }
+    return connection;
   }
 
   /**
-   * Asynchronously closes every socket, which will also cancel all the RPCs in flight.
+   * Asynchronously terminate every connection. This also cancels all the pending and in-flight
+   * RPCs.
+   * <p>
+   * {@link Connection#shutdown()} is invoked on every cached connection while the cache lock
+   * is held.
+   *
+   * @return a Deferred grouping the shutdown Deferreds of all cached connections
    */
   Deferred<ArrayList<Void>> disconnectEverything() {
-    readLock.lock();
+    lock.lock();
     try {
-      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2client.size());
-      for (TabletClient ts : uuid2client.values()) {
-        deferreds.add(ts.shutdown());
+      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+      for (Connection c : uuid2connection.values()) {
+        deferreds.add(c.shutdown());
       }
       return Deferred.group(deferreds);
     } finally {
-      readLock.unlock();
+      lock.unlock();
     }
   }
 
   /**
-   * The list returned by this method can't be modified,
-   * but calling certain methods on the returned TabletClients can have an effect. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return copy of the current TabletClients list
-   */
-  List<TabletClient> getImmutableTabletClientsList() {
-    readLock.lock();
-    try {
-      return ImmutableList.copyOf(uuid2client.values());
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
-   * Queries all the cached connections if they are alive.
-   * @return true if all the connections are down, else false
+   * Return a copy of the all-connections-list. This method is exposed only to allow
+   * {@link AsyncKuduClient} to forward it, so that tests can get access to the underlying
+   * elements of the cache.
+   *
+   * @return a copy of the list of all connections in the connection cache
    */
   @VisibleForTesting
-  boolean allConnectionsAreDead() {
-    readLock.lock();
+  List<Connection> getConnectionListCopy() {
+    lock.lock();
     try {
-      for (TabletClient tserver : uuid2client.values()) {
-        if (tserver.isAlive()) {
-          return false;
-        }
-      }
+      return ImmutableList.copyOf(uuid2connection.values());
     } finally {
-      readLock.unlock();
-    }
-    return true;
-  }
-
-  private final class TabletClientPipeline extends DefaultChannelPipeline {
-    TabletClient init(ServerInfo serverInfo) {
-      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
-          KuduRpc.MAX_RPC_SIZE,
-          0, // length comes at offset 0
-          4, // length prefix is 4 bytes long
-          0, // no "length adjustment"
-          4 /* strip the length prefix */));
-      super.addLast("decode-inbound", new CallResponse.Decoder());
-      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
-      AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
-      final TabletClient client = new TabletClient(kuduClient, serverInfo);
-      if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
-        super.addLast("timeout-handler",
-            new ReadTimeoutHandler(kuduClient.getTimer(),
-                kuduClient.getDefaultSocketReadTimeoutMs(),
-                TimeUnit.MILLISECONDS));
-      }
-      super.addLast("kudu-handler", client);
-
-      return client;
+      lock.unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 30f1a9c..0de894f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -131,13 +131,13 @@ public abstract class KuduRpc<R> {
    * that access this attribute will have a happens-before relationship with
    * the rest of the code, due to other existing synchronization.
    */
-  byte attempt;  // package-private for TabletClient and AsyncKuduClient only.
+  int attempt;  // package-private for RpcProxy and AsyncKuduClient only.
 
   /**
-   * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+   * Set by RpcProxy when isRequestTracked returns true to identify this RPC in the sequence of
    * RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
    */
-  long sequenceId = RequestTracker.NO_SEQ_NO;
+  private long sequenceId = RequestTracker.NO_SEQ_NO;
 
   KuduRpc(KuduTable table) {
     this.table = table;

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 46d99db..a917af2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -147,9 +147,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    */
   private DecoderEmbedder<ChannelBuffer> sslEmbedder;
 
-  /** True if we have negotiated TLS with the server */
-  private boolean negotiatedTls;
-
   /**
    * The nonce sent from the server to the client, or null if negotiation has
    * not yet taken place, or the server does not send a nonce.
@@ -290,7 +287,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     // Store the supported features advertised by the server.
     serverFeatures = getFeatureFlags(response);
     // If the server supports TLS, we will always speak TLS to it.
-    negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+    final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
 
     // Check the negotiated authentication type sent by the server.
     chosenAuthnType = chooseAuthenticationType(response);

http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index d4702f8..a6025f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.base.Objects;
@@ -38,7 +39,7 @@ import org.apache.kudu.master.Master;
  * This class encapsulates the information regarding a tablet and its locations.
  * <p>
  * RemoteTablet's main function is to keep track of where the leader for this
- * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
+ * tablet is. For example, an RPC might call {@link #getLeaderServerInfo()}, contact that TS, find
  * it's not the leader anymore, and then call {@link #demoteLeader(String)}.
  * <p>
  * A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
@@ -138,53 +139,61 @@ class RemoteTablet implements Comparable<RemoteTablet> {
   }
 
   /**
-   * Gets the UUID of the tablet server that we think holds the leader replica for this tablet.
-   * @return a UUID of a tablet server that we think has the leader, else null
+   * Get the information on the tablet server that we think holds the leader replica for this
+   * tablet.
+   *
+   * @return information on a tablet server that we think has the leader, else null
    */
-  String getLeaderUUID() {
+  @Nullable
+  ServerInfo getLeaderServerInfo() {
     synchronized (tabletServers) {
-      return leaderUuid;
+      return tabletServers.get(leaderUuid);
     }
   }
 
   /**
-   * Gets the UUID of the closest server. If none is closer than the others, returns a random
-   * server UUID.
-   * @return the UUID of the closest server, which might be any if none is closer, or null if this
-   *         cache doesn't know of any servers
+   * Get the information on the closest server. If none is closer than the others,
+   * return the information on a randomly picked server.
+   *
+   * @return the information on the closest server, which might be any if none is closer, or null
+   *   if this cache doesn't know any servers.
    */
-  String getClosestUUID() {
+  @Nullable
+  ServerInfo getClosestServerInfo() {
     synchronized (tabletServers) {
-      String lastUuid = null;
-      for (ServerInfo serverInfo : tabletServers.values()) {
-        lastUuid = serverInfo.getUuid();
-        if (serverInfo.isLocal()) {
-          return serverInfo.getUuid();
+      ServerInfo last = null;
+      for (ServerInfo e : tabletServers.values()) {
+        last = e;
+        if (e.isLocal()) {
+          return e;
         }
       }
-      return lastUuid;
+      return last;
     }
   }
 
   /**
    * Helper function to centralize the calling of methods based on the passed replica selection
    * mechanism.
+   *
    * @param replicaSelection replica selection mechanism to use
-   * @return a UUID for the server that matches the selection, can be null
+   * @return information on the server that matches the selection, can be null
    */
-  String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+  @Nullable
+  ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) {
     switch (replicaSelection) {
       case LEADER_ONLY:
-        return getLeaderUUID();
+        return getLeaderServerInfo();
       case CLOSEST_REPLICA:
-        return getClosestUUID();
+        return getClosestServerInfo();
       default:
-        throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+        throw new RuntimeException("unknown replica selection mechanism " + replicaSelection);
     }
   }
 
   /**
-   * Gets the replicas of this tablet. The returned list may not be mutated.
+   * Get replicas of this tablet. The returned list may not be mutated.
+   *
    * @return the replicas of the tablet
    */
   List<LocatedTablet.Replica> getReplicas() {