You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:25 UTC

[20/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
deleted file mode 100644
index c1ccb3f..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ /dev/null
@@ -1,2437 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the above copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.kududb.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.kududb.Common;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.consensus.Metadata;
-import org.kududb.master.Master;
-import org.kududb.master.Master.GetTableLocationsResponsePB;
-import org.kududb.util.AsyncUtil;
-import org.kududb.util.NetUtil;
-import org.kududb.util.Pair;
-import org.kududb.util.Slice;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.kududb.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
-
-/**
- * A fully asynchronous and thread-safe client for Kudu.
- * <p>
- * This client should be
- * instantiated only once. You can use it with any number of tables at the
- * same time. The only case where you should have multiple instances is when
- * you want to use multiple different clusters at the same time.
- * <p>
- * If you play by the rules, this client is completely
- * thread-safe. Read the documentation carefully to know what the requirements
- * are for this guarantee to apply.
- * <p>
- * This client is fully non-blocking, any blocking operation will return a
- * {@link Deferred} instance to which you can attach a {@link Callback} chain
- * that will execute when the asynchronous operation completes.
- *
- * <h1>Note regarding {@code KuduRpc} instances passed to this class</h1>
- * Every {@link KuduRpc} passed to a method of this class should not be
- * changed or re-used until the {@code Deferred} returned by that method
- * calls you back.  <strong>Changing or re-using any {@link KuduRpc} for
- * an RPC in flight will lead to <em>unpredictable</em> results and voids
- * your warranty</strong>.
- *
- * <h1>{@code throws} clauses</h1>
- * None of the asynchronous methods in this API are expected to throw an
- * exception.  But the {@link Deferred} object they return to you can carry an
- * exception that you should handle (using "errbacks", see the javadoc of
- * {@link Deferred}).  In order to be able to do proper asynchronous error
- * handling, you need to know what types of exceptions you're expected to face
- * in your errbacks.  In order to document that, the methods of this API use
- * javadoc's {@code @throws} to spell out the exception types you should
- * handle in your errback.  Asynchronous exceptions will be indicated as such
- * in the javadoc with "(deferred)".
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class AsyncKuduClient implements AutoCloseable {
-
-  public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduClient.class);
-  public static final int SLEEP_TIME = 500;
-  public static final byte[] EMPTY_ARRAY = new byte[0];
-  public static final long NO_TIMESTAMP = -1;
-  public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
-  public static final long DEFAULT_SOCKET_READ_TIMEOUT_MS = 10000;
-  private static final long MAX_RPC_ATTEMPTS = 100;
-
-  private final ClientSocketChannelFactory channelFactory;
-
-  /**
-   * This map and the next 2 maps contain the same data, but indexed
-   * differently. There is no consistency guarantee across the maps.
-   * They are not updated all at the same time atomically.  This map
-   * is always the first to be updated, because that's the map from
-   * which all the lookups are done in the fast-path of the requests
-   * that need to locate a tablet. The second map to be updated is
-   * tablet2client, because it comes second in the fast-path
-   * of every requests that need to locate a tablet. The third map
-   * is only used to handle TabletServer disconnections gracefully.
-   *
-   * This map is keyed by table ID.
-   */
-  private final ConcurrentHashMap<String, ConcurrentSkipListMap<byte[],
-      RemoteTablet>> tabletsCache = new ConcurrentHashMap<>();
-
-  /**
-   * Maps a tablet ID to the RemoteTablet that knows where all the replicas are served.
-   */
-  private final ConcurrentHashMap<Slice, RemoteTablet> tablet2client = new ConcurrentHashMap<>();
-
-  /**
-   * Maps a client connected to a TabletServer to the list of tablets we know
-   * it's serving so far.
-   */
-  private final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Map of table ID to non-covered range cache.
-   *
-   * TODO: Currently once a non-covered range is added to the cache, it is never
-   * removed. Once adding range partitions becomes possible entries will need to
-   * be expired.
-   */
-  private final ConcurrentMap<String, NonCoveredRangeCache> nonCoveredRangeCaches =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Cache that maps a TabletServer address ("ip:port") to the clients
-   * connected to it.
-   * <p>
-   * Access to this map must be synchronized by locking its monitor.
-   * Lock ordering: when locking both this map and a TabletClient, the
-   * TabletClient must always be locked first to avoid deadlocks.  Logging
-   * the contents of this map (or calling toString) requires copying it first.
-   * <p>
-   * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
-   * (just when connecting to / disconnecting from TabletClients) and when we
-   * add something to it, we want to do an atomic get-and-put, but
-   * {@code putIfAbsent} isn't a good fit for us since it requires to create
-   * an object that may be "wasted" in case another thread wins the insertion
-   * race, and we don't want to create unnecessary connections.
-   * <p>
-   * Upon disconnection, clients are automatically removed from this map.
-   * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
-   * the clean-up on the {@code channelClosed} event, which is actually the
-   * 3rd and last event to be fired when a channel gets disconnected.  The
-   * first one to get fired is, {@code channelDisconnected}.  This matters to
-   * us because we want to purge disconnected clients from the cache as
-   * quickly as possible after the disconnection, to avoid handing out clients
-   * that are going to cause unnecessary errors.
-   * @see TabletClientPipeline#handleDisconnect
-   */
-  private final HashMap<String, TabletClient> ip2client =
-      new HashMap<String, TabletClient>();
-
-  @GuardedBy("sessions")
-  private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>();
-
-  // Since the masters also go through TabletClient, we need to treat them as if they were a normal
-  // table. We'll use the following fake table name to identify places where we need special
-  // handling.
-  static final String MASTER_TABLE_NAME_PLACEHOLDER =  "Kudu Master";
-  final KuduTable masterTable;
-  private final List<HostAndPort> masterAddresses;
-
-  private final HashedWheelTimer timer;
-
-  /**
-   * Timestamp required for HybridTime external consistency through timestamp
-   * propagation.
-   * @see src/kudu/common/common.proto
-   */
-  private long lastPropagatedTimestamp = NO_TIMESTAMP;
-
-  // A table is considered not served when we get an empty list of locations but know
-  // that a tablet exists. This is currently only used for new tables. The objects stored are
-  // table IDs.
-  private final Set<String> tablesNotServed = Collections.newSetFromMap(new
-      ConcurrentHashMap<String, Boolean>());
-
-  /**
-   * Semaphore used to rate-limit master lookups
-   * Once we have more than this number of concurrent master lookups, we'll
-   * start to throttle ourselves slightly.
-   * @see #acquireMasterLookupPermit
-   */
-  private final Semaphore masterLookups = new Semaphore(50);
-
-  private final Random sleepRandomizer = new Random();
-
-  private final long defaultOperationTimeoutMs;
-
-  private final long defaultAdminOperationTimeoutMs;
-
-  private final long defaultSocketReadTimeoutMs;
-
-  private final Statistics statistics;
-
-  private final boolean statisticsDisabled;
-
-  private final RequestTracker requestTracker;
-
-  private volatile boolean closed;
-
-  private AsyncKuduClient(AsyncKuduClientBuilder b) {
-    this.channelFactory = b.createChannelFactory();
-    this.masterAddresses = b.masterAddresses;
-    this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
-        MASTER_TABLE_NAME_PLACEHOLDER, null, null);
-    this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
-    this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
-    this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
-    this.statisticsDisabled = b.statisticsDisabled;
-    statistics = statisticsDisabled ? null : new Statistics();
-    this.timer = b.timer;
-    String clientId = UUID.randomUUID().toString().replace("-", "");
-    this.requestTracker = new RequestTracker(clientId);
-  }
-
-  /**
-   * Updates the last timestamp received from a server. Used for CLIENT_PROPAGATED
-   * external consistency. This is only publicly visible so that it can be set
-   * on tests, users should generally disregard this method.
-   *
-   * @param lastPropagatedTimestamp the last timestamp received from a server
-   */
-  @VisibleForTesting
-  public synchronized void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) {
-    if (this.lastPropagatedTimestamp == -1 ||
-      this.lastPropagatedTimestamp < lastPropagatedTimestamp) {
-      this.lastPropagatedTimestamp = lastPropagatedTimestamp;
-    }
-  }
-
-  @VisibleForTesting
-  public synchronized long getLastPropagatedTimestamp() {
-    return lastPropagatedTimestamp;
-  }
-
-  /**
-   * Returns a synchronous {@link KuduClient} which wraps this asynchronous client.
-   * Calling {@link KuduClient#close} on the returned client will close this client.
-   * If this asynchronous client should outlive the returned synchronous client,
-   * then do not close the synchronous client.
-   * @return a new synchronous {@code KuduClient}
-   */
-  public KuduClient syncClient() {
-    return new KuduClient(this);
-  }
-
-  /**
-   * Create a table on the cluster with the specified name, schema, and table configurations.
-   * @param name the table's name
-   * @param schema the table's schema
-   * @param builder a builder containing the table's configurations
-   * @return a deferred object to track the progress of the createTable command that gives
-   * an object to communicate with the created table
-   */
-  public Deferred<KuduTable> createTable(final String name, Schema schema,
-                                         CreateTableOptions builder) {
-    checkIsClosed();
-    if (builder == null) {
-      throw new IllegalArgumentException("CreateTableOptions may not be null");
-    }
-    if (!builder.getBuilder().getPartitionSchema().hasRangeSchema() &&
-        builder.getBuilder().getPartitionSchema().getHashBucketSchemasCount() == 0) {
-      throw new IllegalArgumentException("Table partitioning must be specified using " +
-                                         "setRangePartitionColumns or addHashPartitions");
-
-    }
-    CreateTableRequest create = new CreateTableRequest(this.masterTable, name, schema, builder);
-    create.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(create).addCallbackDeferring(
-        new Callback<Deferred<KuduTable>, CreateTableResponse>() {
-      @Override
-      public Deferred<KuduTable> call(CreateTableResponse createTableResponse) throws Exception {
-        return openTable(name);
-      }
-    });
-  }
-
-  /**
-   * Delete a table on the cluster with the specified name.
-   * @param name the table's name
-   * @return a deferred object to track the progress of the deleteTable command
-   */
-  public Deferred<DeleteTableResponse> deleteTable(String name) {
-    checkIsClosed();
-    DeleteTableRequest delete = new DeleteTableRequest(this.masterTable, name);
-    delete.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(delete);
-  }
-
-  /**
-   * Alter a table on the cluster as specified by the builder.
-   *
-   * When the returned deferred completes it only indicates that the master accepted the alter
-   * command, use {@link AsyncKuduClient#isAlterTableDone(String)} to know when the alter finishes.
-   * @param name the table's name, if this is a table rename then the old table name must be passed
-   * @param ato the alter table builder
-   * @return a deferred object to track the progress of the alter command
-   */
-  public Deferred<AlterTableResponse> alterTable(String name, AlterTableOptions ato) {
-    checkIsClosed();
-    AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato);
-    alter.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(alter);
-  }
-
-  /**
-   * Helper method that checks and waits until the completion of an alter command.
-   * It will block until the alter command is done or the deadline is reached.
-   * @param name the table's name, if the table was renamed then that name must be checked against
-   * @return a deferred object to track the progress of the isAlterTableDone command
-   */
-  public Deferred<IsAlterTableDoneResponse> isAlterTableDone(String name) {
-    checkIsClosed();
-    IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(this.masterTable, name);
-    request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(request);
-  }
-
-  /**
-   * Get the list of running tablet servers.
-   * @return a deferred object that yields a list of tablet servers
-   */
-  public Deferred<ListTabletServersResponse> listTabletServers() {
-    checkIsClosed();
-    ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(rpc);
-  }
-
-  Deferred<GetTableSchemaResponse> getTableSchema(String name) {
-    GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(rpc);
-  }
-
-  /**
-   * Get the list of all the tables.
-   * @return a deferred object that yields a list of all the tables
-   */
-  public Deferred<ListTablesResponse> getTablesList() {
-    return getTablesList(null);
-  }
-
-  /**
-   * Get a list of table names. Passing a null filter returns all the tables. When a filter is
-   * specified, it only returns tables that satisfy a substring match.
-   * @param nameFilter an optional table name filter
-   * @return a deferred that yields the list of table names
-   */
-  public Deferred<ListTablesResponse> getTablesList(String nameFilter) {
-    ListTablesRequest rpc = new ListTablesRequest(this.masterTable, nameFilter);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(rpc);
-  }
-
-  /**
-   * Test if a table exists.
-   * @param name a non-null table name
-   * @return true if the table exists, else false
-   */
-  public Deferred<Boolean> tableExists(final String name) {
-    if (name == null) {
-      throw new IllegalArgumentException("The table name cannot be null");
-    }
-    return getTablesList().addCallbackDeferring(new Callback<Deferred<Boolean>,
-        ListTablesResponse>() {
-      @Override
-      public Deferred<Boolean> call(ListTablesResponse listTablesResponse) throws Exception {
-        for (String tableName : listTablesResponse.getTablesList()) {
-          if (name.equals(tableName)) {
-            return Deferred.fromResult(true);
-          }
-        }
-        return Deferred.fromResult(false);
-      }
-    });
-  }
-
-  /**
-   * Open the table with the given name. If the table was just created, the Deferred will only get
-   * called back when all the tablets have been successfully created.
-   * @param name table to open
-   * @return a KuduTable if the table exists, else a MasterErrorException
-   */
-  public Deferred<KuduTable> openTable(final String name) {
-    checkIsClosed();
-
-    // We create an RPC that we're never going to send, and will instead use it to keep track of
-    // timeouts and use its Deferred.
-    final KuduRpc<KuduTable> fakeRpc = new KuduRpc<KuduTable>(null) {
-      @Override
-      ChannelBuffer serialize(Message header) { return null; }
-
-      @Override
-      String serviceName() { return null; }
-
-      @Override
-      String method() {
-        return "IsCreateTableDone";
-      }
-
-      @Override
-      Pair<KuduTable, Object> deserialize(CallResponse callResponse, String tsUUID)
-          throws Exception { return null; }
-    };
-    fakeRpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-
-    return getTableSchema(name).addCallbackDeferring(new Callback<Deferred<KuduTable>,
-        GetTableSchemaResponse>() {
-      @Override
-      public Deferred<KuduTable> call(GetTableSchemaResponse response) throws Exception {
-        KuduTable table = new KuduTable(AsyncKuduClient.this,
-            name,
-            response.getTableId(),
-            response.getSchema(),
-            response.getPartitionSchema());
-        // We grab the Deferred first because calling callback on the RPC will reset it and we'd
-        // return a different, non-triggered Deferred.
-        Deferred<KuduTable> d = fakeRpc.getDeferred();
-        if (response.isCreateTableDone()) {
-          LOG.debug("Opened table {}", name);
-          fakeRpc.callback(table);
-        } else {
-          LOG.debug("Delaying opening table {}, its tablets aren't fully created", name);
-          fakeRpc.attempt++;
-          delayedIsCreateTableDone(
-              table,
-              fakeRpc,
-              getOpenTableCB(fakeRpc, table),
-              getDelayedIsCreateTableDoneErrback(fakeRpc));
-        }
-        return d;
-      }
-    });
-  }
-
-  /**
-   * This callback will be repeatadly used when opening a table until it is done being created.
-   */
-  Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
-      final KuduRpc<KuduTable> rpc, final KuduTable table) {
-    return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>() {
-      @Override
-      public Deferred<KuduTable> call(
-          Master.IsCreateTableDoneResponsePB isCreateTableDoneResponsePB) throws Exception {
-        String tableName = table.getName();
-        Deferred<KuduTable> d = rpc.getDeferred();
-        if (isCreateTableDoneResponsePB.getDone()) {
-          LOG.debug("Table {}'s tablets are now created", tableName);
-          rpc.callback(table);
-        } else {
-          rpc.attempt++;
-          LOG.debug("Table {}'s tablets are still not created, further delaying opening it",
-              tableName);
-
-          delayedIsCreateTableDone(
-              table,
-              rpc,
-              getOpenTableCB(rpc, table),
-              getDelayedIsCreateTableDoneErrback(rpc));
-        }
-        return d;
-      }
-    };
-  }
-
-  /**
-   * Get the timeout used for operations on sessions and scanners.
-   * @return a timeout in milliseconds
-   */
-  public long getDefaultOperationTimeoutMs() {
-    return defaultOperationTimeoutMs;
-  }
-
-  /**
-   * Get the timeout used for admin operations.
-   * @return a timeout in milliseconds
-   */
-  public long getDefaultAdminOperationTimeoutMs() {
-    return defaultAdminOperationTimeoutMs;
-  }
-
-  /**
-   * Get the timeout used when waiting to read data from a socket. Will be triggered when nothing
-   * has been read on a socket connected to a tablet server for {@code timeout} milliseconds.
-   * @return a timeout in milliseconds
-   */
-  public long getDefaultSocketReadTimeoutMs() {
-    return defaultSocketReadTimeoutMs;
-  }
-
-  /**
-   * Check if statistics collection is enabled for this client.
-   * @return true if it is enabled, else false
-   */
-  public boolean isStatisticsEnabled() {
-    return !statisticsDisabled;
-  }
-
-  /**
-   * Get the statistics object of this client.
-   *
-   * @return this client's Statistics object
-   * @throws IllegalStateException thrown if statistics collection has been disabled
-   */
-  public Statistics getStatistics() {
-    if (statisticsDisabled) {
-      throw new IllegalStateException("This client's statistics is disabled");
-    }
-    return this.statistics;
-  }
-
-  RequestTracker getRequestTracker() {
-    return requestTracker;
-  }
-
-  /**
-   * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
-   * @param table the name of the table you intend to scan.
-   * The string is assumed to use the platform's default charset.
-   * @return a new scanner builder for this table
-   */
-  public AsyncKuduScanner.AsyncKuduScannerBuilder newScannerBuilder(KuduTable table) {
-    checkIsClosed();
-    return new AsyncKuduScanner.AsyncKuduScannerBuilder(this, table);
-  }
-
-  /**
-   * Create a new session for interacting with the cluster.
-   * User is responsible for destroying the session object.
-   * This is a fully local operation (no RPCs or blocking).
-   * @return a new AsyncKuduSession
-   */
-  public AsyncKuduSession newSession() {
-    checkIsClosed();
-    AsyncKuduSession session = new AsyncKuduSession(this);
-    synchronized (sessions) {
-      sessions.add(session);
-    }
-    return session;
-  }
-
-  /**
-   * This method is for KuduSessions so that they can remove themselves as part of closing down.
-   * @param session Session to remove
-   */
-  void removeSession(AsyncKuduSession session) {
-    synchronized (sessions) {
-      boolean removed = sessions.remove(session);
-      assert removed == true;
-    }
-  }
-
-  /**
-   * Package-private access point for {@link AsyncKuduScanner}s to scan more rows.
-   * @param scanner The scanner to use.
-   * @return A deferred row.
-   */
-  Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
-    final RemoteTablet tablet = scanner.currentTablet();
-    final TabletClient client = clientFor(tablet);
-    final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
-    final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
-    // Important to increment the attempts before the next if statement since
-    // getSleepTimeForRpc() relies on it if the client is null or dead.
-    next_request.attempt++;
-    if (client == null || !client.isAlive()) {
-      // A null client means we either don't know about this tablet anymore (unlikely) or we
-      // couldn't find a leader (which could be triggered by a read timeout).
-      // We'll first delay the RPC in case things take some time to settle down, then retry.
-      delayedSendRpcToTablet(next_request, null);
-      return next_request.getDeferred();
-    }
-    client.sendRpc(next_request);
-    return d;
-  }
-
-  /**
-   * Package-private access point for {@link AsyncKuduScanner}s to close themselves.
-   * @param scanner the scanner to close
-   * @return a deferred object that indicates the completion of the request.
-   * The {@link AsyncKuduScanner.Response} can contain rows that were left to scan.
-   */
-  Deferred<AsyncKuduScanner.Response> closeScanner(final AsyncKuduScanner scanner) {
-    final RemoteTablet tablet = scanner.currentTablet();
-    // Getting a null tablet here without being in a closed state means we were in between tablets.
-    if (tablet == null) {
-      return Deferred.fromResult(null);
-    }
-
-    final TabletClient client = clientFor(tablet);
-    if (client == null || !client.isAlive()) {
-      // Oops, we couldn't find a tablet server that hosts this tablet. Our
-      // cache was probably invalidated while the client was scanning. So
-      // we can't close this scanner properly.
-      LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
-      return Deferred.fromResult(null);
-    }
-    final KuduRpc<AsyncKuduScanner.Response>  close_request = scanner.getCloseRequest();
-    final Deferred<AsyncKuduScanner.Response> d = close_request.getDeferred();
-    close_request.attempt++;
-    client.sendRpc(close_request);
-    return d;
-  }
-
-  /**
-   * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
-   * of the tablet identified by the RPC's table and partition key.
-   *
-   * Note: despite the name, this method is also used for routing master
-   * requests to the leader master instance since it's also handled like a tablet.
-   *
-   * @param request the RPC to send
-   * @param <R> the expected return type of the RPC
-   * @return a {@code Deferred} which will contain the response
-   */
-  <R> Deferred<R> sendRpcToTablet(final KuduRpc<R> request) {
-    if (cannotRetryRequest(request)) {
-      return tooManyAttemptsOrTimeout(request, null);
-    }
-    request.attempt++;
-    final String tableId = request.getTable().getTableId();
-    byte[] partitionKey = request.partitionKey();
-    RemoteTablet tablet = getTablet(tableId, partitionKey);
-
-    if (tablet == null && partitionKey != null) {
-      // Check if the RPC is in a non-covered range.
-      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, partitionKey);
-      if (nonCoveredRange != null) {
-        return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
-                                                               nonCoveredRange.getValue()));
-      }
-      // Otherwise fall through to below where a GetTableLocations lookup will occur.
-    }
-
-    // Set the propagated timestamp so that the next time we send a message to
-    // the server the message includes the last propagated timestamp.
-    long lastPropagatedTs = getLastPropagatedTimestamp();
-    if (request.getExternalConsistencyMode() == CLIENT_PROPAGATED &&
-      lastPropagatedTs != NO_TIMESTAMP) {
-      request.setPropagatedTimestamp(lastPropagatedTs);
-    }
-
-    // If we found a tablet, we'll try to find the TS to talk to. If that TS was previously
-    // disconnected, say because we didn't query that tablet for some seconds, then we'll try to
-    // reconnect based on the old information. If that fails, we'll instead continue with the next
-    // block that queries the master.
-    if (tablet != null) {
-      TabletClient tabletClient = clientFor(tablet);
-      if (tabletClient != null) {
-        final Deferred<R> d = request.getDeferred();
-        if (tabletClient.isAlive()) {
-          request.setTablet(tablet);
-          tabletClient.sendRpc(request);
-          return d;
-        }
-        try {
-          tablet.reconnectTabletClient(tabletClient);
-        } catch (UnknownHostException e) {
-          LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
-              tabletClient.getUuid(), e);
-          // Because of this exception, clientFor() below won't be able to find a newTabletClient
-          // and we'll delay the RPC.
-        }
-        TabletClient newTabletClient = clientFor(tablet);
-        assert (tabletClient != newTabletClient);
-
-        if (newTabletClient == null) {
-          // Wait a little bit before hitting the master.
-          delayedSendRpcToTablet(request, null);
-          return request.getDeferred();
-        }
-
-        if (!newTabletClient.isAlive()) {
-          LOG.debug("Tried reconnecting to tablet server {} but failed, " +
-              "will query the master", tabletClient.getUuid());
-          // Let fall through.
-        } else {
-          request.setTablet(tablet);
-          newTabletClient.sendRpc(request);
-          return d;
-        }
-      }
-    }
-
-    // We fall through to here in two cases:
-    //
-    // 1) This client has not yet discovered the tablet which is responsible for
-    //    the RPC's table and partition key. This can happen when the client's
-    //    tablet location cache is cold because the client is new, or the table
-    //    is new.
-    //
-    // 2) The tablet is known, but we do not have an active client for the
-    //    leader replica.
-    if (tablesNotServed.contains(tableId)) {
-      return delayedIsCreateTableDone(request.getTable(), request,
-          new RetryRpcCB<R, Master.IsCreateTableDoneResponsePB>(request),
-          getDelayedIsCreateTableDoneErrback(request));
-    }
-    Callback<Deferred<R>, Master.GetTableLocationsResponsePB> cb = new RetryRpcCB<>(request);
-    Callback<Deferred<R>, Exception> eb = new RetryRpcErrback<>(request);
-    Deferred<Master.GetTableLocationsResponsePB> returnedD =
-        locateTablet(request.getTable(), partitionKey);
-    return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
-  }
-
-  /**
-   * Callback used to retry a RPC after another query finished, like looking up where that RPC
-   * should go.
-   * <p>
-   * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback and
-   * {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code Deferred}
-   * returned by {@link #locateTablet(KuduTable, byte[])}.
-   * @param <R> RPC's return type.
-   * @param <D> Previous query's return type, which we don't use, but need to specify in order to
-   *           tie it all together.
-   */
-  final class RetryRpcCB<R, D> implements Callback<Deferred<R>, D> {
-    private final KuduRpc<R> request;
-    RetryRpcCB(KuduRpc<R> request) {
-      this.request = request;
-    }
-    public Deferred<R> call(final D arg) {
-      LOG.debug("Retrying sending RPC {} after lookup", request);
-      return sendRpcToTablet(request);  // Retry the RPC.
-    }
-    public String toString() {
-      return "retry RPC";
-    }
-  }
-
-  /**
-   * "Errback" used to delayed-retry a RPC if it fails due to no leader master being found.
-   * Other exceptions are used to notify request RPC error, and passed through to be handled
-   * by the caller.
-   * <p>
-   * Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback" and
-   * {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
-   * {@link #locateTablet(KuduTable, byte[])}.
-   * @see #delayedSendRpcToTablet(KuduRpc, KuduException)
-   * @param <R> The type of the original RPC.
-   */
-  final class RetryRpcErrback<R> implements Callback<Deferred<R>, Exception> {
-    private final KuduRpc<R> request;
-
-    public RetryRpcErrback(KuduRpc<R> request) {
-      this.request = request;
-    }
-
-    @Override
-    public Deferred<R> call(Exception arg) {
-      if (arg instanceof NoLeaderMasterFoundException) {
-        // If we could not find the leader master, try looking up the leader master
-        // again.
-        // TODO: Handle the situation when multiple in-flight RPCs are queued waiting
-        // for the leader master to be determine (either after a failure or at initialization
-        // time). This could re-use some of the existing piping in place for non-master tablets.
-        Deferred<R> d = request.getDeferred();
-        delayedSendRpcToTablet(request, (NoLeaderMasterFoundException) arg);
-        return d;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Notify RPC %s after lookup exception", request), arg);
-      }
-      request.errback(arg);
-      return Deferred.fromError(arg);
-    }
-
-    @Override
-    public String toString() {
-      return "retry RPC after error";
-    }
-  }
-
-  /**
-   * This errback ensures that if the delayed call to IsCreateTableDone throws an Exception that
-   * it will be propagated back to the user.
-   * @param request Request to errback if there's a problem with the delayed call.
-   * @param <R> Request's return type.
-   * @return An errback.
-   */
-  <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
-    return new Callback<Exception, Exception>() {
-      @Override
-      public Exception call(Exception e) throws Exception {
-        // TODO maybe we can retry it?
-        request.errback(e);
-        return e;
-      }
-    };
-  }
-
-  /**
-   * This method will call IsCreateTableDone on the master after sleeping for
-   * getSleepTimeForRpc() based on the provided KuduRpc's number of attempts. Once this is done,
-   * the provided callback will be called.
-   * @param table the table to lookup
-   * @param rpc the original KuduRpc that needs to access the table
-   * @param retryCB the callback to call on completion
-   * @param errback the errback to call if something goes wrong when calling IsCreateTableDone
-   * @return Deferred used to track the provided KuduRpc
-   */
-  <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc,
-                                           final Callback<Deferred<R>,
-                                               Master.IsCreateTableDoneResponsePB> retryCB,
-                                           final Callback<Exception, Exception> errback) {
-
-    final class RetryTimer implements TimerTask {
-      public void run(final Timeout timeout) {
-        String tableId = table.getTableId();
-        final boolean has_permit = acquireMasterLookupPermit();
-        if (!has_permit) {
-          // If we failed to acquire a permit, it's worth checking if someone
-          // looked up the tablet we're interested in.  Every once in a while
-          // this will save us a Master lookup.
-          if (!tablesNotServed.contains(tableId)) {
-            try {
-              retryCB.call(null);
-              return;
-            } catch (Exception e) {
-              // we're calling RetryRpcCB which doesn't throw exceptions, ignore
-            }
-          }
-        }
-        IsCreateTableDoneRequest rpc = new IsCreateTableDoneRequest(masterTable, tableId);
-        rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-        final Deferred<Master.IsCreateTableDoneResponsePB> d =
-            sendRpcToTablet(rpc).addCallback(new IsCreateTableDoneCB(tableId));
-        if (has_permit) {
-          // The errback is needed here to release the lookup permit
-          d.addCallbacks(new ReleaseMasterLookupPermit<Master.IsCreateTableDoneResponsePB>(),
-              new ReleaseMasterLookupPermit<Exception>());
-        }
-        d.addCallbacks(retryCB, errback);
-      }
-    }
-    long sleepTime = getSleepTimeForRpc(rpc);
-    if (rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
-      return tooManyAttemptsOrTimeout(rpc, null);
-    }
-
-    newTimeout(new RetryTimer(), sleepTime);
-    return rpc.getDeferred();
-  }
-
-  private final class ReleaseMasterLookupPermit<T> implements Callback<T, T> {
-    public T call(final T arg) {
-      releaseMasterLookupPermit();
-      return arg;
-    }
-    public String toString() {
-      return "release master lookup permit";
-    }
-  }
-
-  /** Callback executed when IsCreateTableDone completes.  */
-  private final class IsCreateTableDoneCB implements Callback<Master.IsCreateTableDoneResponsePB,
-      Master.IsCreateTableDoneResponsePB> {
-    final String tableName;
-    IsCreateTableDoneCB(String tableName) {
-      this.tableName = tableName;
-    }
-    public Master.IsCreateTableDoneResponsePB call(final Master.IsCreateTableDoneResponsePB response) {
-      if (response.getDone()) {
-        LOG.debug("Table {} was created", tableName);
-        tablesNotServed.remove(tableName);
-      } else {
-        LOG.debug("Table {} is still being created", tableName);
-      }
-      return response;
-    }
-    public String toString() {
-      return "ask the master if " + tableName + " was created";
-    }
-  }
-
-  boolean isTableNotServed(String tableId) {
-    return tablesNotServed.contains(tableId);
-  }
-
-
-  long getSleepTimeForRpc(KuduRpc<?> rpc) {
-    byte attemptCount = rpc.attempt;
-    assert (attemptCount > 0);
-    if (attemptCount == 0) {
-      LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc,
-          new Exception("Exception created to collect stack trace"));
-      attemptCount = 1;
-    }
-    // Randomized exponential backoff, truncated at 4096ms.
-    long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12))
-        * sleepRandomizer.nextDouble());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
-    }
-    return sleepTime;
-  }
-
-  /**
-   * Modifying the list returned by this method won't change how AsyncKuduClient behaves,
-   * but calling certain methods on the returned TabletClients can. For example,
-   * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
-   * TabletClient#shutdown()}.
-   * @return Copy of the current TabletClients list
-   */
-  @VisibleForTesting
-  List<TabletClient> getTabletClients() {
-    synchronized (ip2client) {
-      return new ArrayList<TabletClient>(ip2client.values());
-    }
-  }
-
-  /**
-   * This method first clears tabletsCache and then tablet2client without any regards for
-   * calls to {@link #discoverTablets}. Call only when AsyncKuduClient is in a steady state.
-   * @param tableId table for which we remove all the RemoteTablet entries
-   */
-  @VisibleForTesting
-  void emptyTabletsCacheForTable(String tableId) {
-    tabletsCache.remove(tableId);
-    Set<Map.Entry<Slice, RemoteTablet>> tablets = tablet2client.entrySet();
-    for (Map.Entry<Slice, RemoteTablet> entry : tablets) {
-      if (entry.getValue().getTableId().equals(tableId)) {
-        tablets.remove(entry);
-      }
-    }
-  }
-
-  TabletClient clientFor(RemoteTablet tablet) {
-    if (tablet == null) {
-      return null;
-    }
-
-    synchronized (tablet.tabletServers) {
-      if (tablet.tabletServers.isEmpty()) {
-        return null;
-      }
-      if (tablet.leaderIndex == RemoteTablet.NO_LEADER_INDEX) {
-        // TODO we don't know where the leader is, either because one wasn't provided or because
-        // we couldn't resolve its IP. We'll just send the client back so it retries and probably
-        // dies after too many attempts.
-        return null;
-      } else {
-        // TODO we currently always hit the leader, we probably don't need to except for writes
-        // and some reads.
-        return tablet.tabletServers.get(tablet.leaderIndex);
-      }
-    }
-  }
-
-  /**
-   * Checks whether or not an RPC can be retried once more
-   * @param rpc The RPC we're going to attempt to execute
-   * @return {@code true} if this RPC already had too many attempts,
-   * {@code false} otherwise (in which case it's OK to retry once more)
-   */
-  static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
-    return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
-  }
-
-  /**
-   * Returns a {@link Deferred} containing an exception when an RPC couldn't
-   * succeed after too many attempts or if it already timed out.
-   * @param request The RPC that was retried too many times or timed out.
-   * @param cause What was cause of the last failed attempt, if known.
-   * You can pass {@code null} if the cause is unknown.
-   */
-  static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
-                                                  final KuduException cause) {
-    String message;
-    if (request.attempt > MAX_RPC_ATTEMPTS) {
-      message = "Too many attempts: ";
-    } else {
-      message = "RPC can not complete before timeout: ";
-    }
-    Status statusTimedOut = Status.TimedOut(message + request);
-    final Exception e = new NonRecoverableException(statusTimedOut, cause);
-    request.errback(e);
-    LOG.debug("Cannot continue with this RPC: {} because of: {}", request, message, e);
-    return Deferred.fromError(e);
-  }
-
-  /**
-   * Sends a getTableLocations RPC to the master to find the table's tablets.
-   * @param table table to lookup
-   * @param partitionKey can be null, if not we'll find the exact tablet that contains it
-   * @return Deferred to track the progress
-   */
-  private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable table,
-                                                                    byte[] partitionKey) {
-    final boolean has_permit = acquireMasterLookupPermit();
-    String tableId = table.getTableId();
-    if (!has_permit) {
-      // If we failed to acquire a permit, it's worth checking if someone
-      // looked up the tablet we're interested in.  Every once in a while
-      // this will save us a Master lookup.
-      RemoteTablet tablet = getTablet(tableId, partitionKey);
-      if (tablet != null && clientFor(tablet) != null) {
-        return Deferred.fromResult(null);  // Looks like no lookup needed.
-      }
-    }
-    // Leave the end of the partition key range empty in order to pre-fetch tablet locations.
-    GetTableLocationsRequest rpc =
-        new GetTableLocationsRequest(masterTable, partitionKey, null, tableId);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    final Deferred<Master.GetTableLocationsResponsePB> d;
-
-    // If we know this is going to the master, check the master consensus
-    // configuration (as specified by 'masterAddresses' field) to determine and
-    // cache the current leader.
-    if (isMasterTable(tableId)) {
-      d = getMasterTableLocationsPB();
-    } else {
-      d = sendRpcToTablet(rpc);
-    }
-    d.addCallback(new MasterLookupCB(table, partitionKey));
-    if (has_permit) {
-      d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
-    }
-    return d;
-  }
-
-  /**
-   * Update the master config: send RPCs to all config members, use the returned data to
-   * fill a {@link Master.GetTabletLocationsResponsePB} object.
-   * @return An initialized Deferred object to hold the response.
-   */
-  Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB() {
-    final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
-    final GetMasterRegistrationReceived received =
-        new GetMasterRegistrationReceived(masterAddresses, responseD);
-    for (HostAndPort hostAndPort : masterAddresses) {
-      Deferred<GetMasterRegistrationResponse> d;
-      // Note: we need to create a client for that host first, as there's a
-      // chicken and egg problem: since there is no source of truth beyond
-      // the master, the only way to get information about a master host is
-      // by making an RPC to that host.
-      TabletClient clientForHostAndPort = newMasterClient(hostAndPort);
-      if (clientForHostAndPort == null) {
-        String message = "Couldn't resolve this master's address " + hostAndPort.toString();
-        LOG.warn(message);
-        Status statusIOE = Status.IOError(message);
-        d = Deferred.fromError(new NonRecoverableException(statusIOE));
-      } else {
-        d = getMasterRegistration(clientForHostAndPort);
-      }
-      d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
-    }
-    return responseD;
-  }
-
-
-  /**
-   * Get all or some tablets for a given table. This may query the master multiple times if there
-   * are a lot of tablets.
-   * This method blocks until it gets all the tablets.
-   * @param table the table to locate tablets from
-   * @param startPartitionKey where to start in the table, pass null to start at the beginning
-   * @param endPartitionKey where to stop in the table, pass null to get all the tablets until the
-   *                        end of the table
-   * @param deadline deadline in milliseconds for this method to finish
-   * @return a list of the tablets in the table, which can be queried for metadata about
-   *         each tablet
-   * @throws Exception MasterErrorException if the table doesn't exist
-   */
-  List<LocatedTablet> syncLocateTable(KuduTable table,
-                                      byte[] startPartitionKey,
-                                      byte[] endPartitionKey,
-                                      long deadline) throws Exception {
-    return locateTable(table, startPartitionKey, endPartitionKey, deadline).join();
-  }
-
-  private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table,
-                                                        final byte[] startPartitionKey,
-                                                        final byte[] endPartitionKey,
-                                                        final List<LocatedTablet> ret,
-                                                        final DeadlineTracker deadlineTracker) {
-    // We rely on the keys initially not being empty.
-    Preconditions.checkArgument(startPartitionKey == null || startPartitionKey.length > 0,
-                                "use null for unbounded start partition key");
-    Preconditions.checkArgument(endPartitionKey == null || endPartitionKey.length > 0,
-                                "use null for unbounded end partition key");
-
-    // The next partition key to look up. If null, then it represents
-    // the minimum partition key, If empty, it represents the maximum key.
-    byte[] partitionKey = startPartitionKey;
-    String tableId = table.getTableId();
-
-    // Continue while the partition key is the minimum, or it is not the maximum
-    // and it is less than the end partition key.
-    while (partitionKey == null ||
-           (partitionKey.length > 0 &&
-            (endPartitionKey == null || Bytes.memcmp(partitionKey, endPartitionKey) < 0))) {
-      byte[] key = partitionKey == null ? EMPTY_ARRAY : partitionKey;
-      RemoteTablet tablet = getTablet(tableId, key);
-      if (tablet != null) {
-        ret.add(new LocatedTablet(tablet));
-        partitionKey = tablet.getPartition().getPartitionKeyEnd();
-        continue;
-      }
-
-      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, key);
-      if (nonCoveredRange != null) {
-        partitionKey = nonCoveredRange.getValue();
-        continue;
-      }
-
-      if (deadlineTracker.timedOut()) {
-        Status statusTimedOut = Status.TimedOut("Took too long getting the list of tablets, " +
-            deadlineTracker);
-        return Deferred.fromError(new NonRecoverableException(statusTimedOut));
-      }
-
-      // If the partition key location isn't cached, and the request hasn't timed out,
-      // then kick off a new tablet location lookup and try again when it completes.
-      // When lookup completes, the tablet (or non-covered range) for the next
-      // partition key will be located and added to the client's cache.
-      final byte[] lookupKey = partitionKey;
-      return locateTablet(table, key).addCallbackDeferring(
-          new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
-            @Override
-            public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
-              return loopLocateTable(table, lookupKey, endPartitionKey, ret, deadlineTracker);
-            }
-            @Override
-            public String toString() {
-              return "LoopLocateTableCB";
-            }
-          });
-    }
-
-    return Deferred.fromResult(ret);
-  }
-
-  /**
-   * Get all or some tablets for a given table. This may query the master multiple times if there
-   * are a lot of tablets.
-   * @param table the table to locate tablets from
-   * @param startPartitionKey where to start in the table, pass null to start at the beginning
-   * @param endPartitionKey where to stop in the table, pass null to get all the tablets until the
-   *                        end of the table
-   * @param deadline max time spent in milliseconds for the deferred result of this method to
-   *         get called back, if deadline is reached, the deferred result will get erred back
-   * @return a deferred object that yields a list of the tablets in the table, which can be queried
-   *         for metadata about each tablet
-   * @throws Exception MasterErrorException if the table doesn't exist
-   */
-  Deferred<List<LocatedTablet>> locateTable(final KuduTable table,
-                                            final byte[] startPartitionKey,
-                                            final byte[] endPartitionKey,
-                                            long deadline) {
-    final List<LocatedTablet> ret = Lists.newArrayList();
-    final DeadlineTracker deadlineTracker = new DeadlineTracker();
-    deadlineTracker.setDeadline(deadline);
-    return loopLocateTable(table, startPartitionKey, endPartitionKey, ret, deadlineTracker);
-  }
-
-  /**
-   * We're handling a tablet server that's telling us it doesn't have the tablet we're asking for.
-   * We're in the context of decode() meaning we need to either callback or retry later.
-   */
-  <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    invalidateTabletCache(rpc.getTablet(), server);
-    handleRetryableError(rpc, ex);
-  }
-
-  /**
-   * A tablet server is letting us know that it isn't the specified tablet's leader in response
-   * a RPC, so we need to demote it and retry.
-   */
-  <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
-    rpc.getTablet().demoteLeader(server);
-    handleRetryableError(rpc, ex);
-  }
-
-  <R> void handleRetryableError(final KuduRpc<R> rpc, KuduException ex) {
-    // TODO we don't always need to sleep, maybe another replica can serve this RPC.
-    delayedSendRpcToTablet(rpc, ex);
-  }
-
-  private <R> void delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
-    // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
-    // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
-    // on hold but we won't be doing this for the moment. Regions in HBase can move a lot,
-    // we're not expecting this in Kudu.
-    final class RetryTimer implements TimerTask {
-      public void run(final Timeout timeout) {
-        sendRpcToTablet(rpc);
-      }
-    }
-    long sleepTime = getSleepTimeForRpc(rpc);
-    if (cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
-      tooManyAttemptsOrTimeout(rpc, ex);
-      // Don't let it retry.
-      return;
-    }
-    newTimeout(new RetryTimer(), sleepTime);
-  }
-
-  /**
-   * Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing
-   * the tablet itself from the caches.
-   */
-  private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
-    LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
-        tablet.getTabletIdAsString());
-    tablet.removeTabletClient(server);
-  }
-
-  /** Callback executed when a master lookup completes.  */
-  private final class MasterLookupCB implements Callback<Object,
-      Master.GetTableLocationsResponsePB> {
-    final KuduTable table;
-    private final byte[] partitionKey;
-    MasterLookupCB(KuduTable table, byte[] partitionKey) {
-      this.table = table;
-      this.partitionKey = partitionKey;
-    }
-    public Object call(final GetTableLocationsResponsePB response) {
-      if (response.hasError()) {
-        if (response.getError().getCode() == Master.MasterErrorPB.Code.TABLET_NOT_RUNNING) {
-          // Keep a note that the table exists but at least one tablet is not yet running.
-          LOG.debug("Table {} has a non-running tablet", table.getName());
-          tablesNotServed.add(table.getTableId());
-        } else {
-          Status status = Status.fromMasterErrorPB(response.getError());
-          return new NonRecoverableException(status);
-        }
-      } else {
-        try {
-          discoverTablets(table, response.getTabletLocationsList());
-        } catch (NonRecoverableException e) {
-          return e;
-        }
-        if (partitionKey != null) {
-          discoverNonCoveredRangePartitions(table.getTableId(), partitionKey,
-                                            response.getTabletLocationsList());
-        }
-      }
-      return null;
-    }
-    public String toString() {
-      return "get tablet locations from the master for table " + table.getName();
-    }
-  }
-
-  boolean acquireMasterLookupPermit() {
-    try {
-      // With such a low timeout, the JVM may chose to spin-wait instead of
-      // de-scheduling the thread (and causing context switches and whatnot).
-      return masterLookups.tryAcquire(5, MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();  // Make this someone else's problem.
-      return false;
-    }
-  }
-
-  /**
-   * Releases a master lookup permit that was acquired.
-   * @see #acquireMasterLookupPermit
-   */
-  void releaseMasterLookupPermit() {
-    masterLookups.release();
-  }
-
-  @VisibleForTesting
-  void discoverTablets(KuduTable table, List<Master.TabletLocationsPB> locations)
-      throws NonRecoverableException {
-    String tableId = table.getTableId();
-    String tableName = table.getName();
-
-    // Doing a get first instead of putIfAbsent to avoid creating unnecessary CSLMs because in
-    // the most common case the table should already be present
-    ConcurrentSkipListMap<byte[], RemoteTablet> tablets = tabletsCache.get(tableId);
-    if (tablets == null) {
-      tablets = new ConcurrentSkipListMap<>(Bytes.MEMCMP);
-      ConcurrentSkipListMap<byte[], RemoteTablet> oldTablets =
-          tabletsCache.putIfAbsent(tableId, tablets);
-      if (oldTablets != null) {
-        tablets = oldTablets;
-      }
-    }
-
-    for (Master.TabletLocationsPB tabletPb : locations) {
-      // Early creating the tablet so that it parses out the pb
-      RemoteTablet rt = createTabletFromPb(tableId, tabletPb);
-      Slice tabletId = rt.tabletId;
-
-      // If we already know about this one, just refresh the locations
-      RemoteTablet currentTablet = tablet2client.get(tabletId);
-      if (currentTablet != null) {
-        currentTablet.refreshTabletClients(tabletPb);
-        continue;
-      }
-
-      // Putting it here first doesn't make it visible because tabletsCache is always looked up
-      // first.
-      RemoteTablet oldRt = tablet2client.putIfAbsent(tabletId, rt);
-      if (oldRt != null) {
-        // someone beat us to it
-        continue;
-      }
-      LOG.info("Discovered tablet {} for table '{}' with partition {}",
-               tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
-      rt.refreshTabletClients(tabletPb);
-      // This is making this tablet available
-      // Even if two clients were racing in this method they are putting the same RemoteTablet
-      // with the same start key in the CSLM in the end
-      tablets.put(rt.getPartition().getPartitionKeyStart(), rt);
-    }
-  }
-
-  private void discoverNonCoveredRangePartitions(String tableId,
-                                                 byte[] partitionKey,
-                                                 List<Master.TabletLocationsPB> locations) {
-    NonCoveredRangeCache nonCoveredRanges = nonCoveredRangeCaches.get(tableId);
-    if (nonCoveredRanges == null) {
-      nonCoveredRanges = new NonCoveredRangeCache();
-      NonCoveredRangeCache oldCache = nonCoveredRangeCaches.putIfAbsent(tableId, nonCoveredRanges);
-      if (oldCache != null) {
-        nonCoveredRanges = oldCache;
-      }
-    }
-
-    // If there are no locations, then the table has no tablets. This is
-    // guaranteed because we never set an upper bound on the GetTableLocations
-    // request, and the master will always return the tablet *before* the start
-    // of the request, if the start key falls in a non-covered range (see the
-    // comment on GetTableLocationsResponsePB in master.proto).
-    if (locations.isEmpty()) {
-      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, EMPTY_ARRAY);
-      return;
-    }
-
-    // If the first tablet occurs after the requested partition key,
-    // then there is an initial non-covered range.
-    byte[] firstStartKey = locations.get(0).getPartition().getPartitionKeyStart().toByteArray();
-    if (Bytes.memcmp(partitionKey, firstStartKey) < 0) {
-      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, firstStartKey);
-    }
-
-    byte[] previousEndKey = null;
-    for (Master.TabletLocationsPB location : locations) {
-      byte[] startKey = location.getPartition().getPartitionKeyStart().toByteArray();
-
-      // Check if there is a non-covered range between this tablet and the previous.
-      if (previousEndKey != null && Bytes.memcmp(previousEndKey, startKey) < 0) {
-        nonCoveredRanges.addNonCoveredRange(previousEndKey, startKey);
-      }
-      previousEndKey = location.getPartition().getPartitionKeyEnd().toByteArray();
-    }
-
-    if (previousEndKey.length > 0 && Bytes.memcmp(previousEndKey, partitionKey) <= 0) {
-      // This happens if the partition key falls in a non-covered range that
-      // is unbounded (to the right).
-      nonCoveredRanges.addNonCoveredRange(previousEndKey, EMPTY_ARRAY);
-    }
-  }
-
-  RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
-    Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition());
-    Slice tabletId = new Slice(tabletPb.getTabletId().toByteArray());
-    return new RemoteTablet(tableId, tabletId, partition);
-  }
-
-  /**
-   * Gives the tablet's ID for the table ID and partition key.
-   * In the future there will be multiple tablets and this method will find the right one.
-   * @param tableId table to find the tablet for
-   * @return a tablet ID as a slice or null if not found
-   */
-  RemoteTablet getTablet(String tableId, byte[] partitionKey) {
-    ConcurrentSkipListMap<byte[], RemoteTablet> tablets = tabletsCache.get(tableId);
-
-    if (tablets == null) {
-      return null;
-    }
-
-    // We currently only have one master tablet.
-    if (isMasterTable(tableId)) {
-      if (tablets.firstEntry() == null) {
-        return null;
-      }
-      return tablets.firstEntry().getValue();
-    }
-
-    Map.Entry<byte[], RemoteTablet> tabletPair = tablets.floorEntry(partitionKey);
-
-    if (tabletPair == null) {
-      return null;
-    }
-
-    Partition partition = tabletPair.getValue().getPartition();
-
-    // If the partition is not the end partition, but it doesn't include the key
-    // we are looking for, then we have not yet found the correct tablet.
-    if (!partition.isEndPartition()
-        && Bytes.memcmp(partitionKey, partition.getPartitionKeyEnd()) >= 0) {
-      return null;
-    }
-
-    return tabletPair.getValue();
-  }
-
-  /**
-   * Returns a deferred containing the located tablet which covers the partition key in the table.
-   * @param table the table
-   * @param partitionKey the partition key of the tablet to look up in the table
-   * @param deadline deadline in milliseconds for this lookup to finish
-   * @return a deferred containing the located tablet
-   */
-  Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
-                                            final byte[] partitionKey,
-                                            long deadline) {
-    // Locate the tablets at the partition key by locating all tablets between
-    // the partition key (inclusive), and the incremented partition key (exclusive).
-
-    Deferred<List<LocatedTablet>> locatedTablets;
-    if (partitionKey.length == 0) {
-      locatedTablets = locateTable(table, null, new byte[] { 0x00 }, deadline);
-    } else {
-      locatedTablets = locateTable(table, partitionKey,
-                                   Arrays.copyOf(partitionKey, partitionKey.length + 1), deadline);
-    }
-
-    // Then pick out the single tablet result from the list.
-    return locatedTablets.addCallbackDeferring(
-        new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>() {
-          @Override
-          public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
-            Preconditions.checkArgument(tablets.size() <= 1,
-                                        "found more than one tablet for a single partition key");
-            if (tablets.size() == 0) {
-              Map.Entry<byte[], byte[]> nonCoveredRange =
-                  nonCoveredRangeCaches.get(table.getTableId()).getNonCoveredRange(partitionKey);
-              return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
-                                                                     nonCoveredRange.getValue()));
-            }
-            return Deferred.fromResult(tablets.get(0));
-          }
-        });
-  }
-
-  /**
-   * Returns the non-covered range partition containing the {@code partitionKey} in
-   * the table, or null if there is no known non-covering range for the partition key.
-   * @param tableId of the table
-   * @param partitionKey to lookup
-   * @return the non-covering partition range, or {@code null}
-   */
-   Map.Entry<byte[], byte[]> getNonCoveredRange(String tableId, byte[] partitionKey) {
-     if (isMasterTable(tableId)) {
-       throw new IllegalArgumentException("No non-covering range partitions for the master");
-     }
-     NonCoveredRangeCache nonCoveredRangeCache = nonCoveredRangeCaches.get(tableId);
-     if (nonCoveredRangeCache == null) return null;
-
-     return nonCoveredRangeCache.getNonCoveredRange(partitionKey);
-   }
-
-  /**
-   * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
-   * for a replica.
-   * @param masterClient An initialized client for the master replica.
-   * @return A Deferred object for the master replica's current registration.
-   */
-  Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient masterClient) {
-    GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
-    rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
-    rpc.attempt++;
-    masterClient.sendRpc(rpc);
-    return d;
-  }
-
-  /**
-   * If a live client already exists for the specified master server, returns that client;
-   * otherwise, creates a new client for the specified master server.
-   * @param masterHostPort The RPC host and port for the master server.
-   * @return A live and initialized client for the specified master server.
-   */
-  TabletClient newMasterClient(HostAndPort masterHostPort) {
-    String ip = getIP(masterHostPort.getHostText());
-    if (ip == null) {
-      return null;
-    }
-    // We should pass a UUID here but we have a chicken and egg problem, we first need to
-    // communicate with the masters to find out about them, and that's what we're trying to do.
-    // The UUID is used for logging, so instead we're passing the "master table name" followed by
-    // host and port which is enough to identify the node we're connecting to.
-    return newClient(MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
-        ip, masterHostPort.getPort());
-  }
-
-  TabletClient newClient(String uuid, final String host, final int port) {
-    final String hostport = host + ':' + port;
-    TabletClient client;
-    SocketChannel chan;
-    synchronized (ip2client) {
-      client = ip2client.get(hostport);
-      if (client != null && client.isAlive()) {
-        return client;
-      }
-      final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(uuid, host, port);
-      chan = channelFactory.newChannel(pipeline);
-      ip2client.put(hostport, client);  // This is guaranteed to return null.
-
-      // The client2tables map is assumed to contain `client` after it is published in ip2client.
-      this.client2tablets.put(client, new ArrayList<RemoteTablet>());
-    }
-    final SocketChannelConfig config = chan.getConfig();
-    config.setConnectTimeoutMillis(5000);
-    config.setTcpNoDelay(true);
-    // Unfortunately there is no way to override the keep-alive timeout in
-    // Java since the JRE doesn't expose any way to call setsockopt() with
-    // TCP_KEEPIDLE.  And of course the default timeout is >2h. Sigh.
-    config.setKeepAlive(true);
-    chan.connect(new InetSocketAddress(host, port));  // Won't block.
-    return client;
-  }
-
-  /**
-   * Invokes {@link #shutdown()} and waits for the configured admin timeout. This method returns
-   * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
-   * @throws Exception if an error happens while closing the connections
-   */
-  @Override
-  public void close() throws Exception {
-    shutdown().join(defaultAdminOperationTimeoutMs);
-  }
-
-  /**
-   * Performs a graceful shutdown of this instance.
-   * <p>
-   * <ul>
-   *   <li>{@link AsyncKuduSession#flush Flushes} all buffered edits.</li>
-   *   <li>Cancels all the other requests.</li>
-   *   <li>Terminates all connections.</li>
-   *   <li>Releases all other resources.</li>
-   * </ul>
-   * <strong>Not calling this method before losing the last reference to this
-   * instance may result in data loss and other unwanted side effects</strong>
-   * @return A {@link Deferred}, whose callback chain will be invoked once all
-   * of the above have been done. If this callback chain doesn't fail, then
-   * the clean shutdown will be successful, and all the data will be safe on
-   * the Kudu side. In case of a failure (the "errback" is invoked) you will have
-   * to open a new AsyncKuduClient if you want to retry those operations.
-   * The Deferred doesn't actually hold any content.
-   */
-  public Deferred<ArrayList<Void>> shutdown() {
-    checkIsClosed();
-    closed = true;
-    // This is part of step 3.  We need to execute this in its own thread
-    // because Netty gets stuck in an infinite loop if you try to shut it
-    // down from within a thread of its own thread pool.  They don't want
-    // to fix this so as a workaround we always shut Netty's thread pool
-    // down from another thread.
-    final class ShutdownThread extends Thread {
-      ShutdownThread() {
-        super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
-      }
-      public void run() {
-        // This terminates the Executor.
-        channelFactory.releaseExternalResources();
-      }
-    }
-
-    // 3. Release all other resources.
-    final class ReleaseResourcesCB implements Callback<ArrayList<Void>, ArrayList<Void>> {
-      public ArrayList<Void> call(final ArrayList<Void> arg) {
-        LOG.debug("Releasing all remaining resources");
-        timer.stop();
-        new ShutdownThread().start();
-        return arg;
-      }
-      public String toString() {
-        return "release resources callback";
-      }
-    }
-
-    // 2. Terminate all connections.
-    final class DisconnectCB implements Callback<Deferred<ArrayList<Void>>,
-        ArrayList<List<OperationResponse>>> {
-      public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> ignoredResponses) {
-        return disconnectEverything().addCallback(new ReleaseResourcesCB());
-      }
-      public String toString() {
-        return "disconnect callback";
-      }
-    }
-
-    // 1. Flush everything.
-    // Notice that we do not handle the errback, if there's an exception it will come straight out.
-    return closeAllSessions().addCallbackDeferring(new DisconnectCB());
-  }
-
-  private void checkIsClosed() {
-    if (closed) {
-      throw new IllegalStateException("Cannot proceed, the client has already been closed");
-    }
-  }
-
-  private Deferred<ArrayList<List<OperationResponse>>> closeAllSessions() {
-    // We create a copy because AsyncKuduSession.close will call removeSession which would get us a
-    // concurrent modification during the iteration.
-    Set<AsyncKuduSession> copyOfSessions;
-    synchronized (sessions) {
-      copyOfSessions = new HashSet<AsyncKuduSession>(sessions);
-    }
-    if (sessions.isEmpty()) {
-      return Deferred.fromResult(null);
-    }
-    // Guaranteed that we'll have at least one session to close.
-    List<Deferred<List<OperationResponse>>> deferreds = new ArrayList<>(copyOfSessions.size());
-    for (AsyncKuduSession session : copyOfSessions ) {
-      deferreds.add(session.close());
-    }
-
-    return Deferred.group(deferreds);
-  }
-
-  /**
-   * Closes every socket, which will also cancel all the RPCs in flight.
-   */
-  private Deferred<ArrayList<Void>> disconnectEverything() {
-    ArrayList<Deferred<Void>> deferreds =
-        new ArrayList<Deferred<Void>>(2);
-    HashMap<String, TabletClient> ip2client_copy;
-    synchronized (ip2client) {
-      // Make a local copy so we can shutdown every Tablet Server clients
-      // without hold the lock while we iterate over the data structure.
-      ip2client_copy = new HashMap<String, TabletClient>(ip2client);
-    }
-
-    for (TabletClient ts : ip2client_copy.values()) {
-      deferreds.add(ts.shutdown());
-    }
-    final int size = deferreds.size();
-    return Deferred.group(deferreds).addCallback(
-        new Callback<ArrayList<Void>, ArrayList<Void>>() {
-          public ArrayList<Void> call(final ArrayList<Void> arg) {
-            // Normally, now that we've shutdown() every client, all our caches should
-            // be empty since each shutdown() generates a DISCONNECTED event, which
-            // causes TabletClientPipeline to call removeClientFromIpCache().
-            HashMap<String, TabletClient> logme = null;
-            synchronized (ip2client) {
-              if (!ip2client.isEmpty()) {
-                logme = new HashMap<String, TabletClient>(ip2client);
-              }
-            }
-            if (logme != null) {
-              // Putting this logging statement inside the synchronized block
-              // can lead to a deadlock, since HashMap.toString() is going to
-              // call TabletClient.toString() on each entry, and this locks the
-              // client briefly.  Other parts of the code lock clients first and
-              // the ip2client HashMap second, so this can easily deadlock.
-              LOG.error("Some clients are left in the client cache and haven't"
-                  + " been cleaned up: " + logme);
-            }
-            return arg;
-          }
-
-          public String toString() {
-            return "wait " + size + " TabletClient.shutdown()";
-          }
-        });
-  }
-
-  /**
-   * Blocking call.
-   * Performs a slow search of the IP used by the given client.
-   * <p>
-   * This is needed when we're trying to find the IP of the client before its
-   * channel has successfully connected, because Netty's API offers no way of
-   * retrieving the IP of the remote peer until we're connected to it.
-   * @param client The client we want the IP of.
-   * @return The IP of the client, or {@code null} if we couldn't find it.
-   */
-  private InetSocketAddress slowSearchClientIP(final TabletClient client) {
-    String hostport = null;
-    synchronized (ip2client) {
-      for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
-        if (e.getValue() == client) {
-          hostport = e.getKey();
-          break;
-        }
-      }
-    }
-
-    if (hostport == null) {
-      HashMap<String, TabletClient> copy;
-      synchronized (ip2client) {
-        copy = new HashMap<String, TabletClient>(ip2client);
-      }
-      LOG.error("WTF?  Should never happen!  Couldn't find " + client
-          + " in " + copy);
-      return null;
-    }
-    final int colon = hostport.indexOf(':', 1);
-    if (colon < 1) {
-      LOG.error("WTF?  Should never happen!  No `:' found in " + hostport);
-      return null;
-    }
-    final String host = getIP(hostport.substring(0, colon));
-    if (host == null) {
-      // getIP will print the reason why, there's nothing else we can do.
-      return null;
-    }
-
-    int port;
-    try {
-      port = parsePortNumber(hostport.substring(colon + 1,
-          hostport.length()));
-    } catch (NumberFormatException e) {
-      LOG.error("WTF?  Should never happen!  Bad port in " + hostport, e);
-      return null;
-    }
-    return new InetSocketAddress(host, port);
-  }
-
-  /**
-   * Removes the given client from the `ip2client` cache.
-   * @param client The client for which we must clear the ip cache
-   * @param remote The address of the remote peer, if known, or null
-   */
-  private void removeClientFromIpCache(final TabletClient client,
-                                       final SocketAddress remote) {
-
-    if (remote == null) {
-      return;  // Can't continue without knowing the remote address.
-    }
-
-    String hostport;
-    if (remote instanceof InetSocketAddress) {
-      final InetSocketAddress sock = (InetSocketAddress) remote;
-      final InetAddress addr = sock.getAddress();
-      if (addr == null) {
-        LOG.error("WTF?  Unresolved IP for " + remote
-            + ".  This shouldn't happen.");
-        return;
-      } else {
-        hostport = addr.getHostAddress() + ':' + sock.getPort();
-      }
-    } else {
-      LOG.error("WTF?  Found a non-InetSocketAddress remote: " + remote
-          + ".  This shouldn't happen.");
-      return;
-    }
-
-    TabletClient old;
-    synchronized (ip2client) {
-      old = ip2client.remove(hostport);
-    }
-    LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
-    if (old == null) {
-      // Currently we're seeing this message when masters are disconnected and the hostport we got
-      // above is different than the one the user passes (that we use to populate ip2client). At
-      // worst this doubles the entries for masters, which has an insignificant impact.
-      // TODO When fixed, make this a WARN again.
-      LOG.trace("When expiring " + client + " from the client cache (host:port="
-          + hostport + "), it was found that there was no entry"
-          + " corresponding to " + remote + ".  This shouldn't happen.");
-    }
-  }
-
-  /**
-   * Call this method after encountering an error connecting to a tablet server so that we stop
-   * considering it a leader for the tablets it serves.
-   * @param client tablet server to use for demotion
-   */
-  void demoteAsLeaderForAllTablets(final TabletClient client) {
-    ArrayList<RemoteTablet> tablets = client2tablets.get(client);
-    if (tablets != null) {
-      // Make a copy so we don't need to synchronize on it while iterating.
-      RemoteTablet[] tablets_copy;
-      synchronized (tablets) {
-        tablets_copy = tablets.toArray(new RemoteTablet[tablets.size()]);
-      }
-      for (final RemoteTablet remoteTablet : tablets_copy) {
-        // It will be a no-op if it's not already a leader.
-        remoteTablet.demoteLeader(client);
-      }
-    }
-  }
-
-  private boolean isMasterTable(String tableId) {
-    // Checking that it's the same instance so there's absolutely no chance of confusing the master
-    // 'table' for a user one.
-    return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
-  }
-
-  private final class TabletClientPipeline extends DefaultChannelPipeline {
-
-    private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class);
-    /**
-     * Have we already disconnected?.
-     * We use this to avoid doing the cleanup work for the same client more
-     * than once, even if we get multiple events indicating that the client
-     * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
-     * No synchronization needed as this is always accessed from only one
-     * thread at a time (equivalent to a non-shared state in a Netty handler).
-     */
-    private boolean disconnected = false;
-
-    TabletClient init(String uuid, String host, int port) {
-      final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, host, port);
-      if (defaultSocketReadTimeoutMs > 0) {
-        super.addLast("timeout-handler",
-            new ReadTimeoutHandler(timer,
-                defaultSocketReadTimeoutMs,
-                TimeUnit.MILLISECONDS));
-      }
-      super.addLast("kudu-handler", client);
-
-      return client;
-    }
-
-    @Override
-    public void sendDownstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendDownstream(event);
-    }
-
-    @Override
-    public void sendUpstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendUpstream(event);
-    }
-
-    private void handleDisconnect(final ChannelStateEvent state_event) {
-      if (disconnected) {
-        return;
-      }
-      switch (state_event.getState()) {
-        case OPEN:
-          if (state_event.getValue() == Boolean.FALSE) {
-            break;  // CLOSED
-          }
-          return;
-        case CONNECTED:
-          if (state_event.getValue() == null) {
-            break;  // DISCONNECTED
-          }
-          return;
-        default:
-          return;  // Not an event we're interested in, ignore it.
-      }
-
-      disconnected = true;  // So we don't clean up the same client twice.
-      try {
-        final TabletClient client = super.get(TabletClient.class);
-        SocketAddress remote = super.getChannel().getRemoteAddress();
-        // At this point Netty gives us no easy way to access the
-        // SocketAddress of the peer we tried to connect to. This
-        // kinda sucks but I couldn't find an easier way.
-        if (remote == null) {
-          remote = slowSearchClientIP(client);
-        }
-
-        synchronized (client) {
-          removeClientFromIpCache(client, remote);
-        }
-      } catch (Exception e) {
-        log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
-      }
-    }
-
-  }
-
-  /**
-   * Gets a hostname or an IP address and returns the textual representation
-   * of the IP address.
-   * <p>
-   * <strong>This method can block</strong> as there is no API for
-   * asynchronous DNS resolution in the JDK.
-   * @param host The hostname to resolve.
-   * @return The IP address associated with the given hostname,
-   * or {@code null} if the address couldn't be resolved.
-   */
-  private static String getIP(final String host) {
-    final long start = System.nanoTime();
-    try {
-      final String ip = InetAddress.getByName(host).getHostAddress();
-      final long latency = System.nanoTime() - start;
-      if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
-        LOG.debug("Resolved IP of `" + host + "' to "
-            + ip + " in " + latency + "ns");
-      } else if (latency >= 3000000/*ns*/) {
-        LOG.warn("Slow DNS lookup!  Resolved IP of `" + host + "' to "
-            + ip + " in " + latency + "ns");
-      }
-      return ip;
-    } catch (UnknownHostException e) {
-      LOG.error("Failed to resolve the IP of `" + host + "' in "
-          + (System.nanoTime() - start) + "ns");
-      return null;
-    }
-  }
-
-  /**
-   * Parses a TCP port number from a string.
-   * @param portnum The string to parse.
-   * @return A strictly positive, validated port number.
-   * @throws NumberFormatException if the string couldn't be parsed as an
-   * integer or if the value was outside of the range allowed for TCP ports.
-   */
-  private static int parsePortNumber(final String portnum)
-      throws NumberFormatException {
-    final int port = Integer.parseInt(portnum);
-    if (port <= 0 || port > 65535) {
-      throw new NumberFormatException(port == 0 ? "port is zero" :
-          (port < 0 ? "port is negative: "
-              : "port is too large: ") + port);
-    }
-    return port;
-  }
-
-  void newTimeout(final TimerTask task, final long timeout_ms) {
-    try {
-      timer.newTimeout(task, timeout_ms, MILLISECONDS);
-    } catch (IllegalStateException e) {
-      // This can happen if the timer fires just before shutdown()
-      // is called from another thread, and due to how threads get
-      // scheduled we tried to call newTimeout() after timer.stop().
-      LOG.warn("Failed to schedule timer."
-          + "  Ignore this if we're shutting down.", e);
-    }
-  }
-
-  /**
-   * This class encapsulates the information regarding a tablet and its locations.
-   *
-   * Leader failover mechanism:
-   * When we get a complete peer list from the master, we place the leader in the first
-   * position of the tabletServers array. When we detect that it isn't the leader anymore (in
-   * TabletClient), we demote it and set the next TS in the array as the leader. When the RPC
-   * gets retried, it will use that TS since we always pick the leader.
-   *
-   * If that TS turns out to not be the leader, we will demote it and promote the next one, retry.
-   * When we hit the end of the list, we set the leaderIndex to NO_LEADER_INDEX which forces us
-   * to fetch the tablet locations from the master. We'll repeat this whole process until a RPC
-   * succeeds.
-   *
-   * Subtleties:
-   * We don't keep track of a TS after it disconnects (via removeTabletClient), so if we
-   * haven't contacted one for 10 seconds (socket timeout), it will be removed from the list of
-   * tabletServers. This means that if the leader fails, we only have one other TS to "promote"
-   * or maybe none at all. This is partly why we then set leaderIndex to NO_LEADER_INDEX.
-   *
-   * The effect of treating a TS as the new leader means that the Scanner will also try to hit it
-   * with requests. It's currently unclear if that's a good or a bad thing.
-   *
-   * Unlike the C++ client, we don't short-circuit the call to the master if it isn't available.
-   * This means that after trying all the peers to find the leader, we might get stuck waiting on
-   * a reachable master.
-   */
-  public class RemoteTablet implements Comparable<RemoteTablet> {
-
-    private static final int NO_LEADER_INDEX = -1;
-    private final String tableId;
-    private final Slice tabletId;
-    @GuardedBy("tabletServers")
-    private final ArrayList<TabletClient> tabletServers = new ArrayList<>();
-    private final AtomicReference<List<LocatedTablet.Replica>> replicas =
-        new AtomicReference(ImmutableList.of());
-    private final Partition partition;
-    private int leaderIndex = NO_LEADER_INDEX;
-
-    RemoteTablet(String tableId, Slice tabletId, Partition partition) {
-      this.tabletId = tabletId;
-      this.tableId = tableId;
-      this.partition = partition;
-    }
-
-    void refreshTabletClients(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
-
-      synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
-        tabletServers.clear();
-        leaderIndex = NO_LEADER_INDEX;
-        List<UnknownHostException> lookupExceptions =
-            new ArrayList<>(tabletLocations.getReplicasCount());
-        for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
-
-          List<Common.HostPortPB> addresses = replica.getTsInfo().getRpcAddressesList();
-          if (addresses.isEmpty()) {
-            LOG.warn("Tablet server for tablet " + getTabletIdAsString() + " doesn't have any " +
-                "address");
-            continue;
-          }
-          byte[] buf = Bytes.get(replica.getTsInfo().getPermanentUuid());
-          String uuid = Bytes.getString(buf);
-          // from meta_cache.cc
-          // TODO: if the TS advertises multiple host/ports, pick the right one
-          // based on some kind of policy. For now just use the first always.
-          try {
-            addTabletClient(uuid, addresses.get(0).getHost(), addresses.get(0).getPort(),
-                replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER));
-          } catch (UnknownHostException ex) {
-            lookupExceptions.add(ex);
-          }
-        }
-
-        if (leaderIndex == NO_LEADER_INDEX) {
-          LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
-        }
-
-        // If we found a tablet that doesn't contain a single location that we can resolve, there's
-        // no point in retrying.
-        if (!lookupExceptions.isEmpty() &&
-            lookupExceptions.size() == tabletLocations.getReplicasCount()) {
-          Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " +
-              lookupExceptions);
-          throw new NonRecoverableException(statusIOE);
-        }
-
-      }
-
-      ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>();
-      for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) {
-        replicasBuilder.add(new LocatedTablet.Replica(replica));
-      }
-      replicas.set(replicasBuilder.build());
-    }
-
-    // Must be called with tabl

<TRUNCATED>