You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC

svn commit: r1449950 [9/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ hb...

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,2493 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterAdminProtocol;
+import org.apache.hadoop.hbase.MasterMonitorProtocol;
+import org.apache.hadoop.hbase.MasterProtocol;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.SoftValueSortedMap;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A non-instantiable class that manages {@link HConnection}s.
+ * This class has a static Map of {@link HConnection} instances keyed by
+ * {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
+ * that pass the same {@link Configuration} instance will be returned the same
+ * {@link  HConnection} instance (Adding properties to a Configuration
+ * instance does not change its object identity).  Sharing {@link HConnection}
+ * instances is usually what you want; all clients of the {@link HConnection}
+ * instances share the HConnections' cache of Region locations rather than each
+ * having to discover for itself the location of meta, root, etc.  It makes
+ * sense for the likes of the pool of HTables class {@link HTablePool}, for
+ * instance (If concerned that a single {@link HConnection} is insufficient
+ * for sharing amongst clients in say an heavily-multithreaded environment,
+ * in practise its not proven to be an issue.  Besides, {@link HConnection} is
+ * implemented atop Hadoop RPC and as of this writing, Hadoop RPC does a
+ * connection per cluster-member, exclusively).
+ *
+ * <p>But sharing connections
+ * makes clean up of {@link HConnection} instances a little awkward.  Currently,
+ * clients cleanup by calling
+ * {@link #deleteConnection(Configuration)}.  This will shutdown the
+ * zookeeper connection the HConnection was using and clean up all
+ * HConnection resources as well as stopping proxies to servers out on the
+ * cluster. Not running the cleanup will not end the world; it'll
+ * just stall the closeup some and spew some zookeeper connection failed
+ * messages into the log.  Running the cleanup on a {@link HConnection} that is
+ * subsequently used by another will cause breakage so be careful running
+ * cleanup.
+ * <p>To create a {@link HConnection} that is not shared by others, you can
+ * create a new {@link Configuration} instance, pass this new instance to
+ * {@link #getConnection(Configuration)}, and then when done, close it up by
+ * doing something like the following:
+ * <pre>
+ * {@code
+ * Configuration newConfig = new Configuration(originalConf);
+ * HConnection connection = HConnectionManager.getConnection(newConfig);
+ * // Use the connection to your hearts' delight and then when done...
+ * HConnectionManager.deleteConnection(newConfig, true);
+ * }
+ * </pre>
+ * <p>Cleanup used to be done inside in a shutdown hook.  On startup we'd
+ * register a shutdown hook that called {@link #deleteAllConnections()}
+ * on its way out but the order in which shutdown hooks run is not defined so
+ * were problematic for clients of HConnection that wanted to register their
+ * own shutdown hooks so we removed ours though this shifts the onus for
+ * cleanup to the client.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HConnectionManager {
+  // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
+  // access must be synchronized.  This map is not private because tests
+  // need to be able to tinker with it.
+  static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
+
+  public static final int MAX_CACHED_HBASE_INSTANCES;
+
+  /** Parameter name for what client protocol to use. */
+  public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
+
+  /** Default client protocol class name. */
+  public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
+
+  /** Parameter name for what admin protocol to use. */
+  public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
+
+  /** Default admin protocol class name. */
+  public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+
+  private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+
+  static {
+    // We set instances to one more than the value specified for {@link
+    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
+    // connections to the ensemble from the one client is 30, so in that case we
+    // should run into zk issues before the LRU hit this value of 31.
+    MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
+        HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
+        HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+    HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+        (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
+       @Override
+      protected boolean removeEldestEntry(
+          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
+         return size() > MAX_CACHED_HBASE_INSTANCES;
+       }
+    };
+  }
+
+  /*
+   * Non-instantiable.
+   */
+  protected HConnectionManager() {
+    super();
+  }
+
+  /**
+   * Get the connection that goes with the passed <code>conf</code>
+   * configuration instance.
+   * If no current connection exists, method creates a new connection for the
+   * passed <code>conf</code> instance.
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws ZooKeeperConnectionException
+   */
+  public static HConnection getConnection(Configuration conf)
+  throws ZooKeeperConnectionException {
+    HConnectionKey connectionKey = new HConnectionKey(conf);
+    synchronized (HBASE_INSTANCES) {
+      HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
+      if (connection == null) {
+        connection = new HConnectionImplementation(conf, true);
+        HBASE_INSTANCES.put(connectionKey, connection);
+      } else if (connection.isClosed()) {
+        HConnectionManager.deleteConnection(connectionKey, true);
+        connection = new HConnectionImplementation(conf, true);
+        HBASE_INSTANCES.put(connectionKey, connection);
+      }
+      connection.incCount();
+      return connection;
+    }
+  }
+
+  /**
+   * Create a new HConnection instance using the passed <code>conf</code>
+   * instance.
+   * Note: This bypasses the usual HConnection life cycle management!
+   * Use this with caution, the caller is responsible for closing the
+   * created connection.
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws ZooKeeperConnectionException
+   */
+  public static HConnection createConnection(Configuration conf)
+  throws ZooKeeperConnectionException {
+    return new HConnectionImplementation(conf, false);
+  }
+
+  /**
+   * Delete connection information for the instance specified by configuration.
+   * If there are no more references to it, this will then close connection to
+   * the zookeeper ensemble and let go of all resources.
+   *
+   * @param conf
+   *          configuration whose identity is used to find {@link HConnection}
+   *          instance.
+   */
+  public static void deleteConnection(Configuration conf) {
+    deleteConnection(new HConnectionKey(conf), false);
+  }
+
+  /**
+   * Delete stale connection information for the instance specified by configuration.
+   * This will then close connection to
+   * the zookeeper ensemble and let go of all resources.
+   *
+   * @param connection
+   */
+  public static void deleteStaleConnection(HConnection connection) {
+    deleteConnection(connection, true);
+  }
+
+  /**
+   * Delete information for all connections.
+   * @throws IOException
+   */
+  public static void deleteAllConnections() {
+    synchronized (HBASE_INSTANCES) {
+      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
+      connectionKeys.addAll(HBASE_INSTANCES.keySet());
+      for (HConnectionKey connectionKey : connectionKeys) {
+        deleteConnection(connectionKey, false);
+      }
+      HBASE_INSTANCES.clear();
+    }
+  }
+
+  private static void deleteConnection(HConnection connection, boolean staleConnection) {
+    synchronized (HBASE_INSTANCES) {
+      for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
+          .entrySet()) {
+        if (connectionEntry.getValue() == connection) {
+          deleteConnection(connectionEntry.getKey(), staleConnection);
+          break;
+        }
+      }
+    }
+  }
+
+  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
+    synchronized (HBASE_INSTANCES) {
+      HConnectionImplementation connection = HBASE_INSTANCES
+          .get(connectionKey);
+      if (connection != null) {
+        connection.decCount();
+        if (connection.isZeroReference() || staleConnection) {
+          HBASE_INSTANCES.remove(connectionKey);
+          connection.internalClose();
+        }
+      } else {
+        LOG.error("Connection not found in the list, can't delete it "+
+          "(connection key="+connectionKey+"). May be the key was modified?");
+      }
+    }
+  }
+
+  /**
+   * It is provided for unit test cases which verify the behavior of region
+   * location cache prefetch.
+   * @return Number of cached regions for the table.
+   * @throws ZooKeeperConnectionException
+   */
+  static int getCachedRegionCount(Configuration conf,
+      final byte[] tableName)
+  throws IOException {
+    return execute(new HConnectable<Integer>(conf) {
+      @Override
+      public Integer connect(HConnection connection) {
+        return ((HConnectionImplementation) connection)
+            .getNumberOfCachedRegionLocations(tableName);
+      }
+    });
+  }
+
+  /**
+   * It's provided for unit test cases which verify the behavior of region
+   * location cache prefetch.
+   * @return true if the region where the table and row reside is cached.
+   * @throws ZooKeeperConnectionException
+   */
+  static boolean isRegionCached(Configuration conf,
+      final byte[] tableName, final byte[] row) throws IOException {
+    return execute(new HConnectable<Boolean>(conf) {
+      @Override
+      public Boolean connect(HConnection connection) {
+        return ((HConnectionImplementation) connection).isRegionCached(tableName, row);
+      }
+    });
+  }
+
+  /**
+   * This class makes it convenient for one to execute a command in the context
+   * of a {@link HConnection} instance based on the given {@link Configuration}.
+   *
+   * <p>
+   * If you find yourself wanting to use a {@link HConnection} for a relatively
+   * short duration of time, and do not want to deal with the hassle of creating
+   * and cleaning up that resource, then you should consider using this
+   * convenience class.
+   *
+   * @param <T>
+   *          the return type of the {@link HConnectable#connect(HConnection)}
+   *          method.
+   */
+  public static abstract class HConnectable<T> {
+    public Configuration conf;
+
+    protected HConnectable(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public abstract T connect(HConnection connection) throws IOException;
+  }
+
+  /**
+   * This convenience method invokes the given {@link HConnectable#connect}
+   * implementation using a {@link HConnection} instance that lasts just for the
+   * duration of that invocation.
+   *
+   * @param <T> the return type of the connect method
+   * @param connectable the {@link HConnectable} instance
+   * @return the value returned by the connect method
+   * @throws IOException
+   */
+  public static <T> T execute(HConnectable<T> connectable) throws IOException {
+    if (connectable == null || connectable.conf == null) {
+      return null;
+    }
+    Configuration conf = connectable.conf;
+    HConnection connection = HConnectionManager.getConnection(conf);
+    boolean connectSucceeded = false;
+    try {
+      T returnValue = connectable.connect(connection);
+      connectSucceeded = true;
+      return returnValue;
+    } finally {
+      try {
+        connection.close();
+      } catch (Exception e) {
+        if (connectSucceeded) {
+          throw new IOException("The connection to " + connection
+              + " could not be deleted.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Denotes a unique key to a {@link HConnection} instance.
+   *
+   * In essence, this class captures the properties in {@link Configuration}
+   * that may be used in the process of establishing a connection. In light of
+   * that, if any new such properties are introduced into the mix, they must be
+   * added to the {@link HConnectionKey#properties} list.
+   *
+   */
+  public static class HConnectionKey {
+    final static String[] CONNECTION_PROPERTIES = new String[] {
+        HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
+        HConstants.ZOOKEEPER_CLIENT_PORT,
+        HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+        HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+        HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+        HConstants.HBASE_META_SCANNER_CACHING,
+        HConstants.HBASE_CLIENT_INSTANCE_ID };
+
+    private Map<String, String> properties;
+    private String username;
+
+    public HConnectionKey(Configuration conf) {
+      Map<String, String> m = new HashMap<String, String>();
+      if (conf != null) {
+        for (String property : CONNECTION_PROPERTIES) {
+          String value = conf.get(property);
+          if (value != null) {
+            m.put(property, value);
+          }
+        }
+      }
+      this.properties = Collections.unmodifiableMap(m);
+
+      try {
+        User currentUser = User.getCurrent();
+        if (currentUser != null) {
+          username = currentUser.getName();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Error obtaining current user, skipping username in HConnectionKey",
+            ioe);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      if (username != null) {
+        result = username.hashCode();
+      }
+      for (String property : CONNECTION_PROPERTIES) {
+        String value = properties.get(property);
+        if (value != null) {
+          result = prime * result + value.hashCode();
+        }
+      }
+
+      return result;
+    }
+
+
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ",
+        justification="Optimization")
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      HConnectionKey that = (HConnectionKey) obj;
+      if (this.username != null && !this.username.equals(that.username)) {
+        return false;
+      } else if (this.username == null && that.username != null) {
+        return false;
+      }
+      if (this.properties == null) {
+        if (that.properties != null) {
+          return false;
+        }
+      } else {
+        if (that.properties == null) {
+          return false;
+        }
+        for (String property : CONNECTION_PROPERTIES) {
+          String thisValue = this.properties.get(property);
+          String thatValue = that.properties.get(property);
+          //noinspection StringEquality
+          if (thisValue == thatValue) {
+            continue;
+          }
+          if (thisValue == null || !thisValue.equals(thatValue)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HConnectionKey{" +
+        "properties=" + properties +
+        ", username='" + username + '\'' +
+        '}';
+    }
+  }
+
+  /** Encapsulates connection to zookeeper and regionservers.*/
+  static class HConnectionImplementation implements HConnection, Closeable {
+    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
+    private final Class<? extends AdminProtocol> adminClass;
+    private final Class<? extends ClientProtocol> clientClass;
+    private final long pause;
+    private final int numRetries;
+    private final int maxRPCAttempts;
+    private final int rpcTimeout;
+    private final int prefetchRegionLimit;
+
+    private volatile boolean closed;
+    private volatile boolean aborted;
+
+    private final Object metaRegionLock = new Object();
+    private final Object userRegionLock = new Object();
+
+    // We have a single lock for master & zk to prevent deadlocks. Having
+    //  one lock for ZK and one lock for master is not possible:
+    //  When creating a connection to master, we need a connection to ZK to get
+    //  its address. But another thread could have taken the ZK lock, and could
+    //  be waiting for the master lock => deadlock.
+    private final Object masterAndZKLock = new Object();
+
+    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+    private final DelayedClosing delayedClosing =
+      DelayedClosing.createAndStart(this);
+
+
+    private final Configuration conf;
+
+    // client RPC
+    private RpcClientEngine rpcEngine;
+
+    // Known region ServerName.toString() -> RegionClient/Admin
+    private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
+      new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
+    private final ConcurrentHashMap<String, String> connectionLock =
+      new ConcurrentHashMap<String, String>();
+
+    /**
+     * Map of table to table {@link HRegionLocation}s.  The table key is made
+     * by doing a {@link Bytes#mapKey(byte[])} of the table's name.
+     */
+    private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
+      cachedRegionLocations =
+        new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
+
+    // The presence of a server in the map implies it's likely that there is an
+    // entry in cachedRegionLocations that map to this server; but the absence
+    // of a server in this map guarentees that there is no entry in cache that
+    // maps to the absent server.
+    private final Set<String> cachedServers =
+        new HashSet<String>();
+
+    // region cache prefetch is enabled by default. this set contains all
+    // tables whose region cache prefetch are disabled.
+    private final Set<Integer> regionCachePrefetchDisabledTables =
+      new CopyOnWriteArraySet<Integer>();
+
+    private int refCount;
+
+    // indicates whether this connection's life cycle is managed (by us)
+    private final boolean managed;
+    /**
+     * constructor
+     * @param conf Configuration object
+     */
+    @SuppressWarnings("unchecked")
+    public HConnectionImplementation(Configuration conf, boolean managed)
+    throws ZooKeeperConnectionException {
+      this.conf = conf;
+      this.managed = managed;
+      String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
+        DEFAULT_ADMIN_PROTOCOL_CLASS);
+      this.closed = false;
+      try {
+        this.adminClass =
+          (Class<? extends AdminProtocol>) Class.forName(adminClassName);
+      } catch (ClassNotFoundException e) {
+        throw new UnsupportedOperationException(
+            "Unable to find region server interface " + adminClassName, e);
+      }
+      String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
+        DEFAULT_CLIENT_PROTOCOL_CLASS);
+      try {
+        this.clientClass =
+          (Class<? extends ClientProtocol>) Class.forName(clientClassName);
+      } catch (ClassNotFoundException e) {
+        throw new UnsupportedOperationException(
+            "Unable to find client protocol " + clientClassName, e);
+      }
+      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+      this.maxRPCAttempts = conf.getInt(
+          HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+          HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
+      this.rpcTimeout = conf.getInt(
+          HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+      this.prefetchRegionLimit = conf.getInt(
+          HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+          HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+
+      retrieveClusterId();
+      // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
+      // but we maintain access through an interface to allow overriding for tests
+      // RPC engine setup must follow obtaining the cluster ID for token authentication to work
+      this.rpcEngine = new ProtobufRpcClientEngine(this.conf);
+    }
+
+    /**
+     * An identifier that will remain the same for a given connection.
+     * @return
+     */
+    public String toString(){
+      return "hconnection 0x" + Integer.toHexString( hashCode() );
+    }
+
+    private String clusterId = null;
+    public final void retrieveClusterId(){
+      if (conf.get(HConstants.CLUSTER_ID) != null){
+        return;
+      }
+
+      // No synchronized here, worse case we will retrieve it twice, that's
+      //  not an issue.
+      if (this.clusterId == null){
+        this.clusterId = conf.get(HConstants.CLUSTER_ID);
+        if (this.clusterId == null) {
+          ZooKeeperKeepAliveConnection zkw = null;
+          try {
+            zkw = getKeepAliveZooKeeperWatcher();
+            this.clusterId = ZKClusterId.readClusterIdZNode(zkw);
+            if (clusterId == null) {
+              LOG.info("ClusterId read in ZooKeeper is null");
+            }
+          } catch (KeeperException e) {
+            LOG.warn("Can't retrieve clusterId from Zookeeper", e);
+          } catch (IOException e) {
+            LOG.warn("Can't retrieve clusterId from Zookeeper", e);
+          } finally {
+            if (zkw != null) {
+              zkw.close();
+            }
+          }
+          if (this.clusterId == null) {
+            this.clusterId = "default";
+          }
+
+          LOG.info("ClusterId is " + clusterId);
+        }
+      }
+
+      conf.set(HConstants.CLUSTER_ID, clusterId);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return this.conf;
+    }
+
+    private static class MasterProtocolState {
+      public MasterProtocol protocol;
+      public int userCount;
+      public long keepAliveUntil = Long.MAX_VALUE;
+      public final Class<? extends MasterProtocol> protocolClass;
+
+      public MasterProtocolState (
+          final Class<? extends MasterProtocol> protocolClass) {
+        this.protocolClass = protocolClass;
+      }
+    }
+
+    /**
+     * Create a new Master proxy. Try once only.
+     */
+    private MasterProtocol createMasterInterface(
+        MasterProtocolState masterProtocolState)
+        throws IOException, KeeperException, ServiceException {
+
+      ZooKeeperKeepAliveConnection zkw;
+      try {
+        zkw = getKeepAliveZooKeeperWatcher();
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+      }
+
+      try {
+
+        checkIfBaseNodeAvailable(zkw);
+        ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
+        if (sn == null) {
+          String msg =
+            "ZooKeeper available but no active master location found";
+          LOG.info(msg);
+          throw new MasterNotRunningException(msg);
+        }
+
+
+        InetSocketAddress isa =
+          new InetSocketAddress(sn.getHostname(), sn.getPort());
+        MasterProtocol tryMaster = rpcEngine.getProxy(
+            masterProtocolState.protocolClass,
+            isa, this.conf, this.rpcTimeout);
+
+        if (tryMaster.isMasterRunning(
+            null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
+          return tryMaster;
+        } else {
+          String msg = "Can create a proxy to master, but it is not running";
+          LOG.info(msg);
+          throw new MasterNotRunningException(msg);
+        }
+      } finally {
+        zkw.close();
+      }
+    }
+
+    /**
+     * Create a master, retries if necessary.
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
+    private MasterProtocol createMasterWithRetries(
+      MasterProtocolState masterProtocolState) throws MasterNotRunningException {
+
+      // The lock must be at the beginning to prevent multiple master creation
+      //  (and leaks) in a multithread context
+
+      synchronized (this.masterAndZKLock) {
+        Exception exceptionCaught = null;
+        MasterProtocol master = null;
+        int tries = 0;
+        while (
+          !this.closed && master == null
+          ) {
+          tries++;
+          try {
+            master = createMasterInterface(masterProtocolState);
+          } catch (IOException e) {
+            exceptionCaught = e;
+          } catch (KeeperException e) {
+            exceptionCaught = e;
+          } catch (ServiceException e) {
+            exceptionCaught = e;
+          }
+
+          if (exceptionCaught != null)
+            // It failed. If it's not the last try, we're going to wait a little
+          if (tries < numRetries) {
+            // tries at this point is 1 or more; decrement to start from 0.
+            long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
+            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+              " failed; retrying after sleep of " +pauseTime, exceptionCaught);
+
+            try {
+              Thread.sleep(pauseTime);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(
+                "Thread was interrupted while trying to connect to master.", e);
+            }
+
+          } else {
+            // Enough tries, we stop now
+            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+              " failed; no more retrying.", exceptionCaught);
+            throw new MasterNotRunningException(exceptionCaught);
+          }
+        }
+
+        if (master == null) {
+          // implies this.closed true
+          throw new MasterNotRunningException(
+            "Connection was closed while trying to get master");
+        }
+
+        return master;
+      }
+    }
+
+    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
+      throws MasterNotRunningException {
+      String errorMsg;
+      try {
+        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
+          errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
+            + "It should have been written by the master. "
+            + "Check the value configured in 'zookeeper.znode.parent'. "
+            + "There could be a mismatch with the one configured in the master.";
+          LOG.error(errorMsg);
+          throw new MasterNotRunningException(errorMsg);
+        }
+      } catch (KeeperException e) {
+        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
+        LOG.error(errorMsg);
+        throw new MasterNotRunningException(errorMsg, e);
+      }
+    }
+
+    /**
+     * @return true if the master is running, throws an exception otherwise
+     * @throws MasterNotRunningException - if the master is not running
+     * @throws ZooKeeperConnectionException
+     */
+    @Override
+    public boolean isMasterRunning()
+      throws MasterNotRunningException, ZooKeeperConnectionException {
+      // When getting the master proxy connection, we check it's running,
+      // so if there is no exception, it means we've been able to get a
+      // connection on a running master
+      getKeepAliveMasterMonitor().close();
+      return true;
+    }
+
+    @Override
+    public HRegionLocation getRegionLocation(final byte [] name,
+        final byte [] row, boolean reload)
+    throws IOException {
+      return reload? relocateRegion(name, row): locateRegion(name, row);
+    }
+
+    @Override
+    public boolean isTableEnabled(byte[] tableName) throws IOException {
+      return testTableOnlineState(tableName, true);
+    }
+
+    @Override
+    public boolean isTableDisabled(byte[] tableName) throws IOException {
+      return testTableOnlineState(tableName, false);
+    }
+
+    @Override
+    public boolean isTableAvailable(final byte[] tableName) throws IOException {
+      final AtomicBoolean available = new AtomicBoolean(true);
+      final AtomicInteger regionCount = new AtomicInteger(0);
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        @Override
+        public boolean processRow(Result row) throws IOException {
+          HRegionInfo info = MetaScanner.getHRegionInfo(row);
+          if (info != null) {
+            if (Bytes.equals(tableName, info.getTableName())) {
+              ServerName server = HRegionInfo.getServerName(row);
+              if (server == null) {
+                available.set(false);
+                return false;
+              }
+              regionCount.incrementAndGet();
+            }
+          }
+          return true;
+        }
+      };
+      MetaScanner.metaScan(conf, visitor);
+      return available.get() && (regionCount.get() > 0);
+    }
+
+    /*
+     * @param True if table is online
+     */
+    private boolean testTableOnlineState(byte [] tableName, boolean online)
+    throws IOException {
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        // The root region is always enabled
+        return online;
+      }
+      String tableNameStr = Bytes.toString(tableName);
+      ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
+      try {
+        if (online) {
+          return ZKTableReadOnly.isEnabledTable(zkw, tableNameStr);
+        }
+        return ZKTableReadOnly.isDisabledTable(zkw, tableNameStr);
+      } catch (KeeperException e) {
+        throw new IOException("Enable/Disable failed", e);
+      }finally {
+         zkw.close();
+      }
+    }
+
+    @Override
+    public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
+      return locateRegion(HRegionInfo.getTableName(regionName),
+          HRegionInfo.getStartKey(regionName), false, true);
+    }
+
+    @Override
+    public List<HRegionLocation> locateRegions(final byte[] tableName)
+    throws IOException {
+      return locateRegions (tableName, false, true);
+    }
+    
+    @Override
+    public List<HRegionLocation> locateRegions(final byte[] tableName, final boolean useCache,
+        final boolean offlined) throws IOException {
+      NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, tableName,
+        offlined);
+      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
+      for (HRegionInfo regionInfo : regions.keySet()) {
+        locations.add(locateRegion(tableName, regionInfo.getStartKey(), useCache, true));
+      }
+      return locations;
+    }
+
+    @Override
+    public HRegionLocation locateRegion(final byte [] tableName,
+        final byte [] row)
+    throws IOException{
+      return locateRegion(tableName, row, true, true);
+    }
+
+    @Override
+    public HRegionLocation relocateRegion(final byte [] tableName,
+        final byte [] row)
+    throws IOException{
+
+      // Since this is an explicit request not to use any caching, finding
+      // disabled tables should not be desirable.  This will ensure that an exception is thrown when
+      // the first time a disabled table is interacted with.
+      if (isTableDisabled(tableName)) {
+        throw new DoNotRetryIOException(Bytes.toString(tableName) + " is disabled.");
+      }
+
+      return locateRegion(tableName, row, false, true);
+    }
+
+    private HRegionLocation locateRegion(final byte [] tableName,
+      final byte [] row, boolean useCache, boolean retry)
+    throws IOException {
+      if (this.closed) throw new IOException(toString() + " closed");
+      if (tableName == null || tableName.length == 0) {
+        throw new IllegalArgumentException(
+            "table name cannot be null or zero length");
+      }
+
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
+        try {
+          LOG.debug("Looking up root region location in ZK," +
+            " connection=" + this);
+          ServerName servername =
+            RootRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout);
+
+          LOG.debug("Looked up root region location, connection=" + this +
+            "; serverName=" + ((servername == null) ? "null" : servername));
+          if (servername == null) return null;
+          return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername, 0);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return null;
+        } finally {
+          zkw.close();
+        }
+      } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+        return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
+          useCache, metaRegionLock, retry);
+      } else {
+        // Region not in the cache - have to go to the meta RS
+        return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
+          useCache, userRegionLock, retry);
+      }
+    }
+
+    /*
+     * Search .META. for the HRegionLocation info that contains the table and
+     * row we're seeking. It will prefetch certain number of regions info and
+     * save them to the global region cache.
+     */
+    private void prefetchRegionCache(final byte[] tableName,
+        final byte[] row) {
+      // Implement a new visitor for MetaScanner, and use it to walk through
+      // the .META.
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        public boolean processRow(Result result) throws IOException {
+          try {
+            HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
+            if (regionInfo == null) {
+              return true;
+            }
+
+            // possible we got a region of a different table...
+            if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
+              return false; // stop scanning
+            }
+            if (regionInfo.isOffline()) {
+              // don't cache offline regions
+              return true;
+            }
+
+            ServerName serverName = HRegionInfo.getServerName(result);
+            if (serverName == null) {
+              return true; // don't cache it
+            }
+            // instantiate the location
+            HRegionLocation loc = new HRegionLocation(regionInfo, serverName,
+                HRegionInfo.getSeqNumDuringOpen(result));
+            // cache this meta entry
+            cacheLocation(tableName, null, loc);
+            return true;
+          } catch (RuntimeException e) {
+            throw new IOException(e);
+          }
+        }
+      };
+      try {
+        // pre-fetch certain number of regions info at region cache.
+        MetaScanner.metaScan(conf, visitor, tableName, row,
+            this.prefetchRegionLimit);
+      } catch (IOException e) {
+        LOG.warn("Encountered problems when prefetch META table: ", e);
+      }
+    }
+
+    /*
+      * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
+      * info that contains the table and row we're seeking.
+      */
+    private HRegionLocation locateRegionInMeta(final byte [] parentTable,
+      final byte [] tableName, final byte [] row, boolean useCache,
+      Object regionLockObject, boolean retry)
+    throws IOException {
+      HRegionLocation location;
+      // If we are supposed to be using the cache, look in the cache to see if
+      // we already have the region.
+      if (useCache) {
+        location = getCachedLocation(tableName, row);
+        if (location != null) {
+          return location;
+        }
+      }
+      int localNumRetries = retry ? numRetries : 1;
+      // build the key of the meta region we should be looking for.
+      // the extra 9's on the end are necessary to allow "exact" matches
+      // without knowing the precise region names.
+      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
+        HConstants.NINES, false);
+      for (int tries = 0; true; tries++) {
+        if (tries >= localNumRetries) {
+          throw new NoServerForRegionException("Unable to find region for "
+            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+        }
+
+        HRegionLocation metaLocation = null;
+        try {
+          // locate the root or meta region
+          metaLocation = locateRegion(parentTable, metaKey, true, false);
+          // If null still, go around again.
+          if (metaLocation == null) continue;
+          ClientProtocol server =
+            getClient(metaLocation.getServerName());
+
+          Result regionInfoRow = null;
+          // This block guards against two threads trying to load the meta
+          // region at the same time. The first will load the meta region and
+          // the second will use the value that the first one found.
+          synchronized (regionLockObject) {
+            // If the parent table is META, we may want to pre-fetch some
+            // region info into the global region cache for this table.
+            if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
+                (getRegionCachePrefetch(tableName)) )  {
+              prefetchRegionCache(tableName, row);
+            }
+
+            // Check the cache again for a hit in case some other thread made the
+            // same query while we were waiting on the lock. If not supposed to
+            // be using the cache, delete any existing cached location so it won't
+            // interfere.
+            if (useCache) {
+              location = getCachedLocation(tableName, row);
+              if (location != null) {
+                return location;
+              }
+            } else {
+              forceDeleteCachedLocation(tableName, row);
+            }
+
+            // Query the root or meta region for the location of the meta region
+            regionInfoRow = ProtobufUtil.getRowOrBefore(server,
+              metaLocation.getRegionInfo().getRegionName(), metaKey,
+              HConstants.CATALOG_FAMILY);
+          }
+          if (regionInfoRow == null) {
+            throw new TableNotFoundException(Bytes.toString(tableName));
+          }
+
+          // convert the row result into the HRegionLocation we need!
+          HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
+          if (regionInfo == null) {
+            throw new IOException("HRegionInfo was null or empty in " +
+              Bytes.toString(parentTable) + ", row=" + regionInfoRow);
+          }
+
+          // possible we got a region of a different table...
+          if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
+            throw new TableNotFoundException(
+                  "Table '" + Bytes.toString(tableName) + "' was not found, got: " +
+                  Bytes.toString(regionInfo.getTableName()) + ".");
+          }
+          if (regionInfo.isSplit()) {
+            throw new RegionOfflineException("the only available region for" +
+              " the required row is a split parent," +
+              " the daughters should be online soon: " +
+              regionInfo.getRegionNameAsString());
+          }
+          if (regionInfo.isOffline()) {
+            throw new RegionOfflineException("the region is offline, could" +
+              " be caused by a disable table call: " +
+              regionInfo.getRegionNameAsString());
+          }
+
+          ServerName serverName = HRegionInfo.getServerName(regionInfoRow);
+          if (serverName == null) {
+            throw new NoServerForRegionException("No server address listed " +
+              "in " + Bytes.toString(parentTable) + " for region " +
+              regionInfo.getRegionNameAsString() + " containing row " +
+              Bytes.toStringBinary(row));
+          }
+
+          // Instantiate the location
+          location = new HRegionLocation(regionInfo, serverName,
+              HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
+          cacheLocation(tableName, null, location);
+          return location;
+        } catch (TableNotFoundException e) {
+          // if we got this error, probably means the table just plain doesn't
+          // exist. rethrow the error immediately. this should always be coming
+          // from the HTable constructor.
+          throw e;
+        } catch (IOException e) {
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          if (tries < numRetries - 1) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("locateRegionInMeta parentTable=" +
+                Bytes.toString(parentTable) + ", metaLocation=" +
+                ((metaLocation == null)? "null": "{" + metaLocation + "}") +
+                ", attempt=" + tries + " of " +
+                this.numRetries + " failed; retrying after sleep of " +
+                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
+            }
+          } else {
+            throw e;
+          }
+          // Only relocate the parent region if necessary
+          if(!(e instanceof RegionOfflineException ||
+              e instanceof NoServerForRegionException)) {
+            relocateRegion(parentTable, metaKey);
+          }
+        }
+        try{
+          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Giving up trying to location region in " +
+            "meta: thread is interrupted.");
+        }
+      }
+    }
+
+    /*
+     * Search the cache for a location that fits our table and row key.
+     * Return null if no suitable region is located. TODO: synchronization note
+     *
+     * <p>TODO: This method during writing consumes 15% of CPU doing lookup
+     * into the Soft Reference SortedMap.  Improve.
+     *
+     * @param tableName
+     * @param row
+     * @return Null or region location found in cache.
+     */
+    HRegionLocation getCachedLocation(final byte [] tableName,
+        final byte [] row) {
+      SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
+        getTableLocations(tableName);
+
+      // start to examine the cache. we can only do cache actions
+      // if there's something in the cache for this table.
+      if (tableLocations.isEmpty()) {
+        return null;
+      }
+
+      HRegionLocation possibleRegion = tableLocations.get(row);
+      if (possibleRegion != null) {
+        return possibleRegion;
+      }
+
+      possibleRegion = tableLocations.lowerValueByKey(row);
+      if (possibleRegion == null) {
+        return null;
+      }
+
+      // make sure that the end key is greater than the row we're looking
+      // for, otherwise the row actually belongs in the next region, not
+      // this one. the exception case is when the endkey is
+      // HConstants.EMPTY_END_ROW, signifying that the region we're
+      // checking is actually the last region in the table.
+      byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
+      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+          KeyValue.getRowComparator(tableName).compareRows(
+              endKey, 0, endKey.length, row, 0, row.length) > 0) {
+        return possibleRegion;
+      }
+
+      // Passed all the way through, so we got nothing - complete cache miss
+      return null;
+    }
+
+    /**
+     * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+     * @param tableName tableName
+     * @param row
+     */
+    void forceDeleteCachedLocation(final byte [] tableName, final byte [] row) {
+      HRegionLocation rl = null;
+      synchronized (this.cachedRegionLocations) {
+        Map<byte[], HRegionLocation> tableLocations =
+            getTableLocations(tableName);
+        // start to examine the cache. we can only do cache actions
+        // if there's something in the cache for this table.
+        if (!tableLocations.isEmpty()) {
+          rl = getCachedLocation(tableName, row);
+          if (rl != null) {
+            tableLocations.remove(rl.getRegionInfo().getStartKey());
+          }
+        }
+      }
+      if ((rl != null) && LOG.isDebugEnabled()) {
+        LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
+          + " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
+          " for tableName=" + Bytes.toString(tableName) +
+          " from cache to make sure we don't use cache for " + Bytes.toStringBinary(row));
+      }
+    }
+
+    @Override
+    public void clearCaches(String sn) {
+      clearCachedLocationForServer(sn);
+    }
+
+    /*
+     * Delete all cached entries of a table that maps to a specific location.
+     *
+     * @param tablename
+     * @param server
+     */
+    private void clearCachedLocationForServer(final String server) {
+      boolean deletedSomething = false;
+      synchronized (this.cachedRegionLocations) {
+        if (!cachedServers.contains(server)) {
+          return;
+        }
+        for (Map<byte[], HRegionLocation> tableLocations :
+          cachedRegionLocations.values()) {
+          for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
+            if (e.getValue().getHostnamePort().equals(server)) {
+              tableLocations.remove(e.getKey());
+              deletedSomething = true;
+            }
+          }
+        }
+        cachedServers.remove(server);
+      }
+      if (deletedSomething && LOG.isDebugEnabled()) {
+        LOG.debug("Removed all cached region locations that map to " + server);
+      }
+    }
+
+    /*
+     * @param tableName
+     * @return Map of cached locations for passed <code>tableName</code>
+     */
+    private SoftValueSortedMap<byte [], HRegionLocation> getTableLocations(
+        final byte [] tableName) {
+      // find the map of cached locations for this table
+      Integer key = Bytes.mapKey(tableName);
+      SoftValueSortedMap<byte [], HRegionLocation> result;
+      synchronized (this.cachedRegionLocations) {
+        result = this.cachedRegionLocations.get(key);
+        // if tableLocations for this table isn't built yet, make one
+        if (result == null) {
+          result = new SoftValueSortedMap<byte [], HRegionLocation>(
+              Bytes.BYTES_COMPARATOR);
+          this.cachedRegionLocations.put(key, result);
+        }
+      }
+      return result;
+    }
+
+    @Override
+    public void clearRegionCache() {
+      synchronized(this.cachedRegionLocations) {
+        this.cachedRegionLocations.clear();
+        this.cachedServers.clear();
+      }
+    }
+
+    @Override
+    public void clearRegionCache(final byte [] tableName) {
+      synchronized (this.cachedRegionLocations) {
+        this.cachedRegionLocations.remove(Bytes.mapKey(tableName));
+      }
+    }
+
+    /**
+     * Put a newly discovered HRegionLocation into the cache.
+     * @param tableName The table name.
+     * @param source the source of the new location, if it's not coming from meta
+     * @param location the new location
+     */
+    private void cacheLocation(final byte [] tableName, final HRegionLocation source,
+        final HRegionLocation location) {
+      boolean isFromMeta = (source == null);
+      byte [] startKey = location.getRegionInfo().getStartKey();
+      Map<byte [], HRegionLocation> tableLocations =
+        getTableLocations(tableName);
+      boolean isNewCacheEntry = false;
+      boolean isStaleUpdate = false;
+      HRegionLocation oldLocation = null;
+      synchronized (this.cachedRegionLocations) {
+        cachedServers.add(location.getHostnamePort());
+        oldLocation = tableLocations.get(startKey);
+        isNewCacheEntry = (oldLocation == null);
+        // If the server in cache sends us a redirect, assume it's always valid.
+        if (!isNewCacheEntry && !oldLocation.equals(source)) {
+          long newLocationSeqNum = location.getSeqNum();
+          // Meta record is stale - some (probably the same) server has closed the region
+          // with later seqNum and told us about the new location.
+          boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
+          // Same as above for redirect. However, in this case, if the number is equal to previous
+          // record, the most common case is that first the region was closed with seqNum, and then
+          // opened with the same seqNum; hence we will ignore the redirect.
+          // There are so many corner cases with various combinations of opens and closes that
+          // an additional counter on top of seqNum would be necessary to handle them all.
+          boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
+          isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
+        }
+        if (!isStaleUpdate) {
+          tableLocations.put(startKey, location);
+        }
+      }
+      if (isNewCacheEntry) {
+        LOG.debug("Cached location for " +
+            location.getRegionInfo().getRegionNameAsString() +
+            " is " + location.getHostnamePort());
+      } else if (isStaleUpdate && !location.equals(oldLocation)) {
+        LOG.debug("Ignoring stale location update for "
+          + location.getRegionInfo().getRegionNameAsString() + ": "
+          + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
+          + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
+      }
+    }
+
+    @Override
+    @Deprecated
+    public AdminProtocol getAdmin(final String hostname, final int port) throws IOException {
+      return getAdmin(new ServerName(hostname, port, 0L));
+    }
+
+    @Override
+    public AdminProtocol getAdmin(final ServerName serverName)
+        throws IOException {
+      return getAdmin(serverName, false);
+    }
+
+    @Override
+    @Deprecated
+    public ClientProtocol getClient(final String hostname, final int port)
+    throws IOException {
+      return (ClientProtocol)getProtocol(hostname, port, clientClass);
+    }
+
+    @Override
+    public ClientProtocol getClient(final ServerName serverName)
+        throws IOException {
+      return (ClientProtocol)
+          getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
+    }
+
+    @Override
+    @Deprecated
+    public AdminProtocol getAdmin(final String hostname, final int port,
+        final boolean master)
+    throws IOException {
+      return (AdminProtocol)getProtocol(hostname, port, adminClass);
+    }
+
+    @Override
+    public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
+        throws IOException {
+      return (AdminProtocol)getProtocol(
+          serverName.getHostname(), serverName.getPort(), adminClass);
+    }
+
+    /**
+     * Either the passed <code>isa</code> is null or <code>hostname</code>
+     * can be but not both.
+     * @param hostname
+     * @param port
+     * @param protocolClass
+     * @return Proxy.
+     * @throws IOException
+     */
+    IpcProtocol getProtocol(final String hostname,
+        final int port, final Class <? extends IpcProtocol> protocolClass)
+    throws IOException {
+      String rsName = Addressing.createHostAndPortStr(hostname, port);
+      // See if we already have a connection (common case)
+      Map<String, IpcProtocol> protocols = this.servers.get(rsName);
+      if (protocols == null) {
+        protocols = new HashMap<String, IpcProtocol>();
+        Map<String, IpcProtocol> existingProtocols =
+          this.servers.putIfAbsent(rsName, protocols);
+        if (existingProtocols != null) {
+          protocols = existingProtocols;
+        }
+      }
+      String protocol = protocolClass.getName();
+      IpcProtocol server = protocols.get(protocol);
+      if (server == null) {
+        // create a unique lock for this RS + protocol (if necessary)
+        String lockKey = protocol + "@" + rsName;
+        this.connectionLock.putIfAbsent(lockKey, lockKey);
+        // get the RS lock
+        synchronized (this.connectionLock.get(lockKey)) {
+          // do one more lookup in case we were stalled above
+          server = protocols.get(protocol);
+          if (server == null) {
+            try {
+              // Only create isa when we need to.
+              InetSocketAddress address = new InetSocketAddress(hostname, port);
+              // definitely a cache miss. establish an RPC for this RS
+              server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
+                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
+              protocols.put(protocol, server);
+            } catch (RemoteException e) {
+              LOG.warn("RemoteException connecting to RS", e);
+              // Throw what the RemoteException was carrying.
+              throw e.unwrapRemoteException();
+            }
+          }
+        }
+      }
+      return server;
+    }
+
+    @Override
+    @Deprecated
+    public ZooKeeperWatcher getZooKeeperWatcher()
+        throws ZooKeeperConnectionException {
+      canCloseZKW = false;
+
+      try {
+        return getKeepAliveZooKeeperWatcher();
+      } catch (ZooKeeperConnectionException e){
+        throw e;
+      }catch (IOException e) {
+        // Encapsulate exception to keep interface
+        throw new ZooKeeperConnectionException(
+          "Can't create a zookeeper connection", e);
+      }
+    }
+
+
+    private ZooKeeperKeepAliveConnection keepAliveZookeeper;
+    private int keepAliveZookeeperUserCount;
+    private boolean canCloseZKW = true;
+
+    // keepAlive time, in ms. No reason to make it configurable.
+    private static final long keepAlive = 5 * 60 * 1000;
+
+    /**
+     * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have
+     *  finished with it.
+     * @return The shared instance. Never returns null.
+     */
+    public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
+      throws IOException {
+      synchronized (masterAndZKLock) {
+
+        if (keepAliveZookeeper == null) {
+          // We don't check that our link to ZooKeeper is still valid
+          // But there is a retry mechanism in the ZooKeeperWatcher itself
+          keepAliveZookeeper = new ZooKeeperKeepAliveConnection(
+            conf, this.toString(), this);
+        }
+        keepAliveZookeeperUserCount++;
+        keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+
+        return keepAliveZookeeper;
+      }
+    }
+
+    void releaseZooKeeperWatcher(ZooKeeperWatcher zkw) {
+      if (zkw == null){
+        return;
+      }
+      synchronized (masterAndZKLock) {
+        --keepAliveZookeeperUserCount;
+        if (keepAliveZookeeperUserCount <=0 ){
+          keepZooKeeperWatcherAliveUntil =
+            System.currentTimeMillis() + keepAlive;
+        }
+      }
+    }
+
+
+    /**
+     * Creates a Chore thread to check the connections to master & zookeeper
+     *  and close them when they reach their closing time (
+     *  {@link MasterProtocolState#keepAliveUntil} and
+     *  {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
+     *  managed by the release functions and the variable {@link #keepAlive}
+     */
+    private static class DelayedClosing extends Chore implements Stoppable {
+      private HConnectionImplementation hci;
+      Stoppable stoppable;
+
+      private DelayedClosing(
+        HConnectionImplementation hci, Stoppable stoppable){
+        super(
+          "ZooKeeperWatcher and Master delayed closing for connection "+hci,
+          60*1000, // We check every minutes
+          stoppable);
+        this.hci = hci;
+        this.stoppable = stoppable;
+      }
+
+      static DelayedClosing createAndStart(HConnectionImplementation hci){
+        Stoppable stoppable = new Stoppable() {
+              private volatile boolean isStopped = false;
+              @Override public void stop(String why) { isStopped = true;}
+              @Override public boolean isStopped() {return isStopped;}
+            };
+
+        return new DelayedClosing(hci, stoppable);
+      }
+
+      protected void closeMasterProtocol(MasterProtocolState protocolState) {
+        if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
+          hci.closeMasterProtocol(protocolState);
+          protocolState.keepAliveUntil = Long.MAX_VALUE;
+        }
+      }
+
+      @Override
+      protected void chore() {
+        synchronized (hci.masterAndZKLock) {
+          if (hci.canCloseZKW) {
+            if (System.currentTimeMillis() >
+              hci.keepZooKeeperWatcherAliveUntil) {
+
+              hci.closeZooKeeperWatcher();
+              hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+            }
+          }
+          closeMasterProtocol(hci.masterAdminProtocol);
+          closeMasterProtocol(hci.masterMonitorProtocol);
+        }
+      }
+
+      @Override
+      public void stop(String why) {
+        stoppable.stop(why);
+      }
+
+      @Override
+      public boolean isStopped() {
+        return stoppable.isStopped();
+      }
+    }
+
+    private void closeZooKeeperWatcher() {
+      synchronized (masterAndZKLock) {
+        if (keepAliveZookeeper != null) {
+          LOG.info("Closing zookeeper sessionid=0x" +
+            Long.toHexString(
+              keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
+          keepAliveZookeeper.internalClose();
+          keepAliveZookeeper = null;
+        }
+        keepAliveZookeeperUserCount = 0;
+      }
+    }
+
+    private static class MasterProtocolHandler implements InvocationHandler {
+      private HConnectionImplementation connection;
+      private MasterProtocolState protocolStateTracker;
+
+      protected MasterProtocolHandler(HConnectionImplementation connection,
+                                    MasterProtocolState protocolStateTracker) {
+        this.connection = connection;
+        this.protocolStateTracker = protocolStateTracker;
+      }
+
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+        if (method.getName().equals("close") &&
+              method.getParameterTypes().length == 0) {
+          release(connection, protocolStateTracker);
+          return null;
+        } else {
+          try {
+            return method.invoke(protocolStateTracker.protocol, args);
+          }catch  (InvocationTargetException e){
+            // We will have this for all the exception, checked on not, sent
+            //  by any layer, including the functional exception
+            Throwable cause = e.getCause();
+            if (cause == null){
+              throw new RuntimeException(
+                "Proxy invocation failed and getCause is null", e);
+            }
+            if (cause instanceof UndeclaredThrowableException) {
+              cause = cause.getCause();
+            }
+            throw cause;
+          }
+        }
+      }
+
+      private void release(
+        HConnectionImplementation connection,
+        MasterProtocolState target) {
+        connection.releaseMaster(target);
+      }
+    }
+
+    MasterProtocolState masterAdminProtocol =
+      new MasterProtocolState(MasterAdminProtocol.class);
+    MasterProtocolState masterMonitorProtocol =
+      new MasterProtocolState(MasterMonitorProtocol.class);
+
+    /**
+     * This function allows HBaseAdmin and potentially others
+     * to get a shared master connection.
+     *
+     * @return The shared instance. Never returns null.
+     * @throws MasterNotRunningException
+     */
+    private Object getKeepAliveMasterProtocol(
+        MasterProtocolState protocolState, Class connectionClass)
+        throws MasterNotRunningException {
+      synchronized (masterAndZKLock) {
+        if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
+          protocolState.protocol = null;
+          protocolState.protocol = createMasterWithRetries(protocolState);
+        }
+        protocolState.userCount++;
+        protocolState.keepAliveUntil = Long.MAX_VALUE;
+
+        return Proxy.newProxyInstance(
+          connectionClass.getClassLoader(),
+          new Class[]{connectionClass},
+          new MasterProtocolHandler(this, protocolState)
+        );
+      }
+    }
+
+    @Override
+    public MasterAdminProtocol getMasterAdmin() throws MasterNotRunningException {
+      return getKeepAliveMasterAdmin();
+    }
+
+    @Override
+    public MasterMonitorProtocol getMasterMonitor() throws MasterNotRunningException {
+      return getKeepAliveMasterMonitor();
+    }
+
+    @Override
+    public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin()
+        throws MasterNotRunningException {
+      return (MasterAdminKeepAliveConnection)
+        getKeepAliveMasterProtocol(masterAdminProtocol, MasterAdminKeepAliveConnection.class);
+    }
+
+    @Override
+    public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor()
+        throws MasterNotRunningException {
+      return (MasterMonitorKeepAliveConnection)
+        getKeepAliveMasterProtocol(masterMonitorProtocol, MasterMonitorKeepAliveConnection.class);
+    }
+
+    private boolean isKeepAliveMasterConnectedAndRunning(MasterProtocolState protocolState){
+      if (protocolState.protocol == null){
+        return false;
+      }
+      try {
+         return protocolState.protocol.isMasterRunning(
+           null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning();
+      }catch (UndeclaredThrowableException e){
+        // It's somehow messy, but we can receive exceptions such as
+        //  java.net.ConnectException but they're not declared. So we catch
+        //  it...
+        LOG.info("Master connection is not running anymore",
+          e.getUndeclaredThrowable());
+        return false;
+      } catch (ServiceException se) {
+        LOG.warn("Checking master connection", se);
+        return false;
+      }
+    }
+
+   private void releaseMaster(MasterProtocolState protocolState) {
+      if (protocolState.protocol == null){
+        return;
+      }
+      synchronized (masterAndZKLock) {
+        --protocolState.userCount;
+        if (protocolState.userCount <= 0) {
+          protocolState.keepAliveUntil =
+            System.currentTimeMillis() + keepAlive;
+        }
+      }
+    }
+
+    private void closeMasterProtocol(MasterProtocolState protocolState) {
+      if (protocolState.protocol != null){
+        LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
+        protocolState.protocol = null;
+      }
+      protocolState.userCount = 0;
+    }
+
+    /**
+     * Immediate close of the shared master. Can be by the delayed close or
+     *  when closing the connection itself.
+     */
+    private void closeMaster() {
+      synchronized (masterAndZKLock) {
+        closeMasterProtocol(masterAdminProtocol);
+        closeMasterProtocol(masterMonitorProtocol);
+      }
+    }
+
+    @Override
+    public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
+    throws IOException, RuntimeException {
+      return callable.withRetries();
+    }
+
+    @Override
+    public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+    throws IOException, RuntimeException {
+      return callable.withoutRetries();
+    }
+
+    @Deprecated
+    private <R> Callable<MultiResponse> createCallable(
+      final HRegionLocation loc, final MultiAction<R> multi,
+      final byte [] tableName) {
+      // TODO: This does not belong in here!!! St.Ack  HConnections should
+      // not be dealing in Callables; Callables have HConnections, not other
+      // way around.
+      final HConnection connection = this;
+      return new Callable<MultiResponse>() {
+        public MultiResponse call() throws IOException {
+          ServerCallable<MultiResponse> callable =
+            new ServerCallable<MultiResponse>(connection, tableName, null) {
+              public MultiResponse call() throws IOException {
+                return ProtobufUtil.multi(server, multi);
+              }
+
+              @Override
+              public void connect(boolean reload) throws IOException {
+                server = connection.getClient(loc.getServerName());
+              }
+            };
+          return callable.withoutRetries();
+        }
+      };
+   }
+
+   void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
+       ServerName serverName, long seqNum) {
+      HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
+      synchronized (this.cachedRegionLocations) {
+        cacheLocation(hri.getTableName(), source, newHrl);
+      }
+    }
+
+   /**
+    * Deletes the cached location of the region if necessary, based on some error from source.
+    * @param hri The region in question.
+    * @param source The source of the error that prompts us to invalidate cache.
+    */
+    void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
+      boolean isStaleDelete = false;
+      HRegionLocation oldLocation;
+      synchronized (this.cachedRegionLocations) {
+        Map<byte[], HRegionLocation> tableLocations =
+          getTableLocations(hri.getTableName());
+        oldLocation = tableLocations.get(hri.getStartKey());
+        if (oldLocation != null) {
+           // Do not delete the cache entry if it's not for the same server that gave us the error.
+          isStaleDelete = (source != null) && !oldLocation.equals(source);
+          if (!isStaleDelete) {
+            tableLocations.remove(hri.getStartKey());
+          }
+        }
+      }
+      if (isStaleDelete) {
+        LOG.debug("Received an error from " + source.getHostnamePort() + " for region "
+          + hri.getRegionNameAsString() + "; not removing "
+          + oldLocation.getHostnamePort() + " from cache.");
+      }
+    }
+
+    /**
+     * Update the location with the new value (if the exception is a RegionMovedException)
+     * or delete it from the cache.
+     * @param exception an object (to simplify user code) on which we will try to find a nested
+     *                  or wrapped or both RegionMovedException
+     * @param source server that is the source of the location update.
+     */
+    private void updateCachedLocations(final byte[] tableName, Row row,
+      final Object exception, final HRegionLocation source) {
+      if (row == null || tableName == null) {
+        LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
+            ", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
+        return;
+      }
+
+      // Is it something we have already updated?
+      final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
+      if (oldLocation == null) {
+        // There is no such location in the cache => it's been removed already => nothing to do
+        return;
+      }
+
+      HRegionInfo regionInfo = oldLocation.getRegionInfo();
+      final RegionMovedException rme = RegionMovedException.find(exception);
+      if (rme != null) {
+        LOG.info("Region " + regionInfo.getRegionNameAsString() + " moved to " +
+          rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort());
+        updateCachedLocation(
+            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
+      } else {
+        deleteCachedLocation(regionInfo, source);
+      }
+    }
+
+    @Override
+    @Deprecated
+    public void processBatch(List<? extends Row> list,
+        final byte[] tableName,
+        ExecutorService pool,
+        Object[] results) throws IOException, InterruptedException {
+      // This belongs in HTable!!! Not in here.  St.Ack
+
+      // results must be the same size as list
+      if (results.length != list.size()) {
+        throw new IllegalArgumentException(
+          "argument results must be the same size as argument list");
+      }
+      processBatchCallback(list, tableName, pool, results, null);
+    }
+
+    /**
+     * Send the queries in parallel on the different region servers. Retries on failures.
+     * If the method returns it means that there is no error, and the 'results' array will
+     * contain no exception. On error, an exception is thrown, and the 'results' array will
+     * contain results and exceptions.
+     * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
+     */
+    @Override
+    @Deprecated
+    public <R> void processBatchCallback(
+      List<? extends Row> list,
+      byte[] tableName,
+      ExecutorService pool,
+      Object[] results,
+      Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
+
+      Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
+      p.processBatchCallback();
+    }
+
+
+    /**
+     * Methods and attributes to manage a batch process are grouped into this single class.
+     * This allows, by creating a Process<R> per batch process to ensure multithread safety.
+     *
+     * This code should be move to HTable once processBatchCallback is not supported anymore in
+     * the HConnection interface.
+     */
+    private static class Process<R> {
+      // Info on the queries and their context
+      private final HConnectionImplementation hci;
+      private final List<? extends Row> rows;
+      private final byte[] tableName;
+      private final ExecutorService pool;
+      private final Object[] results;
+      private final Batch.Callback<R> callback;
+
+      // Used during the batch process
+      private final List<Action<R>> toReplay;
+      private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
+        inProgress;
+      private int curNumRetries;
+
+      // Notified when a tasks is done
+      private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
+
+      private Process(HConnectionImplementation hci, List<? extends Row> list,
+                       byte[] tableName, ExecutorService pool, Object[] results,
+                       Batch.Callback<R> callback){
+        this.hci = hci;
+        this.rows = list;
+        this.tableName = tableName;
+        this.pool = pool;
+        this.results = results;
+        this.callback = callback;
+        this.toReplay = new ArrayList<Action<R>>();
+        this.inProgress =
+          new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
+        this.curNumRetries = 0;
+      }
+
+
+      /**
+       * Group a list of actions per region servers, and send them. The created MultiActions are
+       *  added to the inProgress list.
+       * @param actionsList
+       * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+       * @throws IOException - if we can't locate a region after multiple retries.
+       */
+      private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+        // group per location => regions server
+        final Map<HRegionLocation, MultiAction<R>> actionsByServer =
+          new HashMap<HRegionLocation, MultiAction<R>>();
+        for (Action<R> aAction : actionsList) {
+          final Row row = aAction.getAction();
+
+          if (row != null) {
+            final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow());
+            if (loc == null) {
+              throw new IOException("No location found, aborting submit.");
+            }
+
+            final byte[] regionName = loc.getRegionInfo().getRegionName();
+            MultiAction<R> actions = actionsByServer.get(loc);
+            if (actions == null) {
+              actions = new MultiAction<R>();
+              actionsByServer.put(loc, actions);
+            }
+            actions.add(regionName, aAction);
+          }
+        }
+
+        // Send the queries and add them to the inProgress list
+        for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+          Callable<MultiResponse> callable =
+            createDelayedCallable(sleepTime, e.getKey(), e.getValue());
+          if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+            StringBuilder sb = new StringBuilder();
+            for (Action<R> action : e.getValue().allActions()) {
+              sb.append(Bytes.toStringBinary(action.getAction().getRow()) + ";");
+            }
+            LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
+              + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+          }
+          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
+            new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
+              e.getValue(), e.getKey(), this.pool.submit(callable));
+          this.inProgress.addLast(p);
+        }
+      }
+
+     /**
+      * Resubmit the actions which have failed, after a sleep time.
+      * @throws IOException
+      */
+      private void doRetry() throws IOException{
+        // curNumRetries at this point is 1 or more; decrement to start from 0.
+        final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
+        submit(this.toReplay, sleepTime);
+        this.toReplay.clear();
+      }
+
+      /**
+       * Parameterized batch processing, allowing varying return types for
+       * different {@link Row} implementations.
+       * Throws an exception on error. If there are no exceptions, it means that the 'results'
+       *  array is clean.
+       */
+      private void processBatchCallback() throws IOException, InterruptedException {
+        if (this.results.length != this.rows.size()) {
+          throw new IllegalArgumentException(
+            "argument results (size="+results.length+") must be the same size as " +
+              "argument list (size="+this.rows.size()+")");
+        }
+        if (this.rows.isEmpty()) {
+          return;
+        }
+
+        boolean isTraceEnabled = LOG.isTraceEnabled();
+        BatchErrors errors = new BatchErrors();
+        BatchErrors retriedErrors = null;
+        if (isTraceEnabled) {
+          retriedErrors = new BatchErrors();
+        }
+
+        // We keep the number of retry per action.
+        int[] nbRetries = new int[this.results.length];
+
+        // Build the action list. This list won't change after being created, hence the
+        //  indexes will remain constant, allowing a direct lookup.
+        final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
+        for (int i = 0; i < this.rows.size(); i++) {
+          Action<R> action = new Action<R>(this.rows.get(i), i);
+          listActions.add(action);
+        }
+
+        // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
+        submit(listActions, 0);
+
+        // LastRetry is true if, either:
+        //  we had an exception 'DoNotRetry'
+        //  we had more than numRetries for any action
+        //  In this case, we will finish the current retries but we won't start new ones.
+        boolean lastRetry = false;
+        // despite its name numRetries means number of tries. So if numRetries == 1 it means we
+        //  won't retry. And we compare vs. 2 in case someone set it to zero.
+        boolean noRetry = (hci.numRetries < 2);
+
+        // Analyze and resubmit until all actions are done successfully or failed after numRetries
+        while (!this.inProgress.isEmpty()) {
+          // We need the original multi action to find out what actions to replay if
+          //  we have a 'total' failure of the Future<MultiResponse>
+          // We need the HRegionLocation as we give it back if we go out of retries
+          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
+            removeFirstDone();
+
+          // Get the answer, keep the exception if any as we will use it for the analysis
+          MultiResponse responses = null;
+          ExecutionException exception = null;
+          try {
+            responses = currentTask.getThird().get();
+          } catch (ExecutionException e) {
+            exception = e;
+          }
+
+          // Error case: no result at all for this multi action. We need to redo all actions
+          if (responses == null) {
+            for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
+              for (Action<R> action : actions) {
+                Row row = action.getAction();
+                hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
+                if (noRetry) {
+                  errors.add(exception, row, currentTask);
+                } else {
+                  if (isTraceEnabled) {
+                    retriedErrors.add(exception, row, currentTask);
+                  }
+                  lastRetry = addToReplay(nbRetries, action);
+                }
+              }
+            }
+          } else { // Success or partial success
+            // Analyze detailed results. We can still have individual failures to be redo.
+            // two specific exceptions are managed:
+            //  - DoNotRetryIOException: we continue to retry for other actions
+            //  - RegionMovedException: we update the cache with the new region location
+            for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
+                responses.getResults().entrySet()) {
+              for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
+                Action<R> correspondingAction = listActions.get(regionResult.getFirst());
+                Object result = regionResult.getSecond();
+                this.results[correspondingAction.getOriginalIndex()] = result;
+
+                // Failure: retry if it's make sense else update the errors lists
+                if (result == null || result instanceof Throwable) {
+                  Row row = correspondingAction.getAction();
+                  hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+                  if (result instanceof DoNotRetryIOException || noRetry) {
+                    errors.add((Exception)result, row, currentTask);
+                  } else {
+                    if (isTraceEnabled) {
+                      retriedErrors.add((Exception)result, row, currentTask);
+                    }
+                    lastRetry = addToReplay(nbRetries, correspondingAction);
+                  }
+                } else // success
+                  if (callback != null) {
+                    this.callback.update(resultsForRS.getKey(),
+                      this.rows.get(regionResult.getFirst()).getRow(),
+                      (R) result);
+                }
+              }
+            }
+          }
+
+          // Retry all actions in toReplay then clear it.
+          if (!noRetry && !toReplay.isEmpty()) {
+            if (isTraceEnabled) {
+              LOG.trace("Retrying due to errors: " + retriedErrors.getDescriptionAndClear());
+            }
+            doRetry();
+            if (lastRetry) {
+              if (isTraceEnabled) {
+                LOG.trace("No more retries");
+              }
+              noRetry = true;
+            }
+          }
+        }
+
+        errors.rethrowIfAny();
+      }
+
+
+      private class BatchErrors {
+        private List<Throwable> exceptions = new ArrayList<Throwable>();
+        private List<Row> actions = new ArrayList<Row>();
+        private List<String> addresses = new ArrayList<String>();
+
+        public void add(Exception ex, Row row,
+          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+          exceptions.add(ex);
+          actions.add(row);
+          addresses.add(obj.getSecond().getHostnamePort());
+        }
+
+        public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
+          if (!exceptions.isEmpty()) {
+            throw makeException();
+          }
+        }
+
+        public String getDescriptionAndClear(){
+          if (exceptions.isEmpty()) {
+            return "";
+          }
+          String result = makeException().getMessage();
+          exceptions.clear();
+          actions.clear();
+          addresses.clear();
+          return result;
+        }
+
+        private RetriesExhaustedWithDetailsException makeException() {
+          return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);
+        }
+      }
+
+
+
+      /**
+       * Put the action that has to be retried in the Replay list.
+       * @return true if we're out of numRetries and it's the last retry.
+       */
+      private boolean addToReplay(int[] nbRetries, Action<R> action) {
+        this.toReplay.add(action);
+        nbRetries[action.getOriginalIndex()]++;
+        if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
+          this.curNumRetries = nbRetries[action.getOriginalIndex()];
+        }
+        // numRetries means number of tries, while curNumRetries means current number of retries. So
+        //  we need to add 1 to make them comparable. And as we look for the last try we compare
+        //  with '>=' and no '>'. And we need curNumRetries to means what it says as we don't want
+        //  to initialize it to 1.
+        return ( (this.curNumRetries +1) >= hci.numRetries);
+      }
+
+      /**
+       * Wait for one of tasks to be done, and remove it from the list.
+       * @return the tasks done.
+       */
+      private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
+      removeFirstDone() throws InterruptedException {
+        while (true) {
+          synchronized (finishedTasks) {
+            if (!finishedTasks.isEmpty()) {
+              MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
+
+              // We now need to remove it from the inProgress part.
+              Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
+                inProgress.iterator();
+              while (it.hasNext()) {
+                Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
+                if (task.getFirst() == done) { // We have the exact object. No java equals here.
+                  it.remove();
+                  return task;
+                }
+              }
+              LOG.error("Development error: We didn't see a task in the list. " +
+                done.getRegions());
+            }
+            finishedTasks.wait(10);
+          }
+        }
+      }
+
+      private Callable<MultiResponse> createDelayedCallable(
+        final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
+
+        final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
+
+        return new Callable<MultiResponse>() {
+          private final long creationTime = System.currentTimeMillis();
+
+          @Override
+          public MultiResponse call() throws Exception {
+            try {
+              final long waitingTime = delay + creationTime - System.currentTimeMillis();
+              if (waitingTime > 0) {
+                Thread.sleep(waitingTime);
+              }
+              return delegate.call();
+            } finally {
+              synchronized (finishedTasks) {
+                finishedTasks.add(multi);
+                finishedTasks.notifyAll();
+              }
+            }
+          }
+        };
+      }
+    }
+
+    /*
+     * Return the number of cached region for a table. It will only be called
+     * from a unit test.
+     */

[... 255 lines stripped ...]