You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/19 22:16:06 UTC

[3/7] kudu git commit: [java client] Extract RemoteTablet from AsyncKuduClient

[java client] Extract RemoteTablet from AsyncKuduClient

RemoteTablet is responsible for handling the Java client's view of where replicas
are for its tablet, and who the leader is. Extracting this bit of functionality is
important if we want to be able to unit test it without bringing up a whole client
and possibly a cluster.

This patch makes a minor attempt at simplifying the interfaces, with more work to
come. Likewise for unit tests.

Change-Id: I3d06a573e4307c76a7aebab05cd5238fb0d9a2c6
Reviewed-on: http://gerrit.cloudera.org:8080/4719
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/46d4d78b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/46d4d78b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/46d4d78b

Branch: refs/heads/master
Commit: 46d4d78bc63690d6ee7109d14d462f76d05fb4c5
Parents: bd56e02
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Oct 13 13:21:05 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Oct 19 15:53:48 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 317 +-----------------
 .../apache/kudu/client/AsyncKuduScanner.java    |   8 +-
 .../java/org/apache/kudu/client/KuduRpc.java    |   6 +-
 .../org/apache/kudu/client/LocatedTablet.java   |   2 +-
 .../org/apache/kudu/client/RemoteTablet.java    | 324 +++++++++++++++++++
 .../apache/kudu/client/TableLocationsCache.java |   1 -
 .../org/apache/kudu/client/TabletClient.java    |   2 +-
 .../kudu/client/TestAsyncKuduSession.java       |   3 +-
 8 files changed, 345 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 9b5ba5b..1e4cdbf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,27 +27,21 @@
 package org.apache.kudu.client;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
-import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
-import org.apache.kudu.consensus.Metadata;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
 import org.apache.kudu.util.AsyncUtil;
 import org.apache.kudu.util.NetUtil;
 import org.apache.kudu.util.Pair;
-import org.apache.kudu.util.Slice;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -60,7 +54,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,7 +66,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
@@ -592,7 +584,8 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
     final RemoteTablet tablet = scanner.currentTablet();
-    final TabletClient client = clientFor(tablet);
+    assert (tablet != null);
+    final TabletClient client = tablet.getLeaderConnection();
     final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
     final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
     // Important to increment the attempts before the next if statement since
@@ -621,7 +614,7 @@ public class AsyncKuduClient implements AutoCloseable {
       return Deferred.fromResult(null);
     }
 
-    final TabletClient client = clientFor(tablet);
+    final TabletClient client = tablet.getLeaderConnection();
     if (client == null || !client.isAlive()) {
       // Oops, we couldn't find a tablet server that hosts this tablet. Our
       // cache was probably invalidated while the client was scanning. So
@@ -679,7 +672,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // block that queries the master.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      TabletClient tabletClient = clientFor(tablet);
+      TabletClient tabletClient = tablet.getLeaderConnection();
       if (tabletClient != null) {
         final Deferred<R> d = request.getDeferred();
         if (tabletClient.isAlive()) {
@@ -692,10 +685,10 @@ public class AsyncKuduClient implements AutoCloseable {
         } catch (UnknownHostException e) {
           LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
               tabletClient.getUuid(), e);
-          // Because of this exception, clientFor() below won't be able to find a newTabletClient
+          // Because of this exception, getLeaderConnection() below won't be able to find a newTabletClient
           // and we'll delay the RPC.
         }
-        TabletClient newTabletClient = clientFor(tablet);
+        TabletClient newTabletClient = tablet.getLeaderConnection();
         assert (tabletClient != newTabletClient);
 
         if (newTabletClient == null) {
@@ -947,28 +940,6 @@ public class AsyncKuduClient implements AutoCloseable {
     tableLocations.remove(tableId);
   }
 
-  TabletClient clientFor(RemoteTablet tablet) {
-    if (tablet == null) {
-      return null;
-    }
-
-    synchronized (tablet.tabletServers) {
-      if (tablet.tabletServers.isEmpty()) {
-        return null;
-      }
-      if (tablet.leaderIndex == RemoteTablet.NO_LEADER_INDEX) {
-        // TODO we don't know where the leader is, either because one wasn't provided or because
-        // we couldn't resolve its IP. We'll just send the client back so it retries and probably
-        // dies after too many attempts.
-        return null;
-      } else {
-        // TODO we currently always hit the leader, we probably don't need to except for writes
-        // and some reads.
-        return tablet.tabletServers.get(tablet.leaderIndex);
-      }
-    }
-  }
-
   /**
    * Checks whether or not an RPC can be retried once more
    * @param rpc The RPC we're going to attempt to execute
@@ -1016,7 +987,8 @@ public class AsyncKuduClient implements AutoCloseable {
       // looked up the tablet we're interested in.  Every once in a while
       // this will save us a Master lookup.
       TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
-      if (entry != null && !entry.isNonCoveredRange() && clientFor(entry.getTablet()) != null) {
+      if (entry != null && !entry.isNonCoveredRange()
+          && entry.getTablet().getLeaderConnection() != null) {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
@@ -1323,12 +1295,10 @@ public class AsyncKuduClient implements AutoCloseable {
     // already discovered the tablet, its locations are refreshed.
     List<RemoteTablet> tablets = new ArrayList<>(locations.size());
     for (Master.TabletLocationsPB tabletPb : locations) {
-      RemoteTablet rt = createTabletFromPb(tableId, tabletPb);
-      Slice tabletId = rt.tabletId;
+      RemoteTablet rt = RemoteTablet.createTabletFromPb(tableId, tabletPb, connectionCache);
 
       LOG.info("Learned about tablet {} for table '{}' with partition {}",
-               tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
-      rt.refreshTabletClients(tabletPb);
+               rt.getTabletIdAsString(), tableName, rt.getPartition());
       tablets.add(rt);
     }
 
@@ -1340,18 +1310,12 @@ public class AsyncKuduClient implements AutoCloseable {
     // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
     // sleep before retrying.
     TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
-    if (!entry.isNonCoveredRange() && clientFor(entry.getTablet()) == null) {
+    if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderConnection() == null) {
       throw new NoLeaderFoundException(
           Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
     }
   }
 
-  RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
-    Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
-    Slice tabletId = new Slice(tabletPb.getTabletId().toByteArray());
-    return new RemoteTablet(tableId, tabletId, partition);
-  }
-
   /**
    * Gets the tablet location cache entry for the tablet in the table covering a partition key.
    * @param tableId the table
@@ -1575,265 +1539,6 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * This class encapsulates the information regarding a tablet and its locations.
-   *
-   * Leader failover mechanism:
-   * When we get a complete peer list from the master, we place the leader in the first
-   * position of the tabletServers array. When we detect that it isn't the leader anymore (in
-   * TabletClient), we demote it and set the next TS in the array as the leader. When the RPC
-   * gets retried, it will use that TS since we always pick the leader.
-   *
-   * If that TS turns out to not be the leader, we will demote it and promote the next one, retry.
-   * When we hit the end of the list, we set the leaderIndex to NO_LEADER_INDEX which forces us
-   * to fetch the tablet locations from the master. We'll repeat this whole process until a RPC
-   * succeeds.
-   *
-   * Subtleties:
-   * We don't keep track of a TS after it disconnects (via removeTabletClient), so if we
-   * haven't contacted one for 10 seconds (socket timeout), it will be removed from the list of
-   * tabletServers. This means that if the leader fails, we only have one other TS to "promote"
-   * or maybe none at all. This is partly why we then set leaderIndex to NO_LEADER_INDEX.
-   *
-   * The effect of treating a TS as the new leader means that the Scanner will also try to hit it
-   * with requests. It's currently unclear if that's a good or a bad thing.
-   *
-   * Unlike the C++ client, we don't short-circuit the call to the master if it isn't available.
-   * This means that after trying all the peers to find the leader, we might get stuck waiting on
-   * a reachable master.
-   */
-  public class RemoteTablet implements Comparable<RemoteTablet> {
-
-    private static final int NO_LEADER_INDEX = -1;
-    private final String tableId;
-    private final Slice tabletId;
-    @GuardedBy("tabletServers")
-    private final ArrayList<TabletClient> tabletServers = new ArrayList<>();
-    private final AtomicReference<List<LocatedTablet.Replica>> replicas =
-        new AtomicReference(ImmutableList.of());
-    private final Partition partition;
-    private int leaderIndex = NO_LEADER_INDEX;
-
-    RemoteTablet(String tableId, Slice tabletId, Partition partition) {
-      this.tabletId = tabletId;
-      this.tableId = tableId;
-      this.partition = partition;
-    }
-
-    void refreshTabletClients(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
-
-      synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
-        tabletServers.clear();
-        leaderIndex = NO_LEADER_INDEX;
-        List<UnknownHostException> lookupExceptions =
-            new ArrayList<>(tabletLocations.getReplicasCount());
-        for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
-
-          List<Common.HostPortPB> addresses = replica.getTsInfo().getRpcAddressesList();
-          if (addresses.isEmpty()) {
-            LOG.warn("Tablet server for tablet " + getTabletIdAsString() + " doesn't have any " +
-                "address");
-            continue;
-          }
-          byte[] buf = Bytes.get(replica.getTsInfo().getPermanentUuid());
-          String uuid = Bytes.getString(buf);
-          // from meta_cache.cc
-          // TODO: if the TS advertises multiple host/ports, pick the right one
-          // based on some kind of policy. For now just use the first always.
-          try {
-            addTabletClient(uuid, addresses.get(0).getHost(), addresses.get(0).getPort(),
-                replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER));
-          } catch (UnknownHostException ex) {
-            lookupExceptions.add(ex);
-          }
-        }
-
-        if (leaderIndex == NO_LEADER_INDEX) {
-          LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
-        }
-
-        // If we found a tablet that doesn't contain a single location that we can resolve, there's
-        // no point in retrying.
-        if (!lookupExceptions.isEmpty() &&
-            lookupExceptions.size() == tabletLocations.getReplicasCount()) {
-          Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
-              lookupExceptions);
-          throw new NonRecoverableException(statusIOE);
-        }
-
-      }
-
-      ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
-      for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
-        replicasBuilder.add(new LocatedTablet.Replica(replica));
-      }
-      replicas.set(replicasBuilder.build());
-    }
-
-    @GuardedBy("tabletServers")
-    private void addTabletClient(String uuid, String host, int port, boolean isLeader)
-        throws UnknownHostException {
-      String ip = ConnectionCache.getIP(host);
-      if (ip == null) {
-        throw new UnknownHostException("Failed to resolve the IP of `" + host + "'");
-      }
-      TabletClient client = connectionCache.newClient(uuid, ip, port);
-
-      tabletServers.add(client);
-      if (isLeader) {
-        leaderIndex = tabletServers.size() - 1;
-      }
-    }
-
-    /**
-     * Call this method when an existing TabletClient in this tablet's cache is found to be dead.
-     * It removes the passed TS from this tablet's cache and replaces it with a new instance of
-     * TabletClient. It will keep its leader status if it was already considered a leader.
-     * If the passed TabletClient was already removed, then this is a no-op.
-     * @param staleTs TS to reconnect to
-     * @throws UnknownHostException if we can't resolve server's hostname
-     */
-    void reconnectTabletClient(TabletClient staleTs) throws UnknownHostException {
-      assert (!staleTs.isAlive());
-
-      synchronized (tabletServers) {
-        int index = tabletServers.indexOf(staleTs);
-
-        if (index == -1) {
-          // Another thread already took care of it.
-          return;
-        }
-
-        boolean wasLeader = index == leaderIndex;
-
-        LOG.debug("Reconnecting to server {} for tablet {}. Was a leader? {}",
-            staleTs.getUuid(), getTabletIdAsString(), wasLeader);
-
-        boolean removed = removeTabletClient(staleTs);
-
-        if (!removed) {
-          LOG.debug("{} was already removed from tablet {}'s cache when reconnecting to it",
-              staleTs.getUuid(), getTabletIdAsString());
-        }
-
-        addTabletClient(staleTs.getUuid(), staleTs.getHost(),
-            staleTs.getPort(), wasLeader);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return getTabletIdAsString();
-    }
-
-    /**
-     * Removes the passed TabletClient from this tablet's list of tablet servers. If it was the
-     * leader, then we "promote" the next one unless it was the last one in the list.
-     * @param ts A TabletClient that was disconnected.
-     * @return True if this method removed ts from the list, else false.
-     */
-    boolean removeTabletClient(TabletClient ts) {
-      synchronized (tabletServers) {
-        // TODO unit test for this once we have the infra
-        int index = tabletServers.indexOf(ts);
-        if (index == -1) {
-          return false; // we removed it already
-        }
-
-        tabletServers.remove(index);
-        if (leaderIndex == index && leaderIndex == tabletServers.size()) {
-          leaderIndex = NO_LEADER_INDEX;
-        } else if (leaderIndex > index) {
-          leaderIndex--; // leader moved down the list
-        }
-
-        return true;
-        // TODO if we reach 0 TS, maybe we should remove ourselves?
-      }
-    }
-
-    /**
-     * Clears the leader index if the passed tablet server is the current leader.
-     * If it is the current leader, then the next call to this tablet will have
-     * to query the master to find the new leader.
-     * @param ts a TabletClient that gave a sign that it isn't this tablet's leader
-     */
-    void demoteLeader(TabletClient ts) {
-      synchronized (tabletServers) {
-        int index = tabletServers.indexOf(ts);
-        // If this TS was removed or we're already forcing a call to the master (meaning someone
-        // else beat us to it), then we just noop.
-        if (index == -1 || leaderIndex == NO_LEADER_INDEX) {
-          LOG.debug("{} couldn't be demoted as the leader for {}",
-              ts.getUuid(), getTabletIdAsString());
-          return;
-        }
-
-        if (leaderIndex == index) {
-          leaderIndex = NO_LEADER_INDEX;
-          LOG.debug("{} was demoted as the leader for {}", ts.getUuid(), getTabletIdAsString());
-        } else {
-          LOG.debug("{} wasn't the leader for {}, current leader is at index {}", ts.getUuid(),
-              getTabletIdAsString(), leaderIndex);
-        }
-      }
-    }
-
-    /**
-     * Gets the replicas of this tablet. The returned list may not be mutated.
-     * @return the replicas of the tablet
-     */
-    List<LocatedTablet.Replica> getReplicas() {
-      return replicas.get();
-    }
-
-    public String getTableId() {
-      return tableId;
-    }
-
-    Slice getTabletId() {
-      return tabletId;
-    }
-
-    public Partition getPartition() {
-      return partition;
-    }
-
-    byte[] getTabletIdAsBytes() {
-      return tabletId.getBytes();
-    }
-
-    String getTabletIdAsString() {
-      return tabletId.toString(Charset.defaultCharset());
-    }
-
-    @Override
-    public int compareTo(RemoteTablet remoteTablet) {
-      if (remoteTablet == null) {
-        return 1;
-      }
-
-      return ComparisonChain.start()
-          .compare(this.tableId, remoteTablet.tableId)
-          .compare(this.partition, remoteTablet.partition).result();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      RemoteTablet that = (RemoteTablet) o;
-
-      return this.compareTo(that) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(tableId, partition);
-    }
-  }
-
-  /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 55a9fbb..a0694a5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -192,7 +192,7 @@ public final class AsyncKuduScanner {
    * If == DONE, then we're done scanning.
    * Otherwise it contains a proper tabletSlice name, and we're currently scanning.
    */
-  private AsyncKuduClient.RemoteTablet tablet;
+  private RemoteTablet tablet;
 
   /**
    * This is the scanner ID we got from the TabletServer.
@@ -469,7 +469,7 @@ public final class AsyncKuduScanner {
   private final Callback<Exception, Exception> nextRowErrback() {
     return new Callback<Exception, Exception>() {
       public Exception call(final Exception error) {
-        final AsyncKuduClient.RemoteTablet old_tablet = tablet;  // Save before invalidate().
+        final RemoteTablet old_tablet = tablet;  // Save before invalidate().
         String message = old_tablet + " pretends to not know " + AsyncKuduScanner.this;
         LOG.warn(message, error);
         invalidate();  // If there was an error, don't assume we're still OK.
@@ -560,7 +560,7 @@ public final class AsyncKuduScanner {
    * Sets the name of the tabletSlice that's hosting {@code this.start_key}.
    * @param tablet The tabletSlice we're currently supposed to be scanning.
    */
-  void setTablet(final AsyncKuduClient.RemoteTablet tablet) {
+  void setTablet(final RemoteTablet tablet) {
     this.tablet = tablet;
   }
 
@@ -577,7 +577,7 @@ public final class AsyncKuduScanner {
   /**
    * Returns the tabletSlice currently being scanned, if any.
    */
-  AsyncKuduClient.RemoteTablet currentTablet() {
+  RemoteTablet currentTablet() {
     return tablet;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index df46815..453ce72 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -87,7 +87,7 @@ public abstract class KuduRpc<R> {
    */
   private Deferred<R> deferred;
 
-  private AsyncKuduClient.RemoteTablet tablet;
+  private RemoteTablet tablet;
 
   final KuduTable table;
 
@@ -233,11 +233,11 @@ public abstract class KuduRpc<R> {
     return deferred;
   }
 
-  AsyncKuduClient.RemoteTablet getTablet() {
+  RemoteTablet getTablet() {
     return this.tablet;
   }
 
-  void setTablet(AsyncKuduClient.RemoteTablet tablet) {
+  void setTablet(RemoteTablet tablet) {
     this.tablet = tablet;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
index 3117f64..febd167 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/LocatedTablet.java
@@ -39,7 +39,7 @@ public class LocatedTablet {
 
   private final List<Replica> replicas;
 
-  LocatedTablet(AsyncKuduClient.RemoteTablet tablet) {
+  LocatedTablet(RemoteTablet tablet) {
     partition = tablet.getPartition();
     tabletId = tablet.getTabletIdAsBytes();
     replicas = tablet.getReplicas();

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
new file mode 100644
index 0000000..7c4a5a2
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import org.apache.kudu.Common;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.Slice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class encapsulates the information regarding a tablet and its locations.
+ * <p>
+ * RemoteTablet's main function, once it is init()'d, is to keep track of where the leader for this
+ * tablet is. For example, an RPC might call {@link #getLeaderConnection()}, contact that TS, find
+ * it's not the leader anymore, and then call {@link #demoteLeader(TabletClient)}.
+ * <p>
+ * A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
+ * and short when they do since the Kudu client will replace the RemoteTablet it caches with new
+ * ones after getting tablet locations from the master.
+ * <p>
+ * One particularity this class handles is a {@link TabletClient} that disconnects because its
+ * socket read timeout was reached. Instead of removing it from {@link #tabletServers}, we keep
+ * tracking it, and if an RPC wants to use this tablet again, it'll notice that
+ * the TabletClient returned by {@link #getLeaderConnection()} isn't alive, and will call
+ * {@link #reconnectTabletClient(TabletClient)}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class RemoteTablet implements Comparable<RemoteTablet> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteTablet.class);
+
+  private static final int NO_LEADER_INDEX = -1;
+
+  private final String tableId;
+  private final Slice tabletId;
+  @GuardedBy("tabletServers")
+  private final ArrayList<TabletClient> tabletServers = new ArrayList<>();
+  private final AtomicReference<List<LocatedTablet.Replica>> replicas =
+      new AtomicReference(ImmutableList.of());
+  private final Partition partition;
+  private final ConnectionCache connectionCache;
+
+  @GuardedBy("tabletServers")
+  private int leaderIndex = NO_LEADER_INDEX;
+
+  RemoteTablet(String tableId, Slice tabletId,
+               Partition partition, ConnectionCache connectionCache) {
+    this.tabletId = tabletId;
+    this.tableId = tableId;
+    this.partition = partition;
+    this.connectionCache = connectionCache;
+  }
+
+  void init(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
+
+    synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
+      tabletServers.clear();
+      leaderIndex = NO_LEADER_INDEX;
+      List<UnknownHostException> lookupExceptions =
+          new ArrayList<>(tabletLocations.getReplicasCount());
+      for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
+
+        List<Common.HostPortPB> addresses = replica.getTsInfo().getRpcAddressesList();
+        if (addresses.isEmpty()) {
+          LOG.warn("Tablet server for tablet " + getTabletIdAsString() + " doesn't have any " +
+              "address");
+          continue;
+        }
+        byte[] buf = Bytes.get(replica.getTsInfo().getPermanentUuid());
+        String uuid = Bytes.getString(buf);
+        // from meta_cache.cc
+        // TODO: if the TS advertises multiple host/ports, pick the right one
+        // based on some kind of policy. For now just use the first always.
+        try {
+          addTabletClient(uuid, addresses.get(0).getHost(), addresses.get(0).getPort(),
+              replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER));
+        } catch (UnknownHostException ex) {
+          lookupExceptions.add(ex);
+        }
+      }
+
+      if (leaderIndex == NO_LEADER_INDEX) {
+        LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
+      }
+
+      // If we found a tablet that doesn't contain a single location that we can resolve, there's
+      // no point in retrying.
+      if (!lookupExceptions.isEmpty() &&
+          lookupExceptions.size() == tabletLocations.getReplicasCount()) {
+        Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
+            lookupExceptions);
+        throw new NonRecoverableException(statusIOE);
+      }
+
+    }
+
+    ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
+    for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
+      replicasBuilder.add(new LocatedTablet.Replica(replica));
+    }
+    replicas.set(replicasBuilder.build());
+  }
+
+  @GuardedBy("tabletServers")
+  private void addTabletClient(String uuid, String host, int port, boolean isLeader)
+      throws UnknownHostException {
+    String ip = ConnectionCache.getIP(host);
+    if (ip == null) {
+      throw new UnknownHostException("Failed to resolve the IP of `" + host + "'");
+    }
+    TabletClient client = connectionCache.newClient(uuid, ip, port);
+
+    tabletServers.add(client);
+    if (isLeader) {
+      leaderIndex = tabletServers.size() - 1;
+    }
+  }
+
+  /**
+   * Replaces a dead TabletClient in this tablet's cache with a fresh connection to the same
+   * server (same UUID, host and port). If the dead client was the leader, the replacement is
+   * recorded as the leader too. If the client is no longer in the cache, nothing happens.
+   * @param staleTs the dead TS to reconnect to; must not be alive
+   * @throws UnknownHostException if we can't resolve server's hostname
+   */
+  void reconnectTabletClient(TabletClient staleTs) throws UnknownHostException {
+    assert (!staleTs.isAlive());
+
+    synchronized (tabletServers) {
+      final int staleIndex = tabletServers.indexOf(staleTs);
+      if (staleIndex < 0) {
+        // Some other thread got here first and already dealt with this client.
+        return;
+      }
+
+      // Capture leadership now: the removal below may shift or clear leaderIndex.
+      final boolean wasLeader = (staleIndex == leaderIndex);
+      LOG.debug("Reconnecting to server {} for tablet {}. Was a leader? {}",
+          staleTs.getUuid(), getTabletIdAsString(), wasLeader);
+
+      // We hold the lock and just found the client, so the removal should succeed; the log
+      // below is purely defensive.
+      if (!removeTabletClient(staleTs)) {
+        LOG.debug("{} was already removed from tablet {}'s cache when reconnecting to it",
+            staleTs.getUuid(), getTabletIdAsString());
+      }
+
+      addTabletClient(staleTs.getUuid(), staleTs.getHost(), staleTs.getPort(), wasLeader);
+    }
+  }
+
+  /**
+   * @return this tablet's ID as a string, same as {@link #getTabletIdAsString()}
+   */
+  @Override
+  public String toString() {
+    return this.getTabletIdAsString();
+  }
+
+  /**
+   * Drops the passed TabletClient from this tablet's list of tablet servers and fixes up the
+   * leader index. If the removed client was the leader, the entry that slides into its slot is
+   * "promoted", unless the leader was the last entry, in which case there is no leader anymore.
+   * @param ts a TabletClient that was disconnected
+   * @return true if this method removed ts from the list, false if it wasn't in the list
+   */
+  boolean removeTabletClient(TabletClient ts) {
+    synchronized (tabletServers) {
+      // TODO unit test for this once we have the infra
+      final int removedIndex = tabletServers.indexOf(ts);
+      if (removedIndex < 0) {
+        // Already gone, nothing to do.
+        return false;
+      }
+      tabletServers.remove(removedIndex);
+
+      if (leaderIndex > removedIndex) {
+        // The leader sat after the removed entry, so it moved down by one slot.
+        leaderIndex--;
+      } else if (leaderIndex == removedIndex && removedIndex == tabletServers.size()) {
+        // The leader was the last entry: nobody is left to slide into its slot.
+        leaderIndex = NO_LEADER_INDEX;
+      }
+
+      // TODO if we reach 0 TS, maybe we should remove ourselves?
+      return true;
+    }
+  }
+
+  /**
+   * Forgets the current leader if, and only if, it is the passed tablet server. Once the leader
+   * index is cleared, {@link #getLeaderConnection()} returns null until a leader is set again.
+   * @param ts a TabletClient that gave a sign that it isn't this tablet's leader
+   */
+  void demoteLeader(TabletClient ts) {
+    synchronized (tabletServers) {
+      final int tsIndex = tabletServers.indexOf(ts);
+
+      // Either this TS is gone from the list or the leader was already cleared (someone else
+      // beat us to it): nothing to demote.
+      if (tsIndex == -1 || leaderIndex == NO_LEADER_INDEX) {
+        LOG.debug("{} couldn't be demoted as the leader for {}",
+            ts.getUuid(), getTabletIdAsString());
+        return;
+      }
+
+      if (leaderIndex != tsIndex) {
+        LOG.debug("{} wasn't the leader for {}, current leader is at index {}", ts.getUuid(),
+            getTabletIdAsString(), leaderIndex);
+        return;
+      }
+
+      leaderIndex = NO_LEADER_INDEX;
+      LOG.debug("{} was demoted as the leader for {}", ts.getUuid(), getTabletIdAsString());
+    }
+  }
+
+  /**
+   * Looks up the connection to the tablet server currently recorded as this tablet's leader.
+   * @return the TabletClient at the leader index, or null when the list of tablet servers is
+   * empty or no leader is recorded
+   */
+  TabletClient getLeaderConnection() {
+    synchronized (tabletServers) {
+      final boolean leaderKnown =
+          !tabletServers.isEmpty() && leaderIndex != RemoteTablet.NO_LEADER_INDEX;
+      // Without a known leader there is nothing to hand out; the caller gets null.
+      return leaderKnown ? tabletServers.get(leaderIndex) : null;
+    }
+  }
+
+  /**
+   * Returns the replica list currently stored for this tablet. Callers must treat the returned
+   * list as read-only.
+   * @return the replicas of the tablet
+   */
+  List<LocatedTablet.Replica> getReplicas() {
+    final List<LocatedTablet.Replica> current = replicas.get();
+    return current;
+  }
+
+  /**
+   * @return the ID of the table this tablet was created for
+   */
+  public String getTableId() {
+    return this.tableId;
+  }
+
+  /**
+   * @return this tablet's ID as the Slice it is stored in
+   */
+  Slice getTabletId() {
+    return this.tabletId;
+  }
+
+  /**
+   * @return the partition this tablet was created with
+   */
+  public Partition getPartition() {
+    return this.partition;
+  }
+
+  /**
+   * @return this tablet's ID as bytes, as handed out by the underlying Slice
+   */
+  byte[] getTabletIdAsBytes() {
+    return this.tabletId.getBytes();
+  }
+
+  /**
+   * Decodes this tablet's ID bytes into a string.
+   * The charset is pinned to UTF-8 instead of the JVM's platform default so that the same
+   * tablet ID bytes always produce the same string, whatever the host's locale settings are.
+   * @return this tablet's ID decoded as UTF-8
+   */
+  String getTabletIdAsString() {
+    return tabletId.toString(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Orders tablets by table ID first, then by partition. A null argument is not rejected:
+   * this tablet is reported as greater than null.
+   * @param remoteTablet the tablet to compare against, may be null
+   * @return 1 if {@code remoteTablet} is null, else the result of comparing table IDs and
+   * then partitions
+   */
+  @Override
+  public int compareTo(RemoteTablet remoteTablet) {
+    return remoteTablet == null
+        ? 1
+        : ComparisonChain.start()
+            .compare(this.tableId, remoteTablet.tableId)
+            .compare(this.partition, remoteTablet.partition)
+            .result();
+  }
+
+  /**
+   * Two tablets are equal when they are of the exact same class and {@link #compareTo} finds
+   * no difference between them.
+   * @param o the object to compare against, may be null
+   * @return true if {@code o} is an equal RemoteTablet
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    final boolean sameClass = o != null && o.getClass() == getClass();
+    if (!sameClass) {
+      return false;
+    }
+
+    final RemoteTablet other = (RemoteTablet) o;
+    return compareTo(other) == 0;
+  }
+
+  /**
+   * Hashes the table ID and the partition, the same two fields {@link #compareTo} looks at.
+   * @return the combined hash of the table ID and the partition
+   */
+  @Override
+  public int hashCode() {
+    final int hash = Objects.hashCode(tableId, partition);
+    return hash;
+  }
+
+  /**
+   * Builds a RemoteTablet out of the tablet locations protobuf: the partition and the tablet ID
+   * are extracted from the message, then the new tablet is initialized with that same message.
+   * @param tableId ID of the table the tablet is created for
+   * @param tabletPb protobuf message carrying the tablet's ID, partition and locations
+   * @param connectionCache connection cache handed to the new tablet
+   * @return the newly created and initialized tablet
+   * @throws NonRecoverableException if initializing the tablet from the message fails
+   */
+  static RemoteTablet createTabletFromPb(String tableId,
+                                         Master.TabletLocationsPB tabletPb,
+                                         ConnectionCache connectionCache)
+      throws NonRecoverableException {
+    final Partition tabletPartition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
+    final Slice id = new Slice(tabletPb.getTabletId().toByteArray());
+
+    final RemoteTablet created = new RemoteTablet(tableId, id, tabletPartition, connectionCache);
+    created.init(tabletPb);
+    return created;
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index ec7fb48..18125b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedBytes;
 
 import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.client.AsyncKuduClient.RemoteTablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index d4d5d13..c91a48e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -750,7 +750,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    */
   private void failOrRetryRpc(final KuduRpc<?> rpc,
                               final RecoverableException exception) {
-    AsyncKuduClient.RemoteTablet tablet = rpc.getTablet();
+    RemoteTablet tablet = rpc.getTablet();
     // Note As of the time of writing (03/11/16), a null tablet doesn't make sense, if we see a null
     // tablet it's because we didn't set it properly before calling sendRpc().
     if (tablet == null) {  // Can't retry, dunno where this RPC should go.

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d4d78b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 2c191ff..a044c5f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -18,7 +18,6 @@ package org.apache.kudu.client;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.WireProtocol.AppStatusPB;
-import org.apache.kudu.client.AsyncKuduClient.RemoteTablet;
 import org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
 
 import com.stumbleupon.async.Callback;
@@ -123,7 +122,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     RemoteTablet rt =
         client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
     String tabletId = rt.getTabletIdAsString();
-    TabletClient tc = client.clientFor(rt);
+    TabletClient tc = rt.getLeaderConnection();
     try {
       // Delete table so we get table not found error.
       client.deleteTable(TABLE_NAME).join();