Posted to commits@kudu.apache.org by ad...@apache.org on 2016/10/20 21:26:26 UTC

[1/2] kudu git commit: sasl: provide a mutex implementation to cyrus-sasl

Repository: kudu
Updated Branches:
  refs/heads/master ba36a0b7d -> 65cb2edf5


sasl: provide a mutex implementation to cyrus-sasl

While looking around the SASL code, I found notes saying that it is not
inherently thread-safe unless we provide it with a mutex implementation
via sasl_set_mutex(). This commit does just that.
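sasl_set_mutex() expects four callbacks (allocate, lock, unlock, free) that pass mutexes around as opaque `void*` handles. The sketch below is a rough Java analogue of that adapter idea, not how the C library is actually wired up from Kudu. It hands out a `java.util.concurrent.locks.ReentrantLock` as an opaque `Object`, and, like the C++ hooks in this commit, returns 0 to signal success. The class and method names are hypothetical. `main` runs four threads through the same handle to show that the lock serializes their updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical Java analogue of the alloc/lock/unlock/free callback set that
// cyrus-sasl's sasl_set_mutex() takes; handles are opaque Objects, like void*.
public class SaslMutexHooks {
  public static Object alloc() {
    return new ReentrantLock();
  }

  public static int lock(Object m) {
    ((ReentrantLock) m).lock();
    return 0; // 0 indicates success, mirroring the C++ hooks.
  }

  public static int unlock(Object m) {
    ((ReentrantLock) m).unlock();
    return 0; // 0 indicates success.
  }

  public static void free(Object m) {
    // Nothing to do: unlike `delete` in C++, the JVM reclaims the lock itself.
  }

  private static int counter = 0;

  public static void main(String[] args) throws InterruptedException {
    Object m = alloc();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < 4; t++) {
      Thread th = new Thread(() -> {
        for (int i = 0; i < 10_000; i++) {
          lock(m);
          try {
            counter++; // protected by the shared handle
          } finally {
            unlock(m);
          }
        }
      });
      threads.add(th);
      th.start();
    }
    for (Thread th : threads) {
      th.join();
    }
    free(m);
    System.out.println(counter); // 40000
  }
}
```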

Change-Id: I357de7f5df86d1663440b53cef715d92bee2cbe2
Reviewed-on: http://gerrit.cloudera.org:8080/4762
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cd40d1c5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cd40d1c5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cd40d1c5

Branch: refs/heads/master
Commit: cd40d1c5126d0fd541d95370cdb2998b60f2f82c
Parents: ba36a0b
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Oct 19 13:49:13 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Oct 20 21:01:19 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/sasl_common.cc | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cd40d1c5/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc
index b2c7849..a15536f 100644
--- a/src/kudu/rpc/sasl_common.cc
+++ b/src/kudu/rpc/sasl_common.cc
@@ -28,6 +28,7 @@
 #include "kudu/gutil/once.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/mutex.h"
 #include "kudu/util/net/sockaddr.h"
 
 using std::set;
@@ -120,6 +121,25 @@ static sasl_callback_t callbacks[] = {
   { SASL_CB_LIST_END, nullptr, nullptr }
 };
 
+
+// SASL requires mutexes for thread safety, but doesn't implement
+// them itself. So, we have to hook them up to our mutex implementation.
+static void* SaslMutexAlloc() {
+  return static_cast<void*>(new Mutex());
+}
+static void SaslMutexFree(void* m) {
+  delete static_cast<Mutex*>(m);
+}
+static int SaslMutexLock(void* m) {
+  static_cast<Mutex*>(m)->lock();
+  return 0; // indicates success.
+}
+static int SaslMutexUnlock(void* m) {
+  static_cast<Mutex*>(m)->unlock();
+  return 0; // indicates success.
+}
+
+
 // Determine whether initialization was ever called
 struct InitializationData {
   Status status;
@@ -135,6 +155,9 @@ static void DoSaslInit(void* app_name_char_array) {
   const char* const app_name = reinterpret_cast<const char* const>(app_name_char_array);
   VLOG(3) << "Initializing SASL library";
 
+  // Make SASL thread-safe.
+  sasl_set_mutex(&SaslMutexAlloc, &SaslMutexLock, &SaslMutexUnlock, &SaslMutexFree);
+
   sasl_init_data = new InitializationData();
   sasl_init_data->app_name = app_name;
 


[2/2] kudu git commit: [java client] Decouple TabletClient from RemoteTablet

[java client] Decouple TabletClient from RemoteTablet

RemoteTablet was caching TabletClients, which made it hard to test in
isolation since doing so required real connections to servers.

This patch makes it so that only UUIDs are passed around: the client now
asks a RemoteTablet which UUID to look up in ConnectionCache. Many new
unit tests were written that weren't possible before without a lot of
mocking.
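To illustrate the indirection, here is a minimal sketch. It assumes a hypothetical `TabletLocations` class standing in for RemoteTablet and a plain `Map` standing in for ConnectionCache. The tablet only records server UUIDs, and the caller resolves a UUID to a connection in a separate step. Because no sockets are involved, a class shaped like this can be unit-tested directly.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class UuidIndirectionSketch {

  // Hypothetical stand-in for RemoteTablet: it knows replica UUIDs, not connections.
  static final class TabletLocations {
    private final Set<String> replicaUuids = new LinkedHashSet<>();
    private String leaderUuid;

    synchronized void addReplica(String uuid, boolean isLeader) {
      replicaUuids.add(uuid);
      if (isLeader) {
        leaderUuid = uuid;
      }
    }

    synchronized String getLeaderUuid() {
      return leaderUuid;
    }

    // Forget the leader if it is the given server (e.g. after a "not leader" error).
    synchronized void demoteLeader(String uuid) {
      if (uuid.equals(leaderUuid)) {
        leaderUuid = null;
      }
    }

    synchronized void removeReplica(String uuid) {
      replicaUuids.remove(uuid);
      demoteLeader(uuid);
    }

    synchronized int replicaCount() {
      return replicaUuids.size();
    }
  }

  public static void main(String[] args) {
    // Hypothetical UUID -> address table standing in for ConnectionCache.
    Map<String, String> connections = new HashMap<>();
    connections.put("ts-1", "10.0.0.1:7050");
    connections.put("ts-2", "10.0.0.2:7050");

    TabletLocations tablet = new TabletLocations();
    tablet.addReplica("ts-1", true);
    tablet.addReplica("ts-2", false);

    // Step 1: ask the tablet for a UUID. Step 2: ask the cache for a connection.
    System.out.println(connections.get(tablet.getLeaderUuid())); // 10.0.0.1:7050

    tablet.demoteLeader("ts-1");
    System.out.println(tablet.getLeaderUuid()); // null
  }
}
```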

Doing this brings subtle behavior changes. ConnectionCache is now the only
component responsible for handling TabletClients, so the whole concept of
"reconnection" could be moved into ConnectionCache, simplifying sendRpcToTablet.
It also means that an object monitor would likely become a bottleneck in
ConnectionCache, so a read-write lock (ReentrantReadWriteLock) is used instead.
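Here is a minimal sketch of the read-write-lock cache described above. It assumes a hypothetical `Conn` type in place of TabletClient and does no real networking. Lookups take the shared read lock. The check-then-insert that creates a connection holds the exclusive write lock, so two threads cannot both create a live connection for the same UUID. Dead entries are kept so that `getLiveClient` can reconnect using their host and port.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RwConnectionCacheSketch {

  // Hypothetical connection: an address plus a liveness flag.
  static final class Conn {
    final String host;
    final int port;
    volatile boolean alive = true;

    Conn(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  private final Map<String, Conn> uuid2conn = new HashMap<>();
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();   // shared: many readers at once
  private final Lock writeLock = rwLock.writeLock(); // exclusive: one writer, no readers

  // Plain lookup; the result may be a dead connection.
  Conn getClient(String uuid) {
    readLock.lock();
    try {
      return uuid2conn.get(uuid);
    } finally {
      readLock.unlock();
    }
  }

  // Atomic get-and-put: reuse a live entry, otherwise replace it with a new one.
  Conn newClient(String uuid, String host, int port) {
    writeLock.lock();
    try {
      Conn existing = uuid2conn.get(uuid);
      if (existing != null && existing.alive) {
        return existing;
      }
      Conn created = new Conn(host, port);
      uuid2conn.put(uuid, created);
      return created;
    } finally {
      writeLock.unlock();
    }
  }

  // Reconnect using the dead entry's address; null if the UUID was never seen.
  Conn getLiveClient(String uuid) {
    Conn c = getClient(uuid);
    if (c == null) {
      return null;
    }
    return c.alive ? c : newClient(uuid, c.host, c.port);
  }

  public static void main(String[] args) {
    RwConnectionCacheSketch cache = new RwConnectionCacheSketch();
    Conn first = cache.newClient("ts-1", "10.0.0.1", 7050);
    first.alive = false;                       // simulate a disconnect
    Conn second = cache.getLiveClient("ts-1"); // reconnects to the same host:port
    System.out.println(second != first && second.alive); // true
    System.out.println(cache.getLiveClient("unknown"));  // null
  }
}
```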

Since I was in that code, I also changed the tablet ID in RemoteTablet from
Slice to String. Using a Slice was an old mistake I made years ago; we didn't need it.

This still needs to be benchmarked against the 1.0.x client jar.

Change-Id: If3ad2190c7e2c7f51cb9ffe6ed3348b62488e675
Reviewed-on: http://gerrit.cloudera.org:8080/4757
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/65cb2edf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/65cb2edf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/65cb2edf

Branch: refs/heads/master
Commit: 65cb2edf5661599f689e28f4eec161fe062f7cb5
Parents: cd40d1c
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Fri Oct 14 15:42:14 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Oct 20 21:19:32 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 100 +++---
 .../apache/kudu/client/AsyncKuduScanner.java    |   6 +-
 .../main/java/org/apache/kudu/client/Batch.java |   2 +-
 .../org/apache/kudu/client/ConnectionCache.java | 357 ++++++-------------
 .../java/org/apache/kudu/client/KuduRpc.java    |   2 +-
 .../java/org/apache/kudu/client/Operation.java  |   2 +-
 .../org/apache/kudu/client/PingRequest.java     |  75 ++++
 .../org/apache/kudu/client/PingResponse.java    |  35 ++
 .../org/apache/kudu/client/RemoteTablet.java    | 216 +++--------
 .../java/org/apache/kudu/client/Statistics.java |  10 +-
 .../apache/kudu/client/TableLocationsCache.java |   2 +-
 .../org/apache/kudu/client/TabletClient.java    |  21 +-
 .../apache/kudu/client/TestAsyncKuduClient.java |  85 +++--
 .../kudu/client/TestAsyncKuduSession.java       |   4 +-
 .../apache/kudu/client/TestConnectionCache.java |  89 +++++
 .../apache/kudu/client/TestRemoteTablet.java    |  99 +++++
 .../java/org/apache/kudu/client/TestUtils.java  |  38 ++
 17 files changed, 623 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 1e4cdbf..f455c0e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -583,21 +583,21 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return A deferred row.
    */
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
-    final RemoteTablet tablet = scanner.currentTablet();
+    RemoteTablet tablet = scanner.currentTablet();
     assert (tablet != null);
-    final TabletClient client = tablet.getLeaderConnection();
-    final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
-    final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
+    TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
+    KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
+    Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
-    next_request.attempt++;
+    nextRequest.attempt++;
     if (client == null || !client.isAlive()) {
       // A null client means we either don't know about this tablet anymore (unlikely) or we
       // couldn't find a leader (which could be triggered by a read timeout).
       // We'll first delay the RPC in case things take some time to settle down, then retry.
-      return delayedSendRpcToTablet(next_request, null);
+      return delayedSendRpcToTablet(nextRequest, null);
     }
-    client.sendRpc(next_request);
+    client.sendRpc(nextRequest);
     return d;
   }
 
@@ -614,7 +614,7 @@ public class AsyncKuduClient implements AutoCloseable {
       return Deferred.fromResult(null);
     }
 
-    final TabletClient client = tablet.getLeaderConnection();
+    final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
     if (client == null || !client.isAlive()) {
       // Oops, we couldn't find a tablet server that hosts this tablet. Our
       // cache was probably invalidated while the client was scanning. So
@@ -666,43 +666,16 @@ public class AsyncKuduClient implements AutoCloseable {
       request.setPropagatedTimestamp(lastPropagatedTs);
     }
 
-    // If we found a tablet, we'll try to find the TS to talk to. If that TS was previously
-    // disconnected, say because we didn't query that tablet for some seconds, then we'll try to
-    // reconnect based on the old information. If that fails, we'll instead continue with the next
-    // block that queries the master.
+    // If we found a tablet, we'll try to find the TS to talk to.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      TabletClient tabletClient = tablet.getLeaderConnection();
-      if (tabletClient != null) {
-        final Deferred<R> d = request.getDeferred();
-        if (tabletClient.isAlive()) {
-          request.setTablet(tablet);
-          tabletClient.sendRpc(request);
-          return d;
-        }
-        try {
-          tablet.reconnectTabletClient(tabletClient);
-        } catch (UnknownHostException e) {
-          LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
-              tabletClient.getUuid(), e);
-          // Because of this exception, getLeaderConnection() below won't be able to find a newTabletClient
-          // and we'll delay the RPC.
-        }
-        TabletClient newTabletClient = tablet.getLeaderConnection();
-        assert (tabletClient != newTabletClient);
-
-        if (newTabletClient == null) {
-          // Wait a little bit before hitting the master.
-          return delayedSendRpcToTablet(request, null);
-        }
-
-        if (!newTabletClient.isAlive()) {
-          LOG.debug("Tried reconnecting to tablet server {} but failed, " +
-              "will query the master", tabletClient.getUuid());
-          // Let fall through.
-        } else {
-          request.setTablet(tablet);
-          newTabletClient.sendRpc(request);
+      String uuid = tablet.getLeaderUUID();
+      if (uuid != null) {
+        Deferred<R> d = request.getDeferred();
+        request.setTablet(tablet);
+        TabletClient client = connectionCache.getLiveClient(uuid);
+        if (client != null) {
+          client.sendRpc(request);
           return d;
         }
       }
@@ -924,7 +897,12 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   @VisibleForTesting
   List<TabletClient> getTabletClients() {
-    return connectionCache.getTabletClients();
+    return connectionCache.getImmutableTabletClientsList();
+  }
+
+  @VisibleForTesting
+  TabletClient getTabletClient(String uuid) {
+    return connectionCache.getClient(uuid);
   }
 
   /**
@@ -988,7 +966,7 @@ public class AsyncKuduClient implements AutoCloseable {
       // this will save us a Master lookup.
       TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
       if (entry != null && !entry.isNonCoveredRange()
-          && entry.getTablet().getLeaderConnection() != null) {
+          && entry.getTablet().getLeaderUUID() != null) {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
@@ -1158,7 +1136,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * a RPC, so we need to demote it and retry.
    */
   <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    rpc.getTablet().demoteLeader(server);
+    rpc.getTablet().demoteLeader(server.getUuid());
     handleRetryableError(rpc, ex);
   }
 
@@ -1204,8 +1182,8 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
     LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
-        tablet.getTabletIdAsString());
-    tablet.removeTabletClient(server);
+        tablet.getTabletId());
+    tablet.removeTabletClient(server.getUuid());
   }
 
   /** Callback executed when a master lookup completes.  */
@@ -1295,10 +1273,30 @@ public class AsyncKuduClient implements AutoCloseable {
     // already discovered the tablet, its locations are refreshed.
     List<RemoteTablet> tablets = new ArrayList<>(locations.size());
     for (Master.TabletLocationsPB tabletPb : locations) {
-      RemoteTablet rt = RemoteTablet.createTabletFromPb(tableId, tabletPb, connectionCache);
+
+      String tabletId = tabletPb.getTabletId().toStringUtf8();
+
+      List<UnknownHostException> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount());
+      for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
+        try {
+          connectionCache.connectTS(replica.getTsInfo());
+        } catch (UnknownHostException ex) {
+          lookupExceptions.add(ex);
+        }
+      }
+
+      if (!lookupExceptions.isEmpty() &&
+          lookupExceptions.size() == tabletPb.getReplicasCount()) {
+        Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
+            lookupExceptions);
+        throw new NonRecoverableException(statusIOE);
+      }
+
+      Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
+      RemoteTablet rt = new RemoteTablet(tableId, tabletId, partition, tabletPb);
 
       LOG.info("Learned about tablet {} for table '{}' with partition {}",
-               rt.getTabletIdAsString(), tableName, rt.getPartition());
+               rt.getTabletId(), tableName, rt.getPartition());
       tablets.add(rt);
     }
 
@@ -1310,7 +1308,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
     // sleep before retrying.
     TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
-    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderConnection() == null) {
+    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
       throw new NoLeaderFoundException(
           Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index a0694a5..c61fbdb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -492,7 +492,7 @@ public final class AsyncKuduScanner {
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Done scanning tablet {} for partition {} with scanner id {}",
-                tablet.getTabletIdAsString(), tablet.getPartition(), Bytes.pretty(scannerId));
+                tablet.getTabletId(), tablet.getPartition(), Bytes.pretty(scannerId));
     }
     scannerId = null;
     sequenceId = 0;
@@ -537,7 +537,7 @@ public final class AsyncKuduScanner {
   }
 
   public String toString() {
-    final String tablet = this.tablet == null ? "null" : this.tablet.getTabletIdAsString();
+    final String tablet = this.tablet == null ? "null" : this.tablet.getTabletId();
     final StringBuilder buf = new StringBuilder();
     buf.append("KuduScanner(table=");
     buf.append(table.getName());
@@ -785,7 +785,7 @@ public final class AsyncKuduScanner {
 
     public String toString() {
       return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
-          + (tablet != null? ", tabletSlice=" + tablet.getTabletIdAsString() : "")
+          + (tablet != null? ", tabletSlice=" + tablet.getTabletId() : "")
           + ", attempt=" + attempt + ')';
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index a0dacdd..01bc113 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -154,7 +154,7 @@ class Batch extends KuduRpc<BatchResponse> {
 
   @Override
   void updateStatistics(Statistics statistics, BatchResponse response) {
-    Slice tabletId = this.getTablet().getTabletId();
+    String tabletId = this.getTablet().getTabletId();
     String tableName = this.getTable().getName();
     TabletStatistics tabletStatistics = statistics.getTabletStatistics(tableName, tabletId);
     if (response == null) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index f421d32..9a885fb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -16,12 +16,13 @@
 // under the License.
 package org.apache.kudu.client;
 
-import com.stumbleupon.async.Callback;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.Common;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelStateEvent;
+import org.apache.kudu.master.Master;
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.SocketChannelConfig;
@@ -32,20 +33,27 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.GuardedBy;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * The ConnectionCache is responsible for managing connections to masters and tablets. There
- * should only be one instance per Kudu client, and can <strong>not</strong> be shared between
+ * The ConnectionCache is responsible for managing connections to masters and tablet servers.
+ * There should only be one instance per Kudu client, and can <strong>not</strong> be shared between
  * clients.
  * <p>
+ * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by
+ * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues.
+ * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes
+ * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's
+ * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead
+ * connection prevents tight looping when hitting "Connection refused"-type of errors.
+ * <p>
  * This class is thread-safe.
  */
 @InterfaceAudience.Private
@@ -55,31 +63,20 @@ class ConnectionCache {
   private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
 
   /**
-   * Cache that maps a TabletServer address ("ip:port") to the clients
-   * connected to it.
-   * <p>
-   * Access to this map must be synchronized by locking its monitor.
-   * Logging the contents of this map (or calling toString) requires copying it first.
+   * Cache that maps UUIDs to the clients connected to them.
    * <p>
-   * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
-   * (just when connecting to / disconnecting from TabletClients) and when we
-   * add something to it, we want to do an atomic get-and-put, but
-   * {@code putIfAbsent} isn't a good fit for us since it requires to create
+   * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put;
+   * {@code putIfAbsent} isn't a good fit for us since it requires creating
    * an object that may be "wasted" in case another thread wins the insertion
    * race, and we don't want to create unnecessary connections.
-   * <p>
-   * Upon disconnection, clients are automatically removed from this map.
-   * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
-   * the clean-up on the {@code channelClosed} event, which is actually the
-   * 3rd and last event to be fired when a channel gets disconnected. The
-   * first one to get fired is, {@code channelDisconnected}. This matters to
-   * us because we want to purge disconnected clients from the cache as
-   * quickly as possible after the disconnection, to avoid handing out clients
-   * that are going to cause unnecessary errors.
-   * @see TabletClientPipeline#handleDisconnect
    */
-  @GuardedBy("ip2client")
-  private final HashMap<String, TabletClient> ip2client = new HashMap<>();
+  @GuardedBy("lock")
+  private final HashMap<String, TabletClient> uuid2client = new HashMap<>();
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private final Lock readLock = lock.readLock();
+  private final Lock writeLock = lock.writeLock();
 
   private final AsyncKuduClient kuduClient;
 
@@ -91,21 +88,48 @@ class ConnectionCache {
     this.kuduClient = client;
   }
 
+  /**
+   * Create a connection to a tablet server based on information provided by the master.
+   * @param tsInfoPB master-provided information for the tablet server
+   * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+   */
+  void connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
+    List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
+    String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
+    if (addresses.isEmpty()) {
+      LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
+      return;
+    }
+
+    // from meta_cache.cc
+    // TODO: if the TS advertises multiple host/ports, pick the right one
+    // based on some kind of policy. For now just use the first always.
+    String ip = getIP(addresses.get(0).getHost());
+    if (ip == null) {
+      throw new UnknownHostException(
+          "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
+    }
+    newClient(uuid, ip, addresses.get(0).getPort());
+  }
+
   TabletClient newClient(String uuid, final String host, final int port) {
-    final String hostport = host + ':' + port;
     TabletClient client;
     SocketChannel chan;
-    synchronized (ip2client) {
-      client = ip2client.get(hostport);
+
+    writeLock.lock();
+    try {
+      client = uuid2client.get(uuid);
       if (client != null && client.isAlive()) {
         return client;
       }
       final TabletClientPipeline pipeline = new TabletClientPipeline();
       client = pipeline.init(uuid, host, port);
       chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
-      TabletClient oldClient = ip2client.put(hostport, client);
-      assert oldClient == null;
+      uuid2client.put(uuid, client);
+    } finally {
+      writeLock.unlock();
     }
+
     final SocketChannelConfig config = chan.getConfig();
     config.setConnectTimeoutMillis(5000);
     config.setTcpNoDelay(true);
@@ -118,152 +142,88 @@ class ConnectionCache {
   }
 
   /**
-   * Closes every socket, which will also cancel all the RPCs in flight.
+   * Get a connection to a server for the given UUID. The returned connection may be down; its
+   * state can be queried via {@link TabletClient#isAlive()}. To get a client that is reconnected
+   * automatically, use {@link #getLiveClient(String)}.
+   * @param uuid server's identifier
+   * @return a connection to a server, or null if the passed UUID isn't known
    */
-  Deferred<ArrayList<Void>> disconnectEverything() {
-    ArrayList<Deferred<Void>> deferreds = new ArrayList<>(2);
-    HashMap<String, TabletClient> ip2client_copy;
-    synchronized (ip2client) {
-      // Make a local copy so we can shutdown every Tablet Server clients
-      // without hold the lock while we iterate over the data structure.
-      ip2client_copy = new HashMap<>(ip2client);
-    }
-
-    for (TabletClient ts : ip2client_copy.values()) {
-      deferreds.add(ts.shutdown());
+  TabletClient getClient(String uuid) {
+    readLock.lock();
+    try {
+      return uuid2client.get(uuid);
+    } finally {
+      readLock.unlock();
     }
-    final int size = deferreds.size();
-    return Deferred.group(deferreds).addCallback(
-        new Callback<ArrayList<Void>, ArrayList<Void>>() {
-          public ArrayList<Void> call(final ArrayList<Void> arg) {
-            // Normally, now that we've shutdown() every client, all our caches should
-            // be empty since each shutdown() generates a DISCONNECTED event, which
-            // causes TabletClientPipeline to call removeClientFromIpCache().
-            HashMap<String, TabletClient> logme = null;
-            synchronized (ip2client) {
-              if (!ip2client.isEmpty()) {
-                logme = new HashMap<>(ip2client);
-              }
-            }
-            if (logme != null) {
-              // Putting this logging statement inside the synchronized block
-              // can lead to a deadlock, since HashMap.toString() is going to
-              // call TabletClient.toString() on each entry, and this locks the
-              // client briefly. Other parts of the code lock clients first and
-              // the ip2client HashMap second, so this can easily deadlock.
-              LOG.error("Some clients are left in the client cache and haven't"
-                  + " been cleaned up: " + logme);
-            }
-            return arg;
-          }
-
-          public String toString() {
-            return "wait " + size + " TabletClient.shutdown()";
-          }
-        });
   }
 
   /**
-   * Modifying the list returned by this method won't affect this cache,
-   * but calling certain methods on the returned TabletClients can. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return copy of the current TabletClients list
+   * Get a connection to a server for the given UUID. This method will automatically call
+   * {@link #newClient(String, String, int)} if the cached connection is down.
+   * @param uuid server's identifier
+   * @return a connection to a server, or null if the passed UUID isn't known
    */
-  List<TabletClient> getTabletClients() {
-    synchronized (ip2client) {
-      return new ArrayList<>(ip2client.values());
+  TabletClient getLiveClient(String uuid) {
+    TabletClient client = getClient(uuid);
+
+    if (client == null) {
+      return null;
+    } else if (client.isAlive()) {
+      return client;
+    } else {
+      return newClient(uuid, client.getHost(), client.getPort());
     }
   }
 
   /**
-   * Blocking call. Performs a slow search of the IP used by the given client.
-   * <p>
-   * This is needed when we're trying to find the IP of the client before its
-   * channel has successfully connected, because Netty's API offers no way of
-   * retrieving the IP of the remote peer until we're connected to it.
-   * @param client the client we want the IP of
-   * @return the IP of the client, or {@code null} if we couldn't find it
+   * Asynchronously closes every socket, which will also cancel all the RPCs in flight.
    */
-  private InetSocketAddress slowSearchClientIP(final TabletClient client) {
-    String hostport = null;
-    synchronized (ip2client) {
-      for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
-        if (e.getValue() == client) {
-          hostport = e.getKey();
-          break;
-        }
-      }
-    }
-
-    if (hostport == null) {
-      HashMap<String, TabletClient> copy;
-      synchronized (ip2client) {
-        copy = new HashMap<>(ip2client);
+  Deferred<ArrayList<Void>> disconnectEverything() {
+    readLock.lock();
+    try {
+      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2client.size());
+      for (TabletClient ts : uuid2client.values()) {
+        deferreds.add(ts.shutdown());
       }
-      LOG.error("Couldn't find {} in {}", client, copy);
-      return null;
-    }
-    final int colon = hostport.indexOf(':', 1);
-    if (colon < 1) {
-      LOG.error("No `:' found in {}", hostport);
-      return null;
-    }
-    final String host = getIP(hostport.substring(0, colon));
-    if (host == null) {
-      // getIP will print the reason why, there's nothing else we can do.
-      return null;
+      return Deferred.group(deferreds);
+    } finally {
+      readLock.unlock();
     }
+  }
 
-    int port;
+  /**
+   * The list returned by this method can't be modified,
+   * but calling certain methods on the returned TabletClients can have an effect. For example,
+   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
+   * TabletClient#shutdown()}.
+   * @return copy of the current TabletClients list
+   */
+  List<TabletClient> getImmutableTabletClientsList() {
+    readLock.lock();
     try {
-      port = parsePortNumber(hostport.substring(colon + 1,
-          hostport.length()));
-    } catch (NumberFormatException e) {
-      LOG.error("Bad port in {}", hostport, e);
-      return null;
+      return ImmutableList.copyOf(uuid2client.values());
+    } finally {
+      readLock.unlock();
     }
-    return new InetSocketAddress(host, port);
   }
 
   /**
-   * Removes the given client from the `ip2client` cache.
-   * @param client the client for which we must clear the ip cache
-   * @param remote the address of the remote peer, if known, or null
+   * Queries all the cached connections if they are alive.
+   * @return true if all the connections are down, else false
    */
-  private void removeClientFromIpCache(final TabletClient client,
-                                       final SocketAddress remote) {
-
-    if (remote == null) {
-      return;  // Can't continue without knowing the remote address.
-    }
-
-    String hostport;
-    if (remote instanceof InetSocketAddress) {
-      final InetSocketAddress sock = (InetSocketAddress) remote;
-      final InetAddress addr = sock.getAddress();
-      if (addr == null) {
-        LOG.error("Unresolved IP for {}. This shouldn't happen.", remote);
-        return;
-      } else {
-        hostport = addr.getHostAddress() + ':' + sock.getPort();
+  @VisibleForTesting
+  boolean allConnectionsAreDead() {
+    readLock.lock();
+    try {
+      for (TabletClient tserver : uuid2client.values()) {
+        if (tserver.isAlive()) {
+          return false;
+        }
       }
-    } else {
-      LOG.error("Found a non-InetSocketAddress remote: {}. This shouldn't happen.", remote);
-      return;
-    }
-
-    TabletClient old;
-    synchronized (ip2client) {
-      old = ip2client.remove(hostport);
-    }
-
-    LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
-    if (old == null) {
-      LOG.trace("When expiring {} from the client cache (host:port={}"
-          + "), it was found that there was no entry"
-          + " corresponding to {}. This shouldn't happen.", client, hostport, remote);
+    } finally {
+      readLock.unlock();
     }
+    return true;
   }
 
   /**
@@ -293,37 +253,8 @@ class ConnectionCache {
     }
   }
 
-  /**
-   * Parses a TCP port number from a string.
-   * @param portnum the string to parse
-   * @return a strictly positive, validated port number
-   * @throws NumberFormatException if the string couldn't be parsed as an
-   * integer or if the value was outside of the range allowed for TCP ports
-   */
-  private static int parsePortNumber(final String portnum)
-      throws NumberFormatException {
-    final int port = Integer.parseInt(portnum);
-    if (port <= 0 || port > 65535) {
-      throw new NumberFormatException(port == 0 ? "port is zero" :
-          (port < 0 ? "port is negative: "
-              : "port is too large: ") + port);
-    }
-    return port;
-  }
-
   private final class TabletClientPipeline extends DefaultChannelPipeline {
 
-    private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class);
-    /**
-     * Have we already disconnected?.
-     * We use this to avoid doing the cleanup work for the same client more
-     * than once, even if we get multiple events indicating that the client
-     * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
-     * No synchronization needed as this is always accessed from only one
-     * thread at a time (equivalent to a non-shared state in a Netty handler).
-     */
-    private boolean disconnected = false;
-
     TabletClient init(String uuid, String host, int port) {
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
       final TabletClient client = new TabletClient(kuduClient, uuid, host, port);
@@ -337,59 +268,5 @@ class ConnectionCache {
 
       return client;
     }
-
-    @Override
-    public void sendDownstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendDownstream(event);
-    }
-
-    @Override
-    public void sendUpstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendUpstream(event);
-    }
-
-    private void handleDisconnect(final ChannelStateEvent state_event) {
-      if (disconnected) {
-        return;
-      }
-      switch (state_event.getState()) {
-        case OPEN:
-          if (state_event.getValue() == Boolean.FALSE) {
-            break;  // CLOSED
-          }
-          return;
-        case CONNECTED:
-          if (state_event.getValue() == null) {
-            break;  // DISCONNECTED
-          }
-          return;
-        default:
-          return;  // Not an event we're interested in, ignore it.
-      }
-
-      disconnected = true;  // So we don't clean up the same client twice.
-      try {
-        final TabletClient client = super.get(TabletClient.class);
-        SocketAddress remote = super.getChannel().getRemoteAddress();
-        // At this point Netty gives us no easy way to access the
-        // SocketAddress of the peer we tried to connect to. This
-        // kinda sucks but I couldn't find an easier way.
-        if (remote == null) {
-          remote = slowSearchClientIP(client);
-        }
-
-        synchronized (client) {
-          removeClientFromIpCache(client, remote);
-        }
-      } catch (Exception e) {
-        log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
-      }
-    }
   }
 }
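
The new ConnectionCache accessors, getImmutableTabletClientsList() and allConnectionsAreDead(), follow the same pattern: acquire readLock, read uuid2client, and release the lock in a finally block. The sketch below reproduces that pattern using only JDK classes. It assumes a ReentrantReadWriteLock backs readLock, and it substitutes Collections.unmodifiableList over a copy for Guava's ImmutableList.copyOf. Conn and UuidConnectionCache are hypothetical names, not Kudu classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for TabletClient: only liveness matters here.
interface Conn {
  boolean isAlive();
}

// Sketch of a UUID-keyed connection cache guarded by a read/write lock.
class UuidConnectionCache {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();
  private final Map<String, Conn> uuid2conn = new HashMap<>();

  void put(String uuid, Conn conn) {
    writeLock.lock();
    try {
      uuid2conn.put(uuid, conn);
    } finally {
      writeLock.unlock();
    }
  }

  // Returns an unmodifiable snapshot; later changes to the cache do not affect it.
  List<Conn> getImmutableList() {
    readLock.lock();
    try {
      return Collections.unmodifiableList(new ArrayList<>(uuid2conn.values()));
    } finally {
      readLock.unlock();
    }
  }

  // True only if no cached connection reports itself alive (also true when empty).
  boolean allConnectionsAreDead() {
    readLock.lock();
    try {
      for (Conn c : uuid2conn.values()) {
        if (c.isAlive()) {
          return false;
        }
      }
    } finally {
      readLock.unlock();
    }
    return true;
  }
}
```

Copying the values while holding the read lock lets callers iterate the snapshot without holding the lock, at the cost of one allocation per call. As in the diff, an empty cache counts as "all dead".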

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 453ce72..02b547e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -276,7 +276,7 @@ public abstract class KuduRpc<R> {
     if (tablet == null) {
       buf.append("null");
     } else {
-      buf.append(tablet.getTabletIdAsString());
+      buf.append(tablet.getTabletId());
     }
     buf.append(", attempt=").append(attempt);
     buf.append(", ").append(deadlineTracker);

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index baf1cc6..f0bd71a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -176,7 +176,7 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
 
   @Override
   void updateStatistics(Statistics statistics, OperationResponse response) {
-    Slice tabletId = this.getTablet().getTabletId();
+    String tabletId = this.getTablet().getTabletId();
     String tableName = this.getTable().getName();
     TabletStatistics tabletStatistics = statistics.getTabletStatistics(tableName, tabletId);
     if (response == null) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
new file mode 100644
index 0000000..31a3f2f
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.protobuf.Message;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Ping request, used only by tests to check connections.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class PingRequest extends KuduRpc<PingResponse> {
+
+  private final String serviceName;
+
+  static PingRequest makeMasterPingRequest() {
+    return new PingRequest(MASTER_SERVICE_NAME);
+  }
+
+  static PingRequest makeTabletServerPingRequest() {
+    return new PingRequest(TABLET_SERVER_SERVICE_NAME);
+  }
+
+  private PingRequest(String serviceName) {
+    super(null);
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    assert header.isInitialized();
+    final Master.PingRequestPB.Builder builder =
+        Master.PingRequestPB.newBuilder();
+    return toChannelBuffer(header, builder.build());
+  }
+
+  @Override
+  String serviceName() {
+    return serviceName;
+  }
+
+  @Override
+  String method() {
+    return "Ping";
+  }
+
+  @Override
+  Pair<PingResponse, Object> deserialize(CallResponse callResponse, String tsUUID) throws Exception {
+    final Master.PingResponsePB.Builder respBuilder =
+        Master.PingResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    PingResponse response = new PingResponse(deadlineTracker.getElapsedMillis(),
+        tsUUID);
+    return new Pair<>(response, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/PingResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingResponse.java
new file mode 100644
index 0000000..83d08a7
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingResponse.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class PingResponse extends KuduRpcResponse {
+
+  /**
+   * Constructor with information common to all RPCs.
+   *
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param tsUUID        a string that contains the UUID of the server that answered the RPC
+   */
+  PingResponse(long elapsedMillis, String tsUUID) {
+    super(elapsedMillis, tsUUID);
+  }
+}
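
PingRequest and PingResponse form a minimal RPC. The request body is an empty PingRequestPB, and the response carries only the elapsed time and the UUID of the server that answered. The sketch below mirrors the construction pattern outside of Kudu: a private constructor plus one static factory per target service. The two service-name strings are assumed values of KuduRpc's MASTER_SERVICE_NAME and TABLET_SERVER_SERVICE_NAME, System.nanoTime() stands in for deadlineTracker, and PingRequestSketch is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of the PingRequest/PingResponse pair.
final class PingRequestSketch {
  // Assumed values of KuduRpc.MASTER_SERVICE_NAME / TABLET_SERVER_SERVICE_NAME.
  static final String MASTER_SERVICE_NAME = "kudu.master.MasterService";
  static final String TABLET_SERVER_SERVICE_NAME = "kudu.tserver.TabletServerService";

  // Mirrors PingResponse: only the elapsed time and the answering server's UUID.
  static final class Response {
    final long elapsedMillis;
    final String tsUuid;

    Response(long elapsedMillis, String tsUuid) {
      this.elapsedMillis = elapsedMillis;
      this.tsUuid = tsUuid;
    }
  }

  private final String serviceName;
  // Stand-in for the RPC's deadline tracker: when this request was created.
  private final long createdNanos = System.nanoTime();

  // Private constructor: callers must choose a target service via a factory.
  private PingRequestSketch(String serviceName) {
    this.serviceName = serviceName;
  }

  static PingRequestSketch makeMasterPingRequest() {
    return new PingRequestSketch(MASTER_SERVICE_NAME);
  }

  static PingRequestSketch makeTabletServerPingRequest() {
    return new PingRequestSketch(TABLET_SERVER_SERVICE_NAME);
  }

  String serviceName() {
    return serviceName;
  }

  String method() {
    return "Ping";
  }

  // Analogous to deserialize(): build a response from elapsed time and server UUID.
  Response toResponse(String tsUuid) {
    long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdNanos);
    return new Response(elapsed, tsUuid);
  }
}
```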

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 7c4a5a2..07936c8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -19,38 +19,30 @@ package org.apache.kudu.client;
 import com.google.common.base.Objects;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
-import org.apache.kudu.Common;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.consensus.Metadata;
 import org.apache.kudu.master.Master;
-import org.apache.kudu.util.Slice;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This class encapsulates the information regarding a tablet and its locations.
  * <p>
  * RemoteTablet's main function, once it is init()'d, is to keep track of where the leader for this
- * tablet is. For example, an RPC might call {@link #getLeaderConnection()}, contact that TS, find
- * it's not the leader anymore, and then call {@link #demoteLeader(TabletClient)}.
+ * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
+ * it's not the leader anymore, and then call {@link #demoteLeader(String)}.
  * <p>
  * A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
  * and short when they do since the Kudu client will replace the RemoteTablet it caches with new
  * ones after getting tablet locations from the master.
- * <p>
- * One particularity this class handles is {@link TabletClient} that disconnect due to their socket
- * read timeout being reached. Instead of removing them from {@link #tabletServers}, we instead
- * continue keeping track of them and if an RPC wants to use this tablet again, it'll notice that
- * the TabletClient returned by {@link #getLeaderConnection()} isn't alive, and will call
- * {@link #reconnectTabletClient(TabletClient)}.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -58,69 +50,33 @@ class RemoteTablet implements Comparable<RemoteTablet> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteTablet.class);
 
-  private static final int NO_LEADER_INDEX = -1;
-
   private final String tableId;
-  private final Slice tabletId;
+  private final String tabletId;
   @GuardedBy("tabletServers")
-  private final ArrayList<TabletClient> tabletServers = new ArrayList<>();
+  private final Set<String> tabletServers = new HashSet<>();
   private final AtomicReference<List<LocatedTablet.Replica>> replicas =
       new AtomicReference(ImmutableList.of());
   private final Partition partition;
-  private final ConnectionCache connectionCache;
 
   @GuardedBy("tabletServers")
-  private int leaderIndex = NO_LEADER_INDEX;
+  private String leaderUuid;
 
-  RemoteTablet(String tableId, Slice tabletId,
-               Partition partition, ConnectionCache connectionCache) {
+  RemoteTablet(String tableId, String tabletId,
+               Partition partition, Master.TabletLocationsPB tabletLocations) {
     this.tabletId = tabletId;
     this.tableId = tableId;
     this.partition = partition;
-    this.connectionCache = connectionCache;
-  }
 
-  void init(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
-
-    synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
-      tabletServers.clear();
-      leaderIndex = NO_LEADER_INDEX;
-      List<UnknownHostException> lookupExceptions =
-          new ArrayList<>(tabletLocations.getReplicasCount());
-      for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
-
-        List<Common.HostPortPB> addresses = replica.getTsInfo().getRpcAddressesList();
-        if (addresses.isEmpty()) {
-          LOG.warn("Tablet server for tablet " + getTabletIdAsString() + " doesn't have any " +
-              "address");
-          continue;
-        }
-        byte[] buf = Bytes.get(replica.getTsInfo().getPermanentUuid());
-        String uuid = Bytes.getString(buf);
-        // from meta_cache.cc
-        // TODO: if the TS advertises multiple host/ports, pick the right one
-        // based on some kind of policy. For now just use the first always.
-        try {
-          addTabletClient(uuid, addresses.get(0).getHost(), addresses.get(0).getPort(),
-              replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER));
-        } catch (UnknownHostException ex) {
-          lookupExceptions.add(ex);
-        }
-      }
-
-      if (leaderIndex == NO_LEADER_INDEX) {
-        LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
-      }
-
-      // If we found a tablet that doesn't contain a single location that we can resolve, there's
-      // no point in retrying.
-      if (!lookupExceptions.isEmpty() &&
-          lookupExceptions.size() == tabletLocations.getReplicasCount()) {
-        Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
-            lookupExceptions);
-        throw new NonRecoverableException(statusIOE);
+    for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
+      String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8();
+      tabletServers.add(uuid);
+      if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
+        leaderUuid = uuid;
       }
+    }
 
+    if (leaderUuid == null) {
+      LOG.warn("No leader provided for tablet {}", getTabletId());
     }
 
     ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
@@ -130,130 +86,61 @@ class RemoteTablet implements Comparable<RemoteTablet> {
     replicas.set(replicasBuilder.build());
   }
 
-  @GuardedBy("tabletServers")
-  private void addTabletClient(String uuid, String host, int port, boolean isLeader)
-      throws UnknownHostException {
-    String ip = ConnectionCache.getIP(host);
-    if (ip == null) {
-      throw new UnknownHostException("Failed to resolve the IP of `" + host + "'");
-    }
-    TabletClient client = connectionCache.newClient(uuid, ip, port);
-
-    tabletServers.add(client);
-    if (isLeader) {
-      leaderIndex = tabletServers.size() - 1;
-    }
-  }
-
-  /**
-   * Call this method when an existing TabletClient in this tablet's cache is found to be dead.
-   * It removes the passed TS from this tablet's cache and replaces it with a new instance of
-   * TabletClient. It will keep its leader status if it was already considered a leader.
-   * If the passed TabletClient was already removed, then this is a no-op.
-   * @param staleTs TS to reconnect to
-   * @throws UnknownHostException if we can't resolve server's hostname
-   */
-  void reconnectTabletClient(TabletClient staleTs) throws UnknownHostException {
-    assert (!staleTs.isAlive());
-
-    synchronized (tabletServers) {
-      int index = tabletServers.indexOf(staleTs);
-
-      if (index == -1) {
-        // Another thread already took care of it.
-        return;
-      }
-
-      boolean wasLeader = index == leaderIndex;
-
-      LOG.debug("Reconnecting to server {} for tablet {}. Was a leader? {}",
-          staleTs.getUuid(), getTabletIdAsString(), wasLeader);
-
-      boolean removed = removeTabletClient(staleTs);
-
-      if (!removed) {
-        LOG.debug("{} was already removed from tablet {}'s cache when reconnecting to it",
-            staleTs.getUuid(), getTabletIdAsString());
-      }
-
-      addTabletClient(staleTs.getUuid(), staleTs.getHost(),
-          staleTs.getPort(), wasLeader);
-    }
-  }
-
   @Override
   public String toString() {
-    return getTabletIdAsString();
+    return getTabletId();
   }
 
   /**
-   * Removes the passed TabletClient from this tablet's list of tablet servers. If it was the
-   * leader, then we "promote" the next one unless it was the last one in the list.
-   * @param ts a TabletClient that was disconnected
+   * Removes the passed tablet server from this tablet's list of tablet servers.
+   * @param uuid a tablet server to remove from this cache
    * @return true if this method removed ts from the list, else false
    */
-  boolean removeTabletClient(TabletClient ts) {
+  boolean removeTabletClient(String uuid) {
     synchronized (tabletServers) {
-      // TODO unit test for this once we have the infra
-      int index = tabletServers.indexOf(ts);
-      if (index == -1) {
-        return false; // we removed it already
+      if (leaderUuid != null && leaderUuid.equals(uuid)) {
+        leaderUuid = null;
       }
-
-      tabletServers.remove(index);
-      if (leaderIndex == index && leaderIndex == tabletServers.size()) {
-        leaderIndex = NO_LEADER_INDEX;
-      } else if (leaderIndex > index) {
-        leaderIndex--; // leader moved down the list
+      if (tabletServers.remove(uuid)) {
+        return true;
       }
-
-      return true;
-      // TODO if we reach 0 TS, maybe we should remove ourselves?
+      LOG.debug("tablet {} already removed ts {}, size left is {}",
+          getTabletId(), uuid, tabletServers.size());
+      return false;
     }
   }
 
   /**
-   * Clears the leader index if the passed tablet server is the current leader.
+   * Clears the leader UUID if the passed tablet server is the current leader.
    * If it is the current leader, then the next call to this tablet will have
    * to query the master to find the new leader.
-   * @param ts a TabletClient that gave a sign that it isn't this tablet's leader
+   * @param uuid a tablet server that gave a sign that it isn't this tablet's leader
    */
-  void demoteLeader(TabletClient ts) {
+  void demoteLeader(String uuid) {
     synchronized (tabletServers) {
-      int index = tabletServers.indexOf(ts);
-      // If this TS was removed or we're already forcing a call to the master (meaning someone
-      // else beat us to it), then we just noop.
-      if (index == -1 || leaderIndex == NO_LEADER_INDEX) {
-        LOG.debug("{} couldn't be demoted as the leader for {}",
-            ts.getUuid(), getTabletIdAsString());
+      if (leaderUuid == null) {
+        LOG.debug("{} couldn't be demoted as the leader for {}, there is no known leader",
+            uuid, getTabletId());
         return;
       }
 
-      if (leaderIndex == index) {
-        leaderIndex = NO_LEADER_INDEX;
-        LOG.debug("{} was demoted as the leader for {}", ts.getUuid(), getTabletIdAsString());
+      if (leaderUuid.equals(uuid)) {
+        leaderUuid = null;
+        LOG.debug("{} was demoted as the leader for {}", uuid, getTabletId());
       } else {
-        LOG.debug("{} wasn't the leader for {}, current leader is at index {}", ts.getUuid(),
-            getTabletIdAsString(), leaderIndex);
+        LOG.debug("{} wasn't the leader for {}, current leader is {}", uuid,
+            getTabletId(), leaderUuid);
       }
     }
   }
 
   /**
-   * Get the connection to the tablet server that we think holds the leader replica for this tablet.
-   * @return a TabletClient that we think has the leader, else null
+   * Get the UUID of the tablet server that we think holds the leader replica for this tablet.
+   * @return a UUID of a tablet server that we think has the leader, else null
    */
-  TabletClient getLeaderConnection() {
+  String getLeaderUUID() {
     synchronized (tabletServers) {
-      if (tabletServers.isEmpty()) {
-        return null;
-      }
-      if (leaderIndex == RemoteTablet.NO_LEADER_INDEX) {
-        return null;
-      } else {
-        // and some reads.
-        return tabletServers.get(leaderIndex);
-      }
+      return leaderUuid;
     }
   }
 
@@ -269,7 +156,7 @@ class RemoteTablet implements Comparable<RemoteTablet> {
     return tableId;
   }
 
-  Slice getTabletId() {
+  String getTabletId() {
     return tabletId;
   }
 
@@ -281,10 +168,6 @@ class RemoteTablet implements Comparable<RemoteTablet> {
     return tabletId.getBytes();
   }
 
-  String getTabletIdAsString() {
-    return tabletId.toString(Charset.defaultCharset());
-  }
-
   @Override
   public int compareTo(RemoteTablet remoteTablet) {
     if (remoteTablet == null) {
@@ -310,15 +193,4 @@ class RemoteTablet implements Comparable<RemoteTablet> {
   public int hashCode() {
     return Objects.hashCode(tableId, partition);
   }
-
-  static RemoteTablet createTabletFromPb(String tableId,
-                                         Master.TabletLocationsPB tabletPb,
-                                         ConnectionCache connectionCache)
-      throws NonRecoverableException {
-    Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
-    Slice tabletId = new Slice(tabletPb.getTabletId().toByteArray());
-    RemoteTablet tablet = new RemoteTablet(tableId, tabletId, partition, connectionCache);
-    tablet.init(tabletPb);
-    return tablet;
-  }
 }
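
With this change RemoteTablet tracks replicas and the leader by tablet server UUID instead of by TabletClient instance and list index. Removing a server therefore no longer requires shifting leaderIndex, and demoting a non-leader is a no-op. The hypothetical LeaderTracker below is a stand-alone sketch of that bookkeeping. Its Replica class is an illustrative stand-in for Master.TabletLocationsPB.ReplicaPB.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of UUID-based replica and leader bookkeeping (not Kudu's actual class).
final class LeaderTracker {
  // Hypothetical replica description: a server UUID and whether it is the Raft leader.
  static final class Replica {
    final String uuid;
    final boolean isLeader;

    Replica(String uuid, boolean isLeader) {
      this.uuid = uuid;
      this.isLeader = isLeader;
    }
  }

  private final Set<String> tabletServers = new HashSet<>(); // guarded by tabletServers
  private String leaderUuid;                                 // guarded by tabletServers

  LeaderTracker(List<Replica> replicas) {
    for (Replica r : replicas) {
      tabletServers.add(r.uuid);
      if (r.isLeader) {
        leaderUuid = r.uuid;
      }
    }
  }

  // Forgets a server; clears the leader if it was that server. Returns false if already gone.
  boolean removeTabletServer(String uuid) {
    synchronized (tabletServers) {
      if (leaderUuid != null && leaderUuid.equals(uuid)) {
        leaderUuid = null;
      }
      return tabletServers.remove(uuid);
    }
  }

  // Clears the leader only if the given server is the current leader.
  void demoteLeader(String uuid) {
    synchronized (tabletServers) {
      if (leaderUuid != null && leaderUuid.equals(uuid)) {
        leaderUuid = null;
      }
    }
  }

  String getLeaderUuid() {
    synchronized (tabletServers) {
      return leaderUuid;
    }
  }
}
```

A null leader means the next RPC has to ask the master for fresh locations, which matches the Javadoc on demoteLeader above.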

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/Statistics.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Statistics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Statistics.java
index 18a9ff8..da09b5e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Statistics.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Statistics.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class Statistics {
-  private final ConcurrentHashMap<Slice, Statistics.TabletStatistics> stsMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Statistics.TabletStatistics> stsMap = new ConcurrentHashMap<>();
 
   /**
    * The statistic enum to pass when querying.
@@ -146,8 +146,8 @@ public class Statistics {
    */
   public Set<String> getTabletSet() {
     Set<String> tablets = Sets.newHashSet();
-    for (Slice tablet : stsMap.keySet()) {
-      tablets.add(tablet.toString(Charset.defaultCharset()));
+    for (String tablet : stsMap.keySet()) {
+      tablets.add(tablet);
     }
     return tablets;
   }
@@ -187,11 +187,11 @@ public class Statistics {
    * @param tabletId the tablet's id
    * @return a TabletStatistics object
    */
-  Statistics.TabletStatistics getTabletStatistics(String tableName, Slice tabletId) {
+  Statistics.TabletStatistics getTabletStatistics(String tableName, String tabletId) {
     Statistics.TabletStatistics tabletStats = stsMap.get(tabletId);
     if (tabletStats == null) {
       Statistics.TabletStatistics newTabletStats = new Statistics.TabletStatistics(tableName,
-          tabletId.toString(Charset.defaultCharset()));
+          tabletId);
       tabletStats = stsMap.putIfAbsent(tabletId, newTabletStats);
       if (tabletStats == null) {
         tabletStats = newTabletStats;
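
Because tablet IDs are now plain Strings, Statistics can key its ConcurrentHashMap on them directly, and getTabletSet() no longer has to decode each key with Charset.defaultCharset(). The lookup above uses the get-then-putIfAbsent idiom. The sketch below isolates that idiom. TabletStatsRegistry and its TabletStats class are hypothetical simplifications of Statistics and Statistics.TabletStatistics.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the get-then-putIfAbsent lookup used by Statistics.getTabletStatistics.
final class TabletStatsRegistry {
  // Hypothetical per-tablet record; the real TabletStatistics also holds counters.
  static final class TabletStats {
    final String tableName;
    final String tabletId;

    TabletStats(String tableName, String tabletId) {
      this.tableName = tableName;
      this.tabletId = tabletId;
    }
  }

  private final ConcurrentHashMap<String, TabletStats> stsMap = new ConcurrentHashMap<>();

  TabletStats get(String tableName, String tabletId) {
    TabletStats stats = stsMap.get(tabletId);      // fast path: no allocation
    if (stats == null) {
      TabletStats fresh = new TabletStats(tableName, tabletId);
      stats = stsMap.putIfAbsent(tabletId, fresh); // returns the existing value, if any
      if (stats == null) {
        stats = fresh;                             // this thread's object won the race
      }
    }
    return stats;
  }

  int size() {
    return stsMap.size();
  }
}
```

If several threads race on a new tablet ID, each may allocate a TabletStats, but putIfAbsent guarantees they all return the same winning instance. On Java 8 and later, stsMap.computeIfAbsent(tabletId, id -> new TabletStats(tableName, id)) expresses the same lookup in one call.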

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index 18125b3..97c68e7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -286,7 +286,7 @@ class TableLocationsCache {
         return MoreObjects.toStringHelper("Tablet")
                           .add("lowerBoundPartitionKey", Bytes.hex(getLowerBoundPartitionKey()))
                           .add("upperBoundPartitionKey", Bytes.hex(getUpperBoundPartitionKey()))
-                          .add("tablet-id", tablet.getTabletIdAsString())
+                          .add("tablet-id", tablet.getTabletId())
                           .add("ttl", ttl())
                           .toString();
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index c91a48e..f5bb44c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -620,23 +620,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   /**
    * Tells whether or not this handler should be used.
    * <p>
-   * This method is not synchronized.  You need to synchronize on this
-   * instance if you need a memory visibility guarantee.  You may not need
-   * this guarantee if you're OK with the RPC finding out that the connection
-   * has been reset "the hard way" and you can retry the RPC.  In this case,
-   * you can call this method as a hint.  After getting the initial exception
-   * back, this thread is guaranteed to see this method return {@code false}
-   * without synchronization needed.
-   * @return {@code false} if this handler is known to have been disconnected
-   * from the server and sending an RPC (via {@link #sendRpc} or any other
-   * indirect means such as {@code GetTableLocations()}) will fail immediately
-   * by having the RPC's {@link Deferred} called back immediately with a
-   * {@link RecoverableException}.  This typically means that you got a
-   * stale reference (or that the reference to this instance is just about to
-   * be invalidated) and that you shouldn't use this instance.
+   * @return true if this instance can be used, else false if this handler is known to have been
+   * disconnected from the server and sending an RPC (via {@link #sendRpc(KuduRpc)}) will be
+   * retried in the client right away
    */
   public boolean isAlive() {
-    return !dead;
+    synchronized (this) {
+      return !dead;
+    }
   }
 
   /**
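
The isAlive() change replaces an unsynchronized read of dead with a read under the instance monitor. Provided the writer also holds that monitor, a caller then sees the most recent value instead of relying on the "find out the hard way and retry" behavior described in the removed Javadoc. The sketch below shows the idea in isolation. It assumes the code that sets dead synchronizes on the same instance, which this hunk does not show, and ConnectionState is a hypothetical name. For a single flag, declaring the field volatile would be an alternative.

```java
// Sketch: a liveness flag that is both written and read under the instance monitor.
final class ConnectionState {
  private boolean dead = false; // guarded by this

  // Assumed writer side: marks the connection dead while holding the monitor.
  void markDead() {
    synchronized (this) {
      dead = true;
    }
  }

  // Reader side, as in the diff: synchronizing makes the latest write visible.
  boolean isAlive() {
    synchronized (this) {
      return !dead;
    }
  }
}
```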

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index a91124d..ed6d21a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kudu.util.Pair;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.kudu.Common;
@@ -95,14 +96,26 @@ public class TestAsyncKuduClient extends BaseKuduTest {
       tabletClient.disconnect();
     }
     Stopwatch sw = Stopwatch.createStarted();
+    boolean allDead = false;
     while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
+      boolean sleep = false;
       if (!client.getTabletClients().isEmpty()) {
+        for (TabletClient tserver : client.getTabletClients()) {
+          if (tserver.isAlive()) {
+            sleep = true;
+            break;
+          }
+        }
+
+      }
+      if (sleep) {
         Thread.sleep(50);
       } else {
+        allDead = true;
         break;
       }
     }
-    assertTrue(client.getTabletClients().isEmpty());
+    assertTrue(allDead);
   }
 
   @Test
@@ -129,17 +142,8 @@ public class TestAsyncKuduClient extends BaseKuduTest {
       partition.setPartitionKeyEnd(ByteString.copyFrom("b" + i, Charsets.UTF_8.name()));
       tabletPb.setPartition(partition);
       tabletPb.setTabletId(ByteString.copyFromUtf8("some id " + i));
-      Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
-      Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
-      hostBuilder.setHost(badHostname + i);
-      hostBuilder.setPort(i);
-      tsInfoBuilder.addRpcAddresses(hostBuilder);
-      tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8("some uuid"));
-      Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
-          Master.TabletLocationsPB.ReplicaPB.newBuilder();
-      replicaBuilder.setTsInfo(tsInfoBuilder);
-      replicaBuilder.setRole(Metadata.RaftPeerPB.Role.FOLLOWER);
-      tabletPb.addReplicas(replicaBuilder);
+      tabletPb.addReplicas(TestUtils.getFakeTabletReplicaPB(
+          "uuid", badHostname + i, i, Metadata.RaftPeerPB.Role.FOLLOWER));
       tabletLocations.add(tabletPb.build());
     }
 
@@ -171,24 +175,11 @@ public class TestAsyncKuduClient extends BaseKuduTest {
     // Fake a master lookup that only returns one follower for the tablet.
     List<Master.TabletLocationsPB> tabletLocations = new ArrayList<>();
     Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();
-    Common.PartitionPB.Builder partition = Common.PartitionPB.newBuilder();
-    partition.setPartitionKeyStart(ByteString.EMPTY);
-    partition.setPartitionKeyEnd(ByteString.EMPTY);
-    tabletPb.setPartition(partition);
+    tabletPb.setPartition(TestUtils.getFakePartitionPB());
     tabletPb.setTabletId(ByteString.copyFrom(tablet.getTabletId()));
-    Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
-    Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
-    hostBuilder.setHost(leader.getRpcHost());
-    hostBuilder.setPort(leader.getRpcPort());
-    tsInfoBuilder.addRpcAddresses(hostBuilder);
-    tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8("some uuid"));
-    Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
-        Master.TabletLocationsPB.ReplicaPB.newBuilder();
-    replicaBuilder.setTsInfo(tsInfoBuilder);
-    replicaBuilder.setRole(Metadata.RaftPeerPB.Role.FOLLOWER); // This is a lie
-    tabletPb.addReplicas(replicaBuilder);
+    tabletPb.addReplicas(TestUtils.getFakeTabletReplicaPB(
+        "master", leader.getRpcHost(), leader.getRpcPort(), Metadata.RaftPeerPB.Role.FOLLOWER));
     tabletLocations.add(tabletPb.build());
-
     try {
       client.discoverTablets(table, new byte[0], tabletLocations, 1000);
       fail("discoverTablets should throw an exception if there's no leader");
@@ -196,4 +187,42 @@ public class TestAsyncKuduClient extends BaseKuduTest {
       // Expected.
     }
   }
+
+  @Test
+  public void testConnectionRefused() throws Exception {
+    CreateTableOptions options = getBasicCreateTableOptions();
+    KuduTable table = createTable(
+        "testConnectionRefused-" + System.currentTimeMillis(),
+        basicSchema,
+        options);
+
+    // Warm up the caches.
+    assertEquals(0, countRowsInScan(syncClient.newScannerBuilder(table).build()));
+
+    // Make it impossible to use Kudu.
+    killTabletServers();
+
+    // Create a scan with a short timeout.
+    KuduScanner scanner = syncClient.newScannerBuilder(table).scanRequestTimeout(1000).build();
+
+    // Check it fails.
+    try {
+      while (scanner.hasMoreRows()) {
+        scanner.nextRows();
+        fail("The scan should timeout");
+      }
+    } catch (NonRecoverableException ex) {
+      assertTrue(ex.getStatus().isTimedOut());
+    }
+
+    // Try the same thing with an insert.
+    KuduSession session = syncClient.newSession();
+    session.setTimeoutMillis(1000);
+    try {
+      session.apply(createBasicSchemaInsert(table, 1));
+      fail("The insert should timeout");
+    } catch (NonRecoverableException ex) {
+      assertTrue(ex.getStatus().isTimedOut());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index a044c5f..2a8a341 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -121,8 +121,8 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     session.apply(insert).join(DEFAULT_SLEEP);
     RemoteTablet rt =
         client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
-    String tabletId = rt.getTabletIdAsString();
-    TabletClient tc = rt.getLeaderConnection();
+    String tabletId = rt.getTabletId();
+    TabletClient tc = client.getTabletClient(rt.getLeaderUUID());
     try {
       // Delete table so we get table not found error.
       client.deleteTable(TABLE_NAME).join();

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
new file mode 100644
index 0000000..3776d50
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.net.HostAndPort;
+import com.stumbleupon.async.Deferred;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestConnectionCache {
+
+  @Test(timeout = 50000)
+  public void test() throws Exception {
+    try (MiniKuduCluster cluster =
+             new MiniKuduCluster.MiniKuduClusterBuilder().numMasters(3).build()) {
+
+      AsyncKuduClient client =
+          new AsyncKuduClient.AsyncKuduClientBuilder(cluster.getMasterAddresses()).build();
+
+      List<HostAndPort> addresses = cluster.getMasterHostPorts();
+
+      ConnectionCache cache = new ConnectionCache(client);
+      int i = 0;
+      for (HostAndPort hp : addresses) {
+        TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort());
+        // Ping the process so we go through the whole connection process.
+        pingConnection(conn);
+        i++;
+      }
+      assertEquals(3, cache.getImmutableTabletClientsList().size());
+      assertFalse(cache.allConnectionsAreDead());
+
+      TabletClient conn = cache.getClient("0");
+
+      // Kill the connection.
+      conn.shutdown().join();
+      waitForConnectionToDie(conn);
+      assertFalse(conn.isAlive());
+
+      // Make sure the cache also knows it's dead, but that not all the connections are.
+      assertFalse(cache.getClient("0").isAlive());
+      assertFalse(cache.allConnectionsAreDead());
+
+      // Test reconnecting with only the UUID.
+      TabletClient newConn = cache.getLiveClient("0");
+      assertFalse(conn == newConn);
+      pingConnection(newConn);
+
+      // Test disconnecting and make sure we cleaned up all the connections.
+      cache.disconnectEverything().join();
+      waitForConnectionToDie(cache.getClient("0"));
+      waitForConnectionToDie(cache.getClient("1"));
+      waitForConnectionToDie(cache.getClient("2"));
+      assertTrue(cache.allConnectionsAreDead());
+    }
+  }
+
+  private void waitForConnectionToDie(TabletClient conn) throws InterruptedException {
+    DeadlineTracker deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(5000);
+    while (conn.isAlive() && !deadlineTracker.timedOut()) {
+      Thread.sleep(250);
+    }
+  }
+
+  private void pingConnection(TabletClient conn) throws Exception {
+    PingRequest ping = PingRequest.makeMasterPingRequest();
+    Deferred<PingResponse> d = ping.getDeferred();
+    conn.sendRpc(ping);
+    d.join();
+  }
+}
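The loop in the first TestAsyncKuduClient hunk and waitForConnectionToDie() above use the same pattern. Each polls a liveness check until it reports false or a deadline passes. The sketch below shows that pattern on its own and is illustrative only. Liveness is a hypothetical stand-in for TabletClient.isAlive(), and the sketch uses System.nanoTime() rather than Kudu's DeadlineTracker. Unlike waitForConnectionToDie(), it returns whether the connection died in time, so the caller can assert on the result directly.

```java
import java.util.concurrent.TimeUnit;

public final class PollUntilDead {
  /** Hypothetical stand-in for TabletClient#isAlive(); not a Kudu API. */
  interface Liveness {
    boolean isAlive();
  }

  /**
   * Polls conn.isAlive() every intervalMs until it returns false or timeoutMs
   * elapses. Returns true if the connection was observed dead before the deadline.
   */
  static boolean waitForDeath(Liveness conn, long timeoutMs, long intervalMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (conn.isAlive()) {
      // Compare via subtraction so the check stays correct if nanoTime() wraps.
      if (System.nanoTime() - deadline >= 0) {
        return false; // deadline passed while the connection was still alive
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // A fake connection that reports alive for its first three checks, then dead.
    final int[] checks = {0};
    Liveness dying = () -> checks[0]++ < 3;
    Liveness immortal = () -> true;
    System.out.println(waitForDeath(dying, 1000, 10));   // true
    System.out.println(waitForDeath(immortal, 100, 10)); // false
  }
}
```

Returning a boolean keeps the timeout policy in one place, and the caller decides whether a live connection at the deadline is a test failure.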

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
new file mode 100644
index 0000000..5cf4de8
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestRemoteTablet {
+
+  @Test
+  public void testLeaderLastRemovedLast() {
+    RemoteTablet tablet = getTablet(2);
+
+    // Demote the wrong leader, no-op.
+    assertEquals("2", tablet.getLeaderUUID());
+    tablet.demoteLeader("1");
+    assertEquals("2", tablet.getLeaderUUID());
+
+    // Tablet at server 1 was deleted.
+    assertTrue(tablet.removeTabletClient("1"));
+    assertEquals("2", tablet.getLeaderUUID());
+
+    // Simulate another thread trying to remove 1.
+    assertFalse(tablet.removeTabletClient("1"));
+
+    // Tablet at server 0 was deleted.
+    assertTrue(tablet.removeTabletClient("0"));
+    assertEquals("2", tablet.getLeaderUUID());
+
+    // Leader was demoted.
+    tablet.demoteLeader("2");
+    assertEquals(null, tablet.getLeaderUUID());
+
+    // Simulate another thread doing the same.
+    tablet.demoteLeader("2");
+    assertEquals(null, tablet.getLeaderUUID());
+  }
+
+  @Test
+  public void testLeaderLastRemovedFirst() {
+    RemoteTablet tablet = getTablet(2);
+
+    // Test we can remove it.
+    assertTrue(tablet.removeTabletClient("2"));
+    assertEquals(null, tablet.getLeaderUUID());
+
+    // Test demoting it doesn't break anything.
+    tablet.demoteLeader("2");
+    assertEquals(null, tablet.getLeaderUUID());
+  }
+
+  @Test
+  public void testLeaderFirst() {
+    RemoteTablet tablet = getTablet(0);
+
+    // Test we can remove it.
+    assertTrue(tablet.removeTabletClient("0"));
+    assertEquals(null, tablet.getLeaderUUID());
+
+    // Test demoting it doesn't break anything.
+    tablet.demoteLeader("0");
+    assertEquals(null, tablet.getLeaderUUID());
+
+    // Test removing a server with no leader doesn't break.
+    assertTrue(tablet.removeTabletClient("2"));
+  }
+
+  private RemoteTablet getTablet(int leaderIndex) {
+    Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();
+
+    tabletPb.setPartition(TestUtils.getFakePartitionPB());
+    tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet"));
+    for (int i = 0; i < 3; i++) {
+      tabletPb.addReplicas(TestUtils.getFakeTabletReplicaPB(
+          i + "", "host", i,
+          leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER));
+    }
+
+    return new RemoteTablet("fake table", "fake tablet", null, tabletPb.build());
+  }
+}
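TestRemoteTablet pins down how RemoteTablet reacts to leader demotions and replica removals, including repeated calls that simulate a racing thread. Demoting a non-leader is a no-op, and removing an already-removed replica returns false. Removing or demoting the leader leaves no known leader. As a reading aid, the following is a hypothetical, simplified model of that contract. ToyTablet is guarded by a single lock. It is a sketch built on assumptions and is not Kudu's actual RemoteTablet code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the leader/replica contract exercised by TestRemoteTablet. */
public final class ToyTablet {
  private final Object lock = new Object();
  private final Set<String> replicas = new LinkedHashSet<>();
  private String leaderUuid; // null when no leader is known

  ToyTablet(List<String> replicaUuids, String leaderUuid) {
    this.replicas.addAll(replicaUuids);
    this.leaderUuid = leaderUuid;
  }

  /** Returns the current leader's UUID, or null if none is known. */
  String getLeaderUUID() {
    synchronized (lock) {
      return leaderUuid;
    }
  }

  /** Forgets the leader only if uuid is the current leader; otherwise a no-op. */
  void demoteLeader(String uuid) {
    synchronized (lock) {
      if (uuid.equals(leaderUuid)) {
        leaderUuid = null;
      }
    }
  }

  /**
   * Removes a replica. Returns false if it was already gone, so a racing second
   * caller can tell it lost. Removing the leader also clears the known leader.
   */
  boolean removeTabletClient(String uuid) {
    synchronized (lock) {
      if (!replicas.remove(uuid)) {
        return false;
      }
      if (uuid.equals(leaderUuid)) {
        leaderUuid = null;
      }
      return true;
    }
  }

  public static void main(String[] args) {
    // Mirrors the start of testLeaderLastRemovedLast: replicas "0".."2", leader "2".
    ToyTablet t = new ToyTablet(Arrays.asList("0", "1", "2"), "2");
    t.demoteLeader("1");                            // not the leader: no-op
    System.out.println(t.getLeaderUUID());          // 2
    System.out.println(t.removeTabletClient("1"));  // true
    System.out.println(t.removeTabletClient("1"));  // false
    t.demoteLeader("2");
    System.out.println(t.getLeaderUUID());          // null
  }
}
```

A single lock makes each operation atomic, which is the property the "simulate another thread" steps in the tests rely on. The real class may achieve this differently.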

http://git-wip-us.apache.org/repos/asf/kudu/blob/65cb2edf/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index 2b733c7..6cdd23d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -19,7 +19,11 @@ package org.apache.kudu.client;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.sun.security.auth.module.UnixSystem;
+import org.apache.kudu.Common;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.management.VMManagement;
@@ -286,4 +290,38 @@ public class TestUtils {
     int pid = getPid();
     return "127." + ((pid & 0xff00) >> 8) + "." + (pid & 0xff) + ".1";
   }
+
+  /**
+   * Get a PartitionPB with empty start and end keys.
+   * @return a fake partition
+   */
+  static Common.PartitionPB.Builder getFakePartitionPB() {
+    Common.PartitionPB.Builder partition = Common.PartitionPB.newBuilder();
+    partition.setPartitionKeyStart(ByteString.EMPTY);
+    partition.setPartitionKeyEnd(ByteString.EMPTY);
+    return partition;
+  }
+
+  /**
+   * Create a ReplicaPB based on the passed information.
+   * @param uuid server's identifier
+   * @param host server's hostname
+   * @param port server's port
+   * @param role server's role in the configuration
+   * @return a fake ReplicaPB
+   */
+  static Master.TabletLocationsPB.ReplicaPB.Builder getFakeTabletReplicaPB(
+      String uuid, String host, int port, Metadata.RaftPeerPB.Role role) {
+    Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+    Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
+    hostBuilder.setHost(host);
+    hostBuilder.setPort(port);
+    tsInfoBuilder.addRpcAddresses(hostBuilder);
+    tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8(uuid));
+    Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+        Master.TabletLocationsPB.ReplicaPB.newBuilder();
+    replicaBuilder.setTsInfo(tsInfoBuilder);
+    replicaBuilder.setRole(role);
+    return replicaBuilder;
+  }
 }