You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:09 UTC

svn commit: r1584162 [2/5] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1584162&r1=1584161&r2=1584162&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Apr  2 20:49:09 2014
@@ -19,82 +19,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
-import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HConnectionParams;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
-import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
 import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.MetaUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.thrift.transport.TTransportException;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.SyncFailedException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A non-instantiable class that manages connections to multiple tables in
@@ -248,3390 +182,4 @@ public class HConnectionManager {
     TableServers connection = (TableServers) getConnection(conf);
     return connection.isRegionCached(tableName, row);
   }
-
-  /* Encapsulates finding the servers for an HBase instance */
-  public static class TableServers implements ServerConnection {
-    static final Log LOG = LogFactory.getLog(TableServers.class);
-    private final Class<? extends HRegionInterface> serverInterfaceClass;
-    private final int prefetchRegionLimit;
-
-    private final Object masterLock = new Object();
-    private volatile boolean closed;
-    private volatile HMasterInterface master;
-    private volatile boolean masterChecked;
-
-    private final Object rootRegionLock = new Object();
-    private final Object metaRegionLock = new Object();
-    private final Object userRegionLock = new Object();
-
-    private volatile Configuration conf;
-    private final HConnectionParams params;
-
-    // Used by master and region servers during safe mode only
-    private volatile HRegionLocation rootRegionLocation;
-
-    private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations = new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
-
-    // amount of time to wait before we consider a server to be in fast fail
-    // mode
-    protected long fastFailThresholdMilliSec;
-    // Keeps track of failures when we cannot talk to a server. Helps in
-    // fast failing clients if the server is down for a long time.
-    protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<HServerAddress, FailureInfo>();
-    // We populate repeatedFailuresMap every time there is a failure. So, to
-    // keep it
-    // from growing unbounded, we garbage collect the failure information
-    // every cleanupInterval.
-    protected final long failureMapCleanupIntervalMilliSec;
-    protected volatile long lastFailureMapCleanupTimeMilliSec;
-    // Amount of time that has to pass, before we clear region -> regionserver
-    // cache
-    // again, when in fast fail mode. This is used to clean unused entries.
-    protected long cacheClearingTimeoutMilliSec;
-    // clear failure Info. Used to clean out all entries.
-    // A safety valve, in case the client does not exit the
-    // fast fail mode for any reason.
-    private long fastFailClearingTimeMilliSec;
-    private final boolean recordClientContext;
-
-    private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
-
-    public void resetOperationContext() {
-      if (!recordClientContext || this.operationContextPerThread == null) {
-        return;
-      }
-
-      List<OperationContext> currContext = this.operationContextPerThread.get();
-
-      if (currContext != null) {
-        currContext.clear();
-      }
-    }
-
-    public List<OperationContext> getAndResetOperationContext() {
-      if (!recordClientContext || this.operationContextPerThread == null) {
-        return null;
-      }
-
-      List<OperationContext> currContext = this.operationContextPerThread.get();
-
-      if (currContext == null) {
-        return null;
-      }
-
-      ArrayList<OperationContext> context = new ArrayList<OperationContext>(
-          currContext);
-
-      // Made a copy, clear the context
-      currContext.clear();
-
-      return context;
-    }
-
-    /**
-     * Keeps track of repeated failures to any region server.
-     *
-     * @author amitanand.s
-     *
-     */
-    protected class FailureInfo {
-      // The number of consecutive failures.
-      private final AtomicLong numConsecutiveFailures = new AtomicLong();
-      // The time when the server started to become unresponsive
-      // Once set, this would never be updated.
-      private long timeOfFirstFailureMilliSec;
-      // The time when the client last tried to contact the server.
-      // This is only updated by one client at a time
-      private volatile long timeOfLatestAttemptMilliSec;
-      // The time when the client last cleared cache for regions assigned
-      // to the server. Used to ensure we don't clearCache too often.
-      private volatile long timeOfLatestCacheClearMilliSec;
-      // Used to keep track of concurrent attempts to contact the server.
-      // In Fast fail mode, we want just one client thread to try to connect
-      // the rest of the client threads will fail fast.
-      private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
-          false);
-
-      public String toString() {
-        return "FailureInfo: numConsecutiveFailures = "
-            + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
-            + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
-            + timeOfLatestAttemptMilliSec
-            + " timeOfLatestCacheClearMilliSec = "
-            + timeOfLatestCacheClearMilliSec
-            + " exclusivelyRetringInspiteOfFastFail  = "
-            + exclusivelyRetringInspiteOfFastFail.get();
-      }
-
-      FailureInfo(long firstFailureTime) {
-        this.timeOfFirstFailureMilliSec = firstFailureTime;
-      }
-    }
-
-    private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<MutableBoolean>();
-
-    // For TESTING purposes only;
-    public Map<HServerAddress, FailureInfo> getFailureMap() {
-      return repeatedFailuresMap;
-    }
-
-    // The presence of a server in the map implies it's likely that there is an
-    // entry in cachedRegionLocations that map to this server; but the absence
-    // of a server in this map guarentees that there is no entry in cache that
-    // maps to the absent server.
-    private final Set<String> cachedServers =
-        new HashSet<String>();
-
-    // region cache prefetch is enabled by default. this set contains all
-    // tables whose region cache prefetch are disabled.
-    private final Set<Integer> regionCachePrefetchDisabledTables =
-      new CopyOnWriteArraySet<Integer>();
-    // The number of times we will retry after receiving a RegionOverloadedException from the
-    // region server. Defaults to 0 (i.e. we will throw the exception and let the client handle retries)
-    // may not always be what you want. But, for the purposes of the HBaseThrift client, that this is
-    // created for, we do not want the thrift layer to hold up IPC threads handling retries.
-    private int maxServerRequestedRetries;
-
-    // keep track of servers that have been updated for batchedLoad
-    // tablename -> Map
-    Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
-    private int batchedUploadSoftFlushRetries;
-    private long batchedUploadSoftFlushTimeoutMillis;
-    private final boolean useThrift;
-
-    private ConcurrentSkipListSet<byte[]> initializedTableSet =
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    /**
-     * constructor
-     * @param conf Configuration object
-     */
-    @SuppressWarnings("unchecked")
-    public TableServers(Configuration conf) {
-      this.conf = conf;
-      params = HConnectionParams.getInstance(conf);
-      this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
-        HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
-
-      String serverClassName =
-        conf.get(HConstants.REGION_SERVER_CLASS,
-            HConstants.DEFAULT_REGION_SERVER_CLASS);
-
-      this.closed = false;
-
-      try {
-        this.serverInterfaceClass =
-          (Class<? extends HRegionInterface>) Class.forName(serverClassName);
-
-      } catch (ClassNotFoundException e) {
-        throw new UnsupportedOperationException(
-            "Unable to find region server interface " + serverClassName, e);
-      }
-
-      // TODO move parameters below into HConnectionParams
-      this.cacheClearingTimeoutMilliSec = conf.getLong(
-        "hbase.client.fastfail.cache.clear.interval",
-          10000); // 10 sec
-      this.fastFailThresholdMilliSec = conf.getLong(
-        "hbase.client.fastfail.threshold",
-          60000); // 1 min
-      this.failureMapCleanupIntervalMilliSec = conf.getLong(
-          "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
-      this.fastFailClearingTimeMilliSec = conf.getLong(
-          "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins
-
-      this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
-          10);
-
-      this.master = null;
-      this.masterChecked = false;
-      this.batchedUploadSoftFlushRetries =
-          conf.getInt("hbase.client.batched-upload.softflush.retries", 10);
-      this.batchedUploadSoftFlushTimeoutMillis =
-          conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
-      batchedUploadUpdatesMap  = new ConcurrentHashMap<String,
-          ConcurrentMap<HRegionInfo, HRegionLocation>>();
-
-      this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
-
-    }
-
-    // Used by master and region servers during safe mode only
-    public void unsetRootRegionLocation() {
-      this.rootRegionLocation = null;
-    }
-
-    // Used by master and region servers during safe mode only
-    public void setRootRegionLocation(HRegionLocation rootRegion) {
-      if (rootRegion == null) {
-        throw new IllegalArgumentException(
-            "Cannot set root region location to null.");
-      }
-      this.rootRegionLocation = rootRegion;
-    }
-
-    public HMasterInterface getMaster() throws MasterNotRunningException {
-      ZooKeeperWrapper zk;
-      try {
-        zk = getZooKeeperWrapper();
-      } catch (IOException e) {
-        throw new MasterNotRunningException(e);
-      }
-
-      HServerAddress masterLocation = null;
-      synchronized (this.masterLock) {
-        for (int tries = 0; !this.closed && !this.masterChecked
-            && this.master == null && tries < params.getNumRetries(); tries++) {
-
-          try {
-            masterLocation = zk.readMasterAddress(zk);
-
-            if (masterLocation != null) {
-              HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy(
-                    HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
-                    masterLocation.getInetSocketAddress(), this.conf,
-                    params.getRpcTimeout(), HBaseRPCOptions.DEFAULT);
-
-              if (tryMaster.isMasterRunning()) {
-                this.master = tryMaster;
-                this.masterLock.notifyAll();
-                break;
-              }
-            }
-
-          } catch (IOException e) {
-            if (tries == params.getNumRetries() - 1) {
-              // This was our last chance - don't bother sleeping
-              LOG.info(
-                  "getMaster attempt " + tries + " of "
-                      + params.getNumRetries() + " failed; no more retrying.",
-                  e);
-              break;
-            }
-            LOG.info(
-                "getMaster attempt " + tries + " of " + params.getNumRetries()
-                    + " failed; retrying after sleep of "
-                    + params.getPauseTime(tries), e);
-          }
-
-          // Cannot connect to master or it is not running. Sleep & retry
-          try {
-            this.masterLock.wait(params.getPauseTime(tries));
-          } catch (InterruptedException e) {
-            // continue
-          }
-        }
-        this.masterChecked = true;
-      }
-      if (this.master == null) {
-        if (masterLocation == null) {
-          throw new MasterNotRunningException();
-        }
-        throw new MasterNotRunningException(masterLocation.toString());
-      }
-      return this.master;
-    }
-
-    public boolean isMasterRunning() {
-      if (this.master == null) {
-        try {
-          getMaster();
-
-        } catch (MasterNotRunningException e) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    public boolean tableExists(final byte[] tableName)
-        throws MasterNotRunningException {
-      getMaster();
-      if (tableName == null) {
-        throw new IllegalArgumentException("Table name cannot be null");
-      }
-      if (isMetaTableName(tableName)) {
-        return true;
-      }
-      boolean exists = false;
-      try {
-        HTableDescriptor[] tables = listTables();
-        for (HTableDescriptor table : tables) {
-          if (Bytes.equals(table.getName(), tableName)) {
-            exists = true;
-          }
-        }
-      } catch (IOException e) {
-        LOG.warn("Testing for table existence threw exception", e);
-      }
-      return exists;
-    }
-
-    /*
-     * @param n
-     *
-     * @return Truen if passed tablename <code>n</code> is equal to the name of
-     * a catalog table.
-     */
-    private static boolean isMetaTableName(final byte[] n) {
-      return MetaUtils.isMetaTableName(n);
-    }
-
-    public HRegionLocation getRegionLocation(final byte[] name,
-        final byte[] row, boolean reload) throws IOException {
-      return reload ? relocateRegion(name, row) : locateRegion(name, row);
-    }
-
-    public HTableDescriptor[] listTables() throws IOException {
-      getMaster();
-      final TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
-      MetaScannerVisitor visitor = new MetaScannerVisitor() {
-        public boolean processRow(Result result) throws IOException {
-          try {
-            byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
-                HConstants.REGIONINFO_QUALIFIER);
-            HRegionInfo info = null;
-            if (value != null) {
-              info = Writables.getHRegionInfo(value);
-            }
-            // Only examine the rows where the startKey is zero length
-            if (info != null && info.getStartKey().length == 0) {
-              uniqueTables.add(info.getTableDesc());
-            }
-            return true;
-          } catch (RuntimeException e) {
-            LOG.error("Result=" + result);
-            throw e;
-          }
-        }
-      };
-      MetaScanner.metaScan(conf, visitor);
-
-      return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
-    }
-
-    public boolean isTableEnabled(byte[] tableName) throws IOException {
-      return testTableOnlineState(tableName, true);
-    }
-
-    public boolean isTableDisabled(byte[] tableName) throws IOException {
-      return testTableOnlineState(tableName, false);
-    }
-
-    public boolean isTableAvailable(final byte[] tableName) throws IOException {
-      final AtomicBoolean available = new AtomicBoolean(true);
-      MetaScannerVisitor visitor = new MetaScannerVisitor() {
-        @Override
-        public boolean processRow(Result row) throws IOException {
-          byte[] value = row.getValue(HConstants.CATALOG_FAMILY,
-              HConstants.REGIONINFO_QUALIFIER);
-          HRegionInfo info = Writables.getHRegionInfoOrNull(value);
-          if (info != null) {
-            if (Bytes.equals(tableName, info.getTableDesc().getName())) {
-              value = row.getValue(HConstants.CATALOG_FAMILY,
-                  HConstants.SERVER_QUALIFIER);
-              if (value == null) {
-                available.set(false);
-                return false;
-              }
-            }
-          }
-          return true;
-        }
-      };
-      MetaScanner.metaScan(conf, visitor);
-      return available.get();
-    }
-
-    /*
-     * If online == true Returns true if all regions are online Returns false in
-     * any other case If online == false Returns true if all regions are offline
-     * Returns false in any other case
-     */
-    private boolean testTableOnlineState(byte[] tableName, boolean online)
-        throws IOException {
-      if (!tableExists(tableName)) {
-        throw new TableNotFoundException(Bytes.toString(tableName));
-      }
-      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-        // The root region is always enabled
-        return true;
-      }
-      int rowsScanned = 0;
-      int rowsOffline = 0;
-      byte[] startKey = HRegionInfo.createRegionName(tableName, null,
-          HConstants.ZEROES, false);
-      byte[] endKey;
-      HRegionInfo currentRegion;
-      Scan scan = new Scan(startKey);
-      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
-      int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
-      scan.setCaching(rows);
-      ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName,
-          HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME
-          : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT);
-      try {
-        // Open scanner
-        getRegionServerWithRetries(s);
-        do {
-          currentRegion = s.getHRegionInfo();
-          Result r;
-          Result[] rrs;
-          while ((rrs = getRegionServerWithRetries(s)) != null
-              && rrs.length > 0) {
-            r = rrs[0];
-            byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
-                HConstants.REGIONINFO_QUALIFIER);
-            if (value != null) {
-              HRegionInfo info = Writables.getHRegionInfoOrNull(value);
-              if (info != null) {
-                if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
-                  rowsScanned += 1;
-                  rowsOffline += info.isOffline() ? 1 : 0;
-                }
-              }
-            }
-          }
-          endKey = currentRegion.getEndKey();
-        } while (!(endKey == null || Bytes.equals(endKey,
-            HConstants.EMPTY_BYTE_ARRAY)));
-      } finally {
-        s.setClose();
-        // Doing below will call 'next' again and this will close the scanner
-        // Without it we leave scanners open.
-        getRegionServerWithRetries(s);
-      }
-      LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
-      boolean onOffLine = online ? rowsOffline == 0
-          : rowsOffline == rowsScanned;
-      return rowsScanned > 0 && onOffLine;
-    }
-
-    private static class HTableDescriptorFinder implements
-        MetaScanner.MetaScannerVisitor {
-      byte[] tableName;
-      HTableDescriptor result;
-
-      protected HTableDescriptorFinder(byte[] tableName) {
-        this.tableName = tableName;
-      }
-
-      public boolean processRow(Result rowResult) throws IOException {
-        HRegionInfo info = Writables.getHRegionInfo(rowResult.getValue(
-            HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
-        HTableDescriptor desc = info.getTableDesc();
-        if (Bytes.compareTo(desc.getName(), tableName) == 0) {
-          result = desc;
-          return false;
-        }
-        return true;
-      }
-
-      HTableDescriptor getResult() {
-        return result;
-      }
-    }
-
-    public HTableDescriptor getHTableDescriptor(final byte[] tableName)
-        throws IOException {
-      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-        return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
-      }
-      if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
-        return HTableDescriptor.META_TABLEDESC;
-      }
-      HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName);
-      MetaScanner.metaScan(conf, finder, tableName);
-      HTableDescriptor result = finder.getResult();
-      if (result == null) {
-        throw new TableNotFoundException(Bytes.toString(tableName));
-      }
-      return result;
-    }
-
-    public HRegionLocation locateRegion(final byte[] tableName, final byte[] row)
-        throws IOException {
-      return locateRegion(tableName, row, true);
-    }
-
-    public HRegionLocation relocateRegion(final byte[] tableName,
-        final byte[] row) throws IOException {
-      return locateRegion(tableName, row, false);
-    }
-
-    private HRegionLocation locateRegion(final byte[] tableName,
-        final byte[] row, boolean useCache) throws IOException {
-      if (tableName == null || tableName.length == 0) {
-        throw new IllegalArgumentException(
-            "table name cannot be null or zero length");
-      }
-      if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
-        return locateMetaInRoot(row, useCache, metaRegionLock);
-      } else if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-        synchronized (rootRegionLock) {
-          // This block guards against two threads trying to find the root
-          // region at the same time. One will go do the find while the
-          // second waits. The second thread will not do find.
-
-          if (!useCache || rootRegionLocation == null
-              || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
-            this.rootRegionLocation = locateRootRegion();
-            LOG.info("Updated rootRegionLocation from ZK to : " + this.rootRegionLocation);
-          }
-          return this.rootRegionLocation;
-        }
-      } else {
-        // Region not in the cache - have to go to the meta RS
-        return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
-            useCache, userRegionLock);
-      }
-    }
-
-    private HRegionLocation prefetchRegionCache(final byte[] tableName,
-                                     final byte[] row) {
-      return prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
-    }
-
-    /*
-     * Search .META. for the HRegionLocation info that contains the table and
-     * row we're seeking. It will prefetch certain number of regions info and
-     * save them to the global region cache.
-     */
-    private HRegionLocation prefetchRegionCache(final byte[] tableName,
-        final byte[] row, int prefetchRegionLimit) {
-      // Implement a new visitor for MetaScanner, and use it to walk through
-      // the .META.
-      MetaScannerVisitor visitor = new MetaScannerVisitor() {
-        public boolean processRow(Result result) throws IOException {
-          try {
-            byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
-                HConstants.REGIONINFO_QUALIFIER);
-            HRegionInfo regionInfo = null;
-
-            if (value != null) {
-              // convert the row result into the HRegionLocation we need!
-              regionInfo = Writables.getHRegionInfo(value);
-
-              // possible we got a region of a different table...
-              if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
-                return false; // stop scanning
-              }
-              if (regionInfo.isOffline()) {
-                // don't cache offline regions
-                return true;
-              }
-              value = result.getValue(HConstants.CATALOG_FAMILY,
-                  HConstants.SERVER_QUALIFIER);
-              if (value == null) {
-                return true; // don't cache it
-              }
-              final String serverAddress = Bytes.toString(value);
-
-              value = result.getValue(HConstants.CATALOG_FAMILY,
-                  HConstants.STARTCODE_QUALIFIER);
-              long serverStartCode = -1;
-              if (value != null) {
-                serverStartCode = Bytes.toLong(value);
-              }
-
-              // instantiate the location
-              HRegionLocation loc = new HRegionLocation(regionInfo,
-                  new HServerAddress(serverAddress), serverStartCode);
-              // cache this meta entry
-              cacheLocation(tableName, loc);
-            }
-            return true;
-          } catch (RuntimeException e) {
-            throw new IOException(e);
-          }
-        }
-      };
-      try {
-        // pre-fetch certain number of regions info at region cache.
-        MetaScanner.metaScan(conf, visitor, tableName, row, prefetchRegionLimit);
-        return getCachedLocation(tableName, row);
-      } catch (IOException e) {
-        LOG.warn("Encounted problems when prefetch META table: ", e);
-      }
-      return null;
-    }
-
-    /**
-      * Search the meta table (.META.) for the HRegionLocation info that
-      * contains the table and row we're seeking.
-      */
-    private HRegionLocation locateRegionInMeta(final byte [] parentTable,
-      final byte [] tableName, final byte [] row, boolean useCache,
-      Object regionLockObject)
-    throws IOException {
-      HRegionLocation location;
-      if (useCache) {
-        location = getCachedLocation(tableName, row);
-        if (location != null) {
-          return location;
-        }
-      }
-
-      // If we are supposed to be using the cache, look in the cache to see if
-      // we already have the region.
-
-      // build the key of the meta region we should be looking for.
-      // the extra 9's on the end are necessary to allow "exact" matches
-      // without knowing the precise region names.
-      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
-        HConstants.NINES, false);
-      for (int tries = 0; true; tries++) {
-        if (tries >= params.getNumRetries()) {
-          throw new NoServerForRegionException("Unable to find region for "
-            + Bytes.toStringBinary(row) + " after " + params.getNumRetries() +
-            " tries.");
-        }
-
-        FailureInfo fInfo = null;
-        HServerAddress server = null;
-        boolean didTry = false;
-        boolean couldNotCommunicateWithServer = false;
-        boolean retryDespiteFastFailMode = false;
-        try {
-          // locate the root or meta region
-          HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
-
-          server = metaLocation.getServerAddress();
-          fInfo = repeatedFailuresMap.get(server);
-
-          // Handle the case where .META. is on an unresponsive server.
-          if (inFastFailMode(server) &&
-              !this.currentThreadInFastFailMode()) {
-            // In Fast-fail mode, all but one thread will fast fail. Check
-            // if we are that one chosen thread.
-
-            retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
-
-            if (retryDespiteFastFailMode == false) { // we don't have to retry
-              throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
-                  fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
-            }
-          }
-          didTry = true;
-
-          HBaseThriftRPC.isMeta.get().push(true);
-          Result regionInfoRow = null;
-          try {
-            // This block guards against two threads trying to load the meta
-            // region at the same time. The first will load the meta region and
-            // the second will use the value that the first one found.
-            synchronized (regionLockObject) {
-              // Check the cache again for a hit in case some other thread made the
-              // same query while we were waiting on the lock. If not supposed to
-              // be using the cache, delete any existing cached location so it won't
-              // interfere.
-              if (useCache) {
-                location = getCachedLocation(tableName, row);
-                if (location != null) {
-                  return location;
-                }
-              } else {
-                LOG.debug("Deleting the client location cache.");
-                deleteCachedLocation(tableName, row, null);
-              }
-
-              // If the parent table is META, we may want to pre-fetch some
-              // region info into the global region cache for this table.
-              if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
-                (getRegionCachePrefetch(tableName)) )  {
-                LOG.debug("Prefetching the client location cache.");
-                location = prefetchRegionCache(tableName, row);
-                if (location != null) {
-                  return location;
-                }
-              }
-
-              HRegionInterface serverInterface =
-                getHRegionConnection(metaLocation.getServerAddress());
-
-              // Query the root or meta region for the location of the meta region
-              regionInfoRow = serverInterface.getClosestRowBefore(
-                metaLocation.getRegionInfo().getRegionName(), metaKey,
-                HConstants.CATALOG_FAMILY);
-          }
-        } catch (Exception e) {
-          throw e;
-        } finally {
-          HBaseThriftRPC.isMeta.get().pop();
-        }
-        location = getLocationFromRow(regionInfoRow, tableName,
-          parentTable, row);
-        cacheLocation(tableName, location);
-        return location;
-      } catch (TableNotFoundException e) {
-        // if we got this error, probably means the table just plain doesn't
-        // exist. rethrow the error immediately. this should always be coming
-        // from the HTable constructor.
-        throw e;
-      } catch (PreemptiveFastFailException e) {
-        // already processed this. Don't process this again.
-        throw e;
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler
-            .decodeRemoteException((RemoteException) e);
-        } else if (isNetworkException(e)) {
-          couldNotCommunicateWithServer = true;
-          handleFailureToServer(server, e);
-        }
-        if (tries < params.getNumRetries() - 1) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("locateRegionInMeta attempt " + tries + " of "
-              + params.getNumRetries()
-              + " failed; retrying after sleep of "
-              + params.getPauseTime(tries) + " because: " + e.getMessage());
-          }
-        } else {
-          throw e;
-        }
-        // Only relocate the parent region if necessary
-        if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
-          relocateRegion(parentTable, row);
-        }
-      } catch (Exception e) {
-        couldNotCommunicateWithServer = true;
-        handleFailureToServer(server, e);
-        if (tries < params.getNumRetries() - 1) {
-          LOG.debug("locateRegionInMeta attempt " + tries + " of "
-            + params.getNumRetries() + " failed; retrying after sleep of "
-            + params.getPauseTime(tries) + " because: " + e.getMessage());
-        } else {
-          throw e;
-        }
-      } finally {
-        updateFailureInfoForServer(server, fInfo, didTry,
-          couldNotCommunicateWithServer, retryDespiteFastFailMode);
-      }
-      try {
-        Thread.sleep(params.getPauseTime(tries));
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-  }
-
-  private HRegionLocation getLocationFromRow(Result regionInfoRow,
-                                             byte[] tableName, byte[] parentTable, byte[] row) throws IOException {
-    if (regionInfoRow == null) {
-      throw new TableNotFoundException(Bytes.toString(tableName));
-    }
-    byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-      HConstants.REGIONINFO_QUALIFIER);
-    if (value == null || value.length == 0) {
-      throw new IOException("HRegionInfo was null or empty in "
-        + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
-    }
-    // convert the row result into the HRegionLocation we need!
-    HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(value,
-      new HRegionInfo());
-    // possible we got a region of a different table...
-    if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
-      throw new TableNotFoundException("Table '" + Bytes.toString(tableName)
-        + "' was not found.");
-    }
-    if (regionInfo.isOffline()) {
-      throw new RegionOfflineException("region offline: "
-        + regionInfo.getRegionNameAsString());
-    }
-
-    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-      HConstants.SERVER_QUALIFIER);
-    String serverAddress = "";
-    if (value != null) {
-      serverAddress = Bytes.toString(value);
-    }
-    if (serverAddress.equals("")) {
-      throw new NoServerForRegionException("No server address listed "
-        + "in " + Bytes.toString(parentTable) + " for region "
-        + regionInfo.getRegionNameAsString() + " containing row "
-        + Bytes.toStringBinary(row));
-    }
-
-    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-      HConstants.STARTCODE_QUALIFIER);
-    long serverStartCode = -1;
-    if (value != null) {
-      serverStartCode = Bytes.toLong(value);
-    }
-    // instantiate the location
-    HRegionLocation location = new HRegionLocation(regionInfo,
-      new HServerAddress(serverAddress), serverStartCode);
-    return location;
-  }
-
-  /**
-   * TODO:WARNING!!! This looks like a lot of duplicated code with
-   * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)} pls
-   * fix this! Search the Root Table for the Meta Region. Retries a fixed
-   * number of times and throws if Region is not found.
-   */
-  private HRegionLocation locateMetaInRoot(final byte[] row,
-                                           boolean useCache, Object regionLockObject) throws IOException {
-    HRegionLocation location;
-    final byte[] parentTable = HConstants.ROOT_TABLE_NAME;
-    final byte[] tableName = HConstants.META_TABLE_NAME;
-    if (useCache) {
-      location = getCachedLocation(tableName, row);
-      if (location != null) {
-        return location;
-      }
-    }
-    // If we are supposed to be using the cache, look in the cache to see if
-    // we already have the region.
-
-    // build the key of the meta region we should be looking for.
-    // the extra 9's on the end are necessary to allow "exact" matches
-    // without knowing the precise region names.
-    byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
-      HConstants.NINES, false);
-    for (int tries = 0; true; tries++) {
-      if (tries >= params.getNumRetries()) {
-        throw new NoServerForRegionException("Unable to find region for "
-          + Bytes.toStringBinary(row) + " after " + params.getNumRetries()
-          + " tries.");
-      }
-
-      TableServers.FailureInfo fInfo = null;
-      HServerAddress server = null;
-      boolean didTry = false;
-      boolean couldNotCommunicateWithServer = false;
-      boolean retryDespiteFastFailMode = false;
-      try {
-        // locate the root or meta region
-        HRegionLocation metaLocation = null;
-        if (useCache && rootRegionLocation != null
-          && !inFastFailMode(this.rootRegionLocation.getServerAddress())) {
-          metaLocation = rootRegionLocation;
-        } else {
-          synchronized (rootRegionLock) {
-            // This block guards against two threads trying to find the root
-            // region at the same time. One will go do the find while the
-            // second waits. The second thread will not do find.
-
-            if (!useCache || rootRegionLocation == null
-              || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
-              HBaseThriftRPC.isMeta.get().push(true);
-              try {
-                this.rootRegionLocation = locateRootRegion();
-              } catch (Exception e) {
-                throw e;
-              } finally {
-                HBaseThriftRPC.isMeta.get().pop();
-              }
-              LOG.info("Updated rootRegionLocation from ZK to : "
-                + this.rootRegionLocation);
-            }
-            metaLocation = this.rootRegionLocation;
-          }
-        }
-        Preconditions.checkNotNull(metaLocation);
-        server = metaLocation.getServerAddress();
-        fInfo = repeatedFailuresMap.get(server);
-
-        // Handle the case where .META. is on an unresponsive server.
-        if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) {
-          // In Fast-fail mode, all but one thread will fast fail. Check
-          // if we are that one chosen thread.
-
-          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
-
-          if (retryDespiteFastFailMode == false) { // we don't have to retry
-            throw new PreemptiveFastFailException(
-              fInfo.numConsecutiveFailures.get(),
-              fInfo.timeOfFirstFailureMilliSec,
-              fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
-          }
-        }
-        didTry = true;
-        HBaseThriftRPC.isMeta.get().push(true);
-        Result regionInfoRow = null;
-        // This block guards against two threads trying to load the meta
-        // region at the same time. The first will load the meta region and
-        // the second will use the value that the first one found.
-        synchronized (metaRegionLock) {
-          if (useCache) {
-            location = getCachedLocation(tableName, row);
-            if (location != null) {
-              return location;
-            }
-          } else {
-            LOG.debug("Deleting the client location cache.");
-            deleteCachedLocation(tableName, row, null);
-          }
-          HRegionInterface serverInterface = getHRegionConnection(metaLocation
-            .getServerAddress());
-
-          // Query the root for the location of the meta region
-          regionInfoRow = serverInterface.getClosestRowBefore(metaLocation
-            .getRegionInfo().getRegionName(), metaKey,
-            HConstants.CATALOG_FAMILY);
-          location = getLocationFromRow(regionInfoRow, tableName,
-            parentTable, row);
-          cacheLocation(tableName, location);
-        }
-        return location;
-      } catch (TableNotFoundException e) {
-        // if we got this error, probably means the table just plain doesn't
-        // exist. rethrow the error immediately. this should always be coming
-        // from the HTable constructor.
-        throw e;
-      } catch (PreemptiveFastFailException e) {
-        // already processed this. Don't process this again.
-        throw e;
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler
-            .decodeRemoteException((RemoteException) e);
-        } else if (isNetworkException(e)) {
-          couldNotCommunicateWithServer = true;
-          handleFailureToServer(server, e);
-        }
-        if (tries < params.getNumRetries() - 1) {
-          LOG.debug("IOException locateRegionInMeta attempt " + tries
-            + " of " + params.getNumRetries()
-            + " failed; retrying after sleep of "
-            + params.getPauseTime(tries) + " because: " + e.getMessage());
-        } else {
-          throw e;
-        }
-      } catch (Exception e) {
-        couldNotCommunicateWithServer = true;
-        handleFailureToServer(server, e);
-        if (tries < params.getNumRetries() - 1) {
-          LOG.debug("Exception locateRegionInMeta attempt " + tries + " of "
-            + params.getNumRetries() + " failed; retrying after sleep of "
-            + params.getPauseTime(tries) + " because: " + e.getMessage());
-        } else {
-          throw e;
-        }
-      } finally {
-        HBaseThriftRPC.isMeta.get().pop();
-        updateFailureInfoForServer(server, fInfo, didTry,
-          couldNotCommunicateWithServer, retryDespiteFastFailMode);
-      }
-      try {
-        Thread.sleep(params.getPauseTime(tries));
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-  }
-
-
-
-
-    /**
-     * Check if the exception is something that indicates that we cannot
-     * contact/communicate with the server.
-     *
-     * @param e
-     * @return
-     */
-    private boolean isNetworkException(Throwable e) {
-      // This list covers most connectivity exceptions but not all.
-      // For example, in SocketOutputStream a plain IOException is thrown
-      // at times when the channel is closed.
-      return (e instanceof SocketTimeoutException ||
-              e instanceof ConnectException ||
-              e instanceof ClosedChannelException ||
-              e instanceof SyncFailedException ||
-              e instanceof EOFException ||
-              e instanceof TTransportException);
-    }
-
-    public Collection<HRegionLocation> getCachedHRegionLocations(final byte [] tableName,
-                                                                 boolean forceRefresh) {
-      if (forceRefresh || !initializedTableSet.contains(tableName)) {
-        prefetchRegionCache(tableName, null, Integer.MAX_VALUE);
-        initializedTableSet.add(tableName);
-      }
-
-      ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
-        getTableLocations(tableName);
-      return tableLocations.values();
-    }
-
-    /*
-     * Search the cache for a location that fits our table and row key.
-     * Return null if no suitable region is located. TODO: synchronization note
-     *
-     * <p>TODO: This method during writing consumes 15% of CPU doing lookup
-     * into the Soft Reference SortedMap.  Improve.
-     *
-     * @param tableName
-     * @param row
-     * @return Null or region location found in cache.
-     */
-    HRegionLocation getCachedLocation(final byte [] tableName,
-        final byte [] row) {
-      ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
-        getTableLocations(tableName);
-
-      // start to examine the cache. we can only do cache actions
-      // if there's something in the cache for this table.
-      if (tableLocations.isEmpty()) {
-        return null;
-      }
-
-      HRegionLocation rl = tableLocations.get(row);
-      if (rl != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row)
-              + "> in tableName " + Bytes.toString(tableName)
-              + ": location server " + rl.getServerAddress()
-              + ", location region name "
-              + rl.getRegionInfo().getRegionNameAsString());
-        }
-        return rl;
-      }
-
-      // get the matching region for the row
-      Entry<byte[], HRegionLocation> entry = tableLocations.floorEntry(row);
-      HRegionLocation possibleRegion = (entry == null) ? null : entry
-          .getValue();
-
-      // we need to examine the cached location to verify that it is
-      // a match by end key as well.
-      if (possibleRegion != null) {
-        byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
-
-        // make sure that the end key is greater than the row we're looking
-        // for, otherwise the row actually belongs in the next region, not
-        // this one. the exception case is when the endkey is
-        // HConstants.EMPTY_START_ROW, signifying that the region we're
-        // checking is actually the last region in the table.
-        if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
-            || KeyValue.getRowComparator(tableName).compareRows(endKey, 0,
-                endKey.length, row, 0, row.length) > 0) {
-          return possibleRegion;
-        }
-      }
-
-      // Passed all the way through, so we got nothing - complete cache miss
-      return null;
-    }
-
-    /*
-     * Delete a cached location for the specified table name and row if it is
-     * located on the (optionally) specified old location.
-     */
-    @Override
-    public void deleteCachedLocation(final byte[] tableName, final byte[] row,
-        HServerAddress oldServer) {
-      synchronized (this.cachedRegionLocations) {
-        Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
-
-        // start to examine the cache. we can only do cache actions
-        // if there's something in the cache for this table.
-        if (!tableLocations.isEmpty()) {
-          HRegionLocation rl = getCachedLocation(tableName, row);
-          if (rl != null) {
-            // If oldLocation is specified. deleteLocation only if it is the
-            // same.
-            if (oldServer != null && !oldServer.equals(rl.getServerAddress()))
-              return; // perhaps, some body else cleared and repopulated.
-
-            tableLocations.remove(rl.getRegionInfo().getStartKey());
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString()
-                  + " for tableName=" + Bytes.toString(tableName)
-                  + " from cache " + "because of " + Bytes.toStringBinary(row));
-            }
-          }
-        }
-      }
-    }
-
-    /*
-     * Delete all cached entries of a table that maps to a specific location.
-     *
-     * @param tablename
-     *
-     * @param server
-     */
-    protected void clearCachedLocationForServer(final String server) {
-      boolean deletedSomething = false;
-      synchronized (this.cachedRegionLocations) {
-        if (!cachedServers.contains(server)) {
-          return;
-        }
-        for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations
-            .values()) {
-          for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
-            if (e.getValue().getServerAddress().toString().equals(server)) {
-              tableLocations.remove(e.getKey());
-              deletedSomething = true;
-            }
-          }
-        }
-        cachedServers.remove(server);
-      }
-      if (deletedSomething && LOG.isDebugEnabled()) {
-        LOG.debug("Removed all cached region locations that map to " + server);
-      }
-    }
-
-    /*
-     * @param tableName
-     *
-     * @return Map of cached locations for passed <code>tableName</code>
-     */
-    private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
-        final byte[] tableName) {
-      // find the map of cached locations for this table
-      Integer key = Bytes.mapKey(tableName);
-      ConcurrentSkipListMap<byte[], HRegionLocation> result = this.cachedRegionLocations
-          .get(key);
-      if (result == null) {
-        synchronized (this.cachedRegionLocations) {
-          result = this.cachedRegionLocations.get(key);
-          if (result == null) {
-            // if tableLocations for this table isn't built yet, make one
-            result = new ConcurrentSkipListMap<byte[], HRegionLocation>(
-                Bytes.BYTES_COMPARATOR);
-            this.cachedRegionLocations.put(key, result);
-          }
-        }
-      }
-      return result;
-    }
-
-    /**
-     * Allows flushing the region cache.
-     */
-    public void clearRegionCache() {
-      synchronized (this.cachedRegionLocations) {
-        cachedRegionLocations.clear();
-        cachedServers.clear();
-      }
-    }
-
-    /**
-     * Put a newly discovered HRegionLocation into the cache.
-     */
-    private void cacheLocation(final byte [] tableName,
-        final HRegionLocation location) {
-      byte [] startKey = location.getRegionInfo().getStartKey();
-      boolean hasNewCache;
-      synchronized (this.cachedRegionLocations) {
-        cachedServers.add(location.getServerAddress().toString());
-        hasNewCache = (getTableLocations(tableName).put(startKey, location) == null);
-      }
-      if (hasNewCache) {
-        LOG.debug("Cached location for " +
-          location.getRegionInfo().getRegionNameAsString() +
-          " is " + location.getServerAddress().toString());
-      }
-    }
-
-    @Override
-    public HRegionInterface getHRegionConnection(
-        HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
-    throws IOException {
-      if (getMaster) {
-        LOG.debug("Getting master");
-        getMaster();
-      }
-      HRegionInterface server = HRegionServer.getMainRS(regionServer);
-      if (server != null && !this.useThrift) {
-        return server;
-      }
-
-      final boolean thriftPortWrittenToMeta = conf.getBoolean(
-         HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
-         HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT);
-      final boolean hadoopPortWrittenToMeta = !thriftPortWrittenToMeta;
-
-      try {
-        // establish an RPC for this RS
-        // set hbase.ipc.client.connect.max.retries to retry connection
-        // attempts
-        if (this.useThrift) {
-          Class<? extends ThriftClientInterface> serverInterface =
-            ThriftHRegionInterface.class;
-          if (thriftPortWrittenToMeta) {
-            try {
-              server = (HRegionInterface) HBaseThriftRPC.getClient(
-                regionServer.getInetSocketAddress(), this.conf,
-                serverInterface, options);
-            } catch (Exception e) {
-              LOG.warn("Exception connecting to the region server on" +
-                "the thrift channel. Retrying on the HadoopRPC port", e);
-              InetSocketAddress addr = new InetSocketAddress(regionServer
-                .getInetSocketAddress().getHostName(), conf.getInt(
-                HConstants.REGIONSERVER_PORT,
-                HConstants.DEFAULT_REGIONSERVER_PORT));
-              server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
-                HBaseRPCProtocolVersion.versionID, addr, this.conf,
-                params.getRpcTimeout(), options);
-            }
-          } else {
-            try {
-              InetSocketAddress addr = new InetSocketAddress(regionServer
-                .getInetSocketAddress().getHostName(), conf.getInt(
-                HConstants.REGIONSERVER_SWIFT_PORT,
-                HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT));
-              server = (HRegionInterface) HBaseThriftRPC.getClient(addr,
-                this.conf, serverInterface, options);
-            } catch (Exception e) {
-              LOG.warn("Exception connecting to the region server on" +
-                "the thrift channel. Retrying on the HadoopRPC port", e);
-              server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
-                HBaseRPCProtocolVersion.versionID,
-                regionServer.getInetSocketAddress(), this.conf,
-                params.getRpcTimeout(), options);
-            }
-          }
-        } else {
-          if (hadoopPortWrittenToMeta) {
-            server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
-              HBaseRPCProtocolVersion.versionID,
-              regionServer.getInetSocketAddress(), this.conf,
-              params.getRpcTimeout(), options);
-          } else {
-            // The hadoop port is no longer written to Meta (this will happen
-            // when we are reasonably confident about the thrift service, and
-            // will soon deprecate RPC). So, try to connect to the default port.
-            // TODO gauravm: Verify that this works (t2830553)
-            InetSocketAddress addr = new InetSocketAddress(regionServer
-              .getInetSocketAddress().getHostName(), conf.getInt(
-              HConstants.REGIONSERVER_PORT,
-              HConstants.DEFAULT_REGIONSERVER_PORT));
-            server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
-              HBaseRPCProtocolVersion.versionID, addr, this.conf,
-              params.getRpcTimeout(), options);
-          }
-        }
-      } catch (RemoteException e) {
-        throw RemoteExceptionHandler.decodeRemoteException(e);
-      }
-
-      return server;
-    }
-
-    public HRegionInterface getHRegionConnection(HServerAddress regionServer,
-        HBaseRPCOptions options) throws IOException {
-      return getHRegionConnection(regionServer, false, options);
-    }
-
-    public HRegionInterface getHRegionConnection(HServerAddress regionServer,
-        boolean getMaster) throws IOException {
-      return getHRegionConnection(regionServer, getMaster,
-          HBaseRPCOptions.DEFAULT);
-    }
-
-    public HRegionInterface getHRegionConnection(HServerAddress regionServer)
-        throws IOException {
-      return getHRegionConnection(regionServer, false);
-    }
-
-    public synchronized ZooKeeperWrapper getZooKeeperWrapper()
-        throws IOException {
-      return HConnectionManager.getClientZKConnection(conf)
-          .getZooKeeperWrapper();
-    }
-
-    /*
-     * Repeatedly try to find the root region in ZK
-     *
-     * @return HRegionLocation for root region if found
-     *
-     * @throws NoServerForRegionException - if the root region can not be
-     * located after retrying
-     *
-     * @throws IOException
-     */
-    private HRegionLocation locateRootRegion() throws IOException {
-
-      // We lazily instantiate the ZooKeeper object because we don't want to
-      // make the constructor have to throw IOException or handle it itself.
-      ZooKeeperWrapper zk = getZooKeeperWrapper();
-
-      HServerAddress rootRegionAddress = null;
-      for (int tries = 0; tries < params.getNumRetries(); tries++) {
-        int localTimeouts = 0;
-        // ask the master which server has the root region
-        while (rootRegionAddress == null
-            && localTimeouts < params.getNumRetries()) {
-          // Don't read root region until we're out of safe mode so we know
-          // that the meta regions have been assigned.
-          rootRegionAddress = zk.readRootRegionLocation();
-          if (rootRegionAddress == null) {
-            try {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Sleeping " + params.getPauseTime(tries)
-                    + "ms, waiting for root region.");
-              }
-              Thread.sleep(params.getPauseTime(tries));
-            } catch (InterruptedException iex) {
-              // continue
-            }
-            localTimeouts++;
-          }
-        }
-
-        if (rootRegionAddress == null) {
-          throw new NoServerForRegionException(
-              "Timed out trying to locate root region");
-        }
-
-        LOG.debug("Trying to get root region at : " + rootRegionAddress);
-        try {
-          // Get a connection to the region server
-          HRegionInterface server = getHRegionConnection(rootRegionAddress);
-          // if this works, then we're good, and we have an acceptable address,
-          // so we can stop doing retries and return the result.
-          server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found ROOT at " + rootRegionAddress);
-          }
-          break;
-        } catch (Throwable t) {
-          t = translateException(t);
-
-          if (tries == params.getNumRetries() - 1) {
-            throw new NoServerForRegionException("Timed out trying to locate "+
-                "root region because: " + t.getMessage());
-          }
-
-          // Sleep and retry finding root region.
-          try {
-            LOG.debug("Root region location changed. Sleeping.", t);
-            Thread.sleep(params.getPauseTime(tries));
-            LOG.debug("Wake. Retry finding root region.");
-          } catch (InterruptedException iex) {
-            // continue
-          }
-        }
-
-        rootRegionAddress = null;
-      }
-
-      // if the address is null by this point, then the retries have failed,
-      // and we're sort of sunk
-      if (rootRegionAddress == null) {
-        throw new NoServerForRegionException(
-            "unable to locate root region server");
-      }
-
-      // return the region location
-      return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
-    }
-
-    @Override
-    public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
-        throws IOException {
-      List<Throwable> exceptions = new ArrayList<Throwable>();
-      RegionOverloadedException roe = null;
-
-      long callStartTime;
-      int serverRequestedRetries = 0;
-
-      callStartTime = System.currentTimeMillis();
-      long serverRequestedWaitTime = 0;
-      // do not retry if region cannot be located. There are enough retries
-      // within instantiateRegionLocation.
-      callable.instantiateRegionLocation(false /* reload cache? */);
-
-      for (int tries = 0;; tries++) {
-        // If server requested wait. We will wait for that time, and start
-        // again. Do not count this time/tries against the client retries.
-        if (serverRequestedWaitTime > 0) {
-          serverRequestedRetries++;
-
-          if (serverRequestedRetries > params.getMaxServerRequestedRetries()) {
-            throw RegionOverloadedException.create(roe, exceptions,
-                serverRequestedWaitTime);
-          }
-
-          long pauseTime = serverRequestedWaitTime + callStartTime
-              - System.currentTimeMillis();
-          LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for "
-              + pauseTime + "ms. serverRequestedRetries = "
-              + serverRequestedRetries);
-          try {
-            Thread.sleep(pauseTime);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new InterruptedIOException();
-          }
-
-          serverRequestedWaitTime = 0;
-          tries = 0;
-          callStartTime = System.currentTimeMillis();
-        }
-
-        try {
-          return getRegionServerWithoutRetries(callable, false);
-        } catch (DoNotRetryIOException ioe) {
-          // clear cache if needed
-          if (ioe.getCause() instanceof NotServingRegionException) {
-            HRegionLocation prevLoc = callable.location;
-            if (prevLoc.getRegionInfo() != null) {
-              deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
-                  .getStartKey(), prevLoc.getServerAddress());
-            }
-          }
-
-          // If we are not supposed to retry; Let it pass through.
-          throw ioe;
-        } catch (RegionOverloadedException ex) {
-          roe = ex;
-          serverRequestedWaitTime = roe.getBackoffTimeMillis();
-          continue;
-        } catch (ClientSideDoNotRetryException exp) {
-          // Bail out of the retry loop, immediately
-          throw exp;
-        } catch (PreemptiveFastFailException pfe) {
-          // Bail out of the retry loop, if the host has been consistently
-          // unreachable.
-          throw pfe;
-        } catch (Throwable t) {
-          exceptions.add(t);
-
-          if (tries == params.getNumRetries() - 1) {
-            throw new RetriesExhaustedException(callable.getServerName(),
-                callable.getRegionName(), callable.getRow(), tries, exceptions);
-          }
-
-          HRegionLocation prevLoc = callable.location;
-          if (prevLoc.getRegionInfo() != null) {
-            deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
-                .getStartKey(), prevLoc.getServerAddress());
-          }
-
-          try {
-            // do not retry if getting the location throws exception
-            callable.instantiateRegionLocation(false /* reload cache ? */);
-          } catch (IOException e) {
-            exceptions.add(e);
-            throw new RetriesExhaustedException(callable.getServerName(),
-                callable.getRegionName(), callable.getRow(), tries, exceptions);
-          }
-          if (prevLoc.getServerAddress().equals(
-              callable.location.getServerAddress())) {
-            // Bail out of the retry loop if we have to wait too long
-            long pauseTime = params.getPauseTime(tries);
-            if ((System.currentTimeMillis() - callStartTime + pauseTime) > params
-                .getRpcRetryTimeout()) {
-              throw new RetriesExhaustedException(callable.getServerName(),
-                  callable.getRegionName(), callable.getRow(), tries,
-                  exceptions);
-            }
-            LOG.debug("getRegionServerWithRetries failed, sleeping for "
-                + pauseTime + "ms. tries = " + tries, t);
-            try {
-              Thread.sleep(pauseTime);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new InterruptedIOException();
-            }
-            // do not reload cache. While we were sleeping hopefully the cache
-            // has been re-populated.
-            callable.instantiateRegionLocation(false);
-          } else {
-            LOG.debug("getRegionServerWithRetries failed, "
-                + "region moved from " + prevLoc + " to " + callable.location
-                + "retrying immediately tries=" + tries, t);
-          }
-        }
-      }
-    }
-
-    /**
-     * Makes a single attempt (no retries) to run the given callable against
-     * the region server it addresses, honoring the client's preemptive
-     * fast-fail state for that server.
-     *
-     * If the server is in fast-fail mode and the calling thread is neither
-     * already retrying in fast-fail mode nor picked as the one thread allowed
-     * to retry, a PreemptiveFastFailException is thrown without invoking the
-     * callable. updateFailureInfoForServer is invoked from the finally block
-     * on every exit path.
-     *
-     * @param <T>
-     *          the type of the return value
-     * @param callable
-     *          callable to run
-     * @param instantiateRegionLocation
-     *          if true, callable.instantiateRegionLocation(false) is invoked
-     *          before the server address is read from the callable
-     * @return the value returned by callable.call()
-     * @throws IOException
-     *           if the failure, once translated, is an IOException
-     * @throws RuntimeException
-     *           wrapping any translated failure that is not an IOException
-     * @throws PreemptiveFastFailException
-     *           if the remote host has been known to be unreachable for more
-     *           than this.fastFailThresholdMilliSec.
-     */
-    public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
-        boolean instantiateRegionLocation) throws IOException,
-        RuntimeException, PreemptiveFastFailException {
-      FailureInfo fInfo = null;
-      HServerAddress server = null;
-      boolean didTry = false; // set only once we get past the fast-fail gate
-      MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
-      boolean retryDespiteFastFailMode = false;
-      try {
-        if (instantiateRegionLocation) {
-          callable.instantiateRegionLocation(false);
-        }
-        // Logic to fast fail requests to unreachable servers.
-        server = callable.getServerAddress();
-        fInfo = repeatedFailuresMap.get(server);
-
-        if (inFastFailMode(server) && !currentThreadInFastFailMode()) {
-          // In Fast-fail mode, all but one thread will fast fail. Check
-          // if we are that one chosen thread.
-          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
-          if (retryDespiteFastFailMode == false) { // we don't have to retry
-            throw new PreemptiveFastFailException(
-                fInfo.numConsecutiveFailures.get(),
-                fInfo.timeOfFirstFailureMilliSec,
-                fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
-          }
-        }
-        didTry = true;
-        callable.instantiateServer();
-        return callable.call();
-      } catch (PreemptiveFastFailException pfe) {
-        throw pfe; // passed through untouched, bypassing handleThrowable
-      } catch (ClientSideDoNotRetryException exp) {
-        throw exp; // passed through untouched, bypassing handleThrowable
-      } catch (Throwable t1) {
-        handleThrowable(t1, callable, couldNotCommunicateWithServer);
-        return null; // handleThrowable throws on every path; this only satisfies the compiler
-      } finally {
-        updateFailureInfoForServer(server, fInfo, didTry,
-            couldNotCommunicateWithServer.booleanValue(),
-            retryDespiteFastFailMode);
-      }
-    }
-
-    /**
-     * Single-attempt variant that always asks the callable to instantiate its
-     * region location before the call is issued.
-     *
-     * @param callable
-     *          callable to run
-     * @return the value produced by the two-argument overload
-     */
-    @Override
-    public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
-        throws IOException, RuntimeException, PreemptiveFastFailException {
-      final boolean resolveLocationFirst = true;
-      return getRegionServerWithoutRetries(callable, resolveLocationFirst);
-    }
-
-    /**
-     * Appends an OperationContext built from the callable's location and the
-     * given throwable to the calling thread's context list. Does nothing
-     * unless recordClientContext is set.
-     *
-     * @param callable
-     *          the callable whose location is recorded
-     * @param t
-     *          the throwable to record alongside the location
-     */
-    private <T> void updateClientContext(final ServerCallable<T> callable,
-        final Throwable t) {
-      if (recordClientContext) {
-        List<OperationContext> perThread = operationContextPerThread.get();
-        if (perThread == null) {
-          // First record on this thread: create and publish the list.
-          perThread = new ArrayList<OperationContext>();
-          operationContextPerThread.set(perThread);
-        }
-        perThread.add(new OperationContext(callable.location, t));
-      }
-    }
-
-    /**
-     * Handles failures encountered when communicating with a server.
-     *
-     * Updates the FailureInfo in repeatedFailuresMap to reflect the failure
-     * (creating the entry if none exists) and clears the cached locations for
-     * the server. While the server is in fast-fail mode the cache is cleared
-     * at most once per cacheClearingTimeoutMilliSec. This method does not
-     * throw; it returns immediately if either argument is null.
-     *
-     * @param server
-     *          the server the failed call was addressed to
-     * @param t
-     *          - the throwable to be handled.
-     */
-    private void handleFailureToServer(HServerAddress server, Throwable t) {
-      if (server == null || t == null)
-        return;
-
-      long currentTime = System.currentTimeMillis();
-      FailureInfo fInfo = repeatedFailuresMap.get(server);
-      if (fInfo == null) {
-        fInfo = new FailureInfo(currentTime);
-        FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo);
-
-        if (oldfInfo != null) { // another thread registered an entry first; use theirs
-          fInfo = oldfInfo;
-        }
-      }
-      fInfo.timeOfLatestAttemptMilliSec = currentTime;
-      fInfo.numConsecutiveFailures.incrementAndGet();
-
-      if (inFastFailMode(server)) {
-        // In FastFail mode, do not clear out the cache if it was done recently.
-        if (currentTime > fInfo.timeOfLatestCacheClearMilliSec
-            + cacheClearingTimeoutMilliSec) {
-          fInfo.timeOfLatestCacheClearMilliSec = currentTime;
-          clearCachedLocationForServer(server.toString());
-        }
-        LOG.error("Exception in FastFail mode : " + t.toString());
-        return;
-      }
-
-      // Not in fast-fail mode: always clear every cache entry that maps to
-      // this server, so that later lookups miss the cache and ask .META.
-      // again to find the new location.
-      fInfo.timeOfLatestCacheClearMilliSec = currentTime;
-      clearCachedLocationForServer(server.toString());
-    }
-
-    /**
-     * Occasionally cleans up unused information in repeatedFailuresMap.
-     *
-     * repeatedFailuresMap stores the failure information for all remote hosts
-     * that had failures. In order to avoid these from growing indefinitely,
-     * occassionallyCleanupFailureInformation() will clear these up once every
-     * cleanupInterval ms.
-     */
-    private void occasionallyCleanupFailureInformation() {
-      long now = System.currentTimeMillis();
-      if (!(now > lastFailureMapCleanupTimeMilliSec
-          + failureMapCleanupIntervalMilliSec))
-        return;
-
-      // remove entries that haven't been attempted in a while
-      // No synchronization needed. It is okay if multiple threads try to
-      // remove the entry again and again from a concurrent hash map.
-      StringBuilder sb = new StringBuilder();
-      for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap
-          .entrySet()) {
-        if (now > entry.getValue().timeOfLatestAttemptMilliSec
-            + failureMapCleanupIntervalMilliSec) { // no recent failures
-          repeatedFailuresMap.remove(entry.getKey());
-        } else if (now > entry.getValue().timeOfFirstFailureMilliSec
-            + this.fastFailClearingTimeMilliSec) { // been failing for a long
-                                                   // time
-          LOG.error(entry.getKey()
-              + " been failing for a long time. clearing out."
-              + entry.getValue().toString());
-          repeatedFailuresMap.remove(entry.getKey());
-        } else {
-          sb.append(entry.getKey().toString() + " failing "
-              + entry.getValue().toString() + "\n");
-        }
-      }
-      if (sb.length() > 0
-      // If there are multiple threads cleaning up, try to see that only one
-      // will log the msg.
-          && now > this.lastFailureMapCleanupTimeMilliSec
-              + this.failureMapCleanupIntervalMilliSec) {
-        LOG.warn("Preemptive failure enabled for : " + sb.toString());
-      }
-      lastFailureMapCleanupTimeMilliSec = now;
-    }
-
-    /**
-     * Tells whether requests to the given server are in fast-fail mode.
-     *
-     * A server with no entry in repeatedFailuresMap is considered good. A
-     * server with an entry is in fast-fail mode once more than
-     * fastFailThresholdMilliSec has elapsed since its first recorded failure.
-     *
-     * @param server
-     *          the server to check
-     * @return true if the client is in fast fail mode for the server.
-     */
-    private boolean inFastFailMode(HServerAddress server) {
-      FailureInfo info = repeatedFailuresMap.get(server);
-      if (info == null) {
-        return false;
-      }
-      long nowMs = System.currentTimeMillis();
-      return nowMs > info.timeOfFirstFailureMilliSec
-          + this.fastFailThresholdMilliSec;
-    }
-
-    /**
-     * Tells whether the calling thread is currently marked as retrying in
-     * fast-fail mode for *some* server.
-     *
-     * @return true if the thread-local marker exists and is set.
-     */
-    private boolean currentThreadInFastFailMode() {
-      MutableBoolean retrying = this.threadRetryingInFastFailMode.get();
-      if (retrying == null) {
-        return false;
-      }
-      return retrying.booleanValue();
-    }
-
-    /**
-     * Decides whether the calling thread should still try to connect to a
-     * server that is in fast-fail mode.
-     *
-     * Only one thread at a time is let through: the one that flips
-     * exclusivelyRetringInspiteOfFastFail from false to true. That thread is
-     * also marked, via its thread-local flag, as retrying in fast-fail mode.
-     * Every other caller gets false and is expected to short circuit.
-     *
-     * @param fInfo
-     *          failure record of the server; may be null
-     * @return true if the client should try to connect to the server.
-     */
-    private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
-      if (fInfo == null
-          || !fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false,
-              true)) {
-        // No failure record, or some other thread already holds the slot.
-        return false;
-      }
-
-      // We are the chosen one: remember that on this thread.
-      MutableBoolean marker = this.threadRetryingInFastFailMode.get();
-      if (marker == null) {
-        marker = new MutableBoolean();
-        this.threadRetryingInFastFailMode.set(marker);
-      }
-      marker.setValue(true);
-      return true;
-    }
-
-    /**
-     * Updates the failure information for the server after an attempt.
-     *
-     * Does nothing when server or fInfo is null, or when no attempt was made.
-     * If the attempt reached the server, its entry is removed from
-     * repeatedFailuresMap; otherwise the time of the latest attempt is
-     * refreshed. In both cases, a thread that was retrying despite fast-fail
-     * mode releases the exclusive retry flag and clears its thread-local
-     * marker. Previously this release happened only on the failure path, which
-     * left the marker set after a successful retry, so that thread bypassed
-     * the fast-fail check from then on.
-     *
-     * @param server
-     *          the server the attempt was addressed to
-     * @param fInfo
-     *          the failure record looked up for the server before the attempt
-     * @param didTry
-     *          whether the call was actually attempted
-     * @param couldNotCommunicate
-     *          whether the attempt failed to communicate with the server
-     * @param retryDespiteFastFailMode
-     *          whether this thread was the one retrying in fast-fail mode
-     */
-    private void updateFailureInfoForServer(HServerAddress server,
-        FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
-        boolean retryDespiteFastFailMode) {
-      if (server == null || fInfo == null || !didTry) {
-        return;
-      }
-
-      if (!couldNotCommunicate) {
-        // We were able to connect to the server: reset the failure
-        // information.
-        LOG.info("Clearing out PFFE for server " + server.getHostname());
-        repeatedFailuresMap.remove(server);
-      } else {
-        // update time of last attempt
-        fInfo.timeOfLatestAttemptMilliSec = System.currentTimeMillis();
-      }
-
-      // Release the lock if we were retrying inspite of FastFail, whatever
-      // the outcome of the attempt was.
-      if (retryDespiteFastFailMode) {
-        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
-        MutableBoolean retrying = threadRetryingInFastFailMode.get();
-        if (retrying != null) {
-          retrying.setValue(false);
-        }
-      }
-
-      occasionallyCleanupFailureInformation();
-    }
-
-    /**
-     * Looks up the failure record for the server, works out whether the
-     * calling thread is the one picked to retry in fast-fail mode, and
-     * delegates to the five-argument overload.
-     *
-     * @param server
-     *          the server the attempt was addressed to
-     * @param didTry
-     *          whether the call was actually attempted
-     * @param couldNotCommunicate
-     *          whether the attempt failed to communicate with the server
-     */
-    public void updateFailureInfoForServer(HServerAddress server,
-        boolean didTry, boolean couldNotCommunicate) {
-      FailureInfo info = repeatedFailuresMap.get(server);
-      // In Fast-fail mode, all but one thread will fast fail; the last
-      // operand checks whether we are that one chosen thread.
-      boolean chosenToRetry = inFastFailMode(server)
-          && !currentThreadInFastFailMode()
-          && shouldRetryInspiteOfFastFail(info);
-
-      updateFailureInfoForServer(server, info, didTry, couldNotCommunicate,
-          chosenToRetry);
-    }
-
-    /**
-     * Translates a throwable raised by a server call, records it, and
-     * rethrows it. This method never returns normally.
-     *
-     * When the translated throwable is not a RemoteException and
-     * isNetworkException accepts it, the flag is set and the failure is
-     * registered against the callable's server.
-     *
-     * @param t1
-     *          the throwable raised by the call
-     * @param callable
-     *          the callable that was being run
-     * @param couldNotCommunicateWithServer
-     *          out-parameter, set to true for local network failures
-     * @throws IOException
-     *           the translated throwable, when it is an IOException;
-     *           anything else is wrapped in a RuntimeException
-     */
-    public void handleThrowable(Throwable t1, ServerCallable<?> callable,
-                                MutableBoolean couldNotCommunicateWithServer)
-        throws IOException {
-      final Throwable translated = translateException(t1);
-      if (!(translated instanceof RemoteException)
-          && isNetworkException(translated)) {
-        couldNotCommunicateWithServer.setValue(true);
-        handleFailureToServer(callable.getServerAddress(), translated);
-      }
-
-      updateClientContext(callable, translated);
-      if (!(translated instanceof IOException)) {
-        throw new RuntimeException(translated);
-      }
-      throw (IOException) translated;
-    }
-
-    /**
-     * Builds a task that, when run, makes one no-retry call to
-     * server.getStartCode() on the given address.
-     *
-     * @param address
-     *          the region server to call
-     * @param options
-     *          RPC options handed to the server callable
-     * @return a callable yielding the value returned by getStartCode()
-     */
-    private Callable<Long> createGetServerStartCodeCallable(
-        final HServerAddress address, final HBaseRPCOptions options) {
-      final HConnection self = this;
-      return new Callable<Long>() {
-        public Long call() throws IOException {
-          ServerCallableForBatchOps<Long> startCodeCall =
-              new ServerCallableForBatchOps<Long>(self, address, options) {
-                public Long call() throws IOException {
-                  return server.getStartCode();
-                }
-              };
-          return getRegionServerWithoutRetries(startCodeCall);
-        }
-      };
-    }
-
-    /**
-     * Builds a task that, when run, makes one no-retry call to
-     * server.getCurrentTimeMillis() on the given address.
-     *
-     * @param address
-     *          the region server to call
-     * @param options
-     *          RPC options handed to the server callable
-     * @return a callable yielding the value returned by getCurrentTimeMillis()
-     */
-    private Callable<Long> createCurrentTimeCallable(
-        final HServerAddress address, final HBaseRPCOptions options) {
-      final HConnection self = this;
-      return new Callable<Long>() {
-        public Long call() throws IOException {
-          ServerCallableForBatchOps<Long> currentTimeCall =
-              new ServerCallableForBatchOps<Long>(self, address, options) {
-                public Long call() throws IOException {
-                  return server.getCurrentTimeMillis();
-                }
-              };
-          return getRegionServerWithoutRetries(currentTimeCall);
-        }
-      };
-    }
-
-    /**
-     * Builds a task that, when run, makes one no-retry call to
-     * server.getLastFlushTimes() on the given address.
-     *
-     * @param address
-     *          the region server to call
-     * @param options
-     *          RPC options handed to the server callable
-     * @return a callable yielding the map returned by getLastFlushTimes()
-     */
-    private Callable<MapWritable> createGetLastFlushTimesCallable(
-        final HServerAddress address, final HBaseRPCOptions options) {
-      final HConnection self = this;
-      return new Callable<MapWritable>() {
-        public MapWritable call() throws IOException {
-          ServerCallableForBatchOps<MapWritable> flushTimesCall =
-              new ServerCallableForBatchOps<MapWritable>(self, address, options) {
-                public MapWritable call() throws IOException {
-                  return server.getLastFlushTimes();
-                }
-              };
-          return getRegionServerWithoutRetries(flushTimesCall);
-        }
-      };
-    }
-
-    /**
-     * Builds a task that, when run, makes one no-retry call to
-     * server.flushRegion(regionName, targetFlushTime) on the given address.
-     *
-     * @param address
-     *          the region server to call
-     * @param region
-     *          the region whose name is passed to flushRegion
-     * @param targetFlushTime
-     *          passed through to flushRegion unchanged
-     * @param options
-     *          RPC options handed to the server callable
-     * @return a callable that yields null once the call completes
-     */
-    private Callable<Void> createFlushCallable(final HServerAddress address,
-        final HRegionInfo region, final long targetFlushTime,
-        final HBaseRPCOptions options) {
-      final HConnection self = this;
-      return new Callable<Void>() {
-        public Void call() throws IOException {
-          ServerCallableForBatchOps<Void> flushCall =
-              new ServerCallableForBatchOps<Void>(self, address, options) {
-                public Void call() throws IOException {
-                  server.flushRegion(region.getRegionName(), targetFlushTime);
-                  return null;
-                }
-              };
-          return getRegionServerWithoutRetries(flushCall);
-        }
-      };
-    }
-
-    /**
-     * Builds a task that, when run, makes one no-retry call to
-     * server.multiAction(multi) on the given address, instantiating the
-     * region location first.
-     *
-     * @param address
-     *          the region server to call
-     * @param multi
-     *          the batched actions to send
-     * @param tableName
-     *          not read by this method
-     * @param options
-     *          RPC options handed to the server callable
-     * @return a callable yielding the server's MultiResponse
-     */
-    private Callable<MultiResponse> createMultiActionCallable(
-        final HServerAddress address, final MultiAction multi,
-        final byte[] tableName, final HBaseRPCOptions options) {
-      final HConnection self = this;
-      // no need to track mutations here. Done at the caller.
-      return new Callable<MultiResponse>() {
-        public MultiResponse call() throws IOException {
-          ServerCallableForBatchOps<MultiResponse> multiCall =
-              new ServerCallableForBatchOps<MultiResponse>(self, address,
-                  options) {
-                public MultiResponse call() throws IOException,
-                    InterruptedException, ExecutionException {
-                  return server.multiAction(multi);
-                }
-              };
-          return getRegionServerWithoutRetries(multiCall, true);
-        }
-      };
-    }
-
-    /**
-     * Looks up the location of the region holding a row, retrying up to
-     * params.getNumRetries() times with a pause between attempts.
-     *
-     * The first attempt uses the caller's reload flag; every later attempt
-     * passes reload=true. Failures of individual attempts are collected and
-     * reported in the RetriesExhaustedException if no location is found.
-     *
-     * @param tableName
-     *          name of the table
-     * @param rowKey
-     *          row whose region is wanted
-     * @param reload
-     *          reload flag passed to getRegionLocation on the first attempt
-     * @return the location found; never null
-     * @throws RetriesExhaustedException
-     *           if no attempt produced a location
-     * @throws InterruptedIOException
-     *           if the thread is interrupted while pausing between attempts;
-     *           the thread's interrupt status is restored first
-     * @throws IOException
-     *           supertype of the exceptions above
-     */
-    private HRegionLocation getRegionLocationForRowWithRetries(
-        byte[] tableName, byte[] rowKey, boolean reload) throws IOException {
-      boolean reloadFlag = reload;
-      List<Throwable> exceptions = new ArrayList<Throwable>();
-      HRegionLocation location = null;
-      int tries = 0;
-      while (tries < params.getNumRetries()) {
-        try {
-          location = getRegionLocation(tableName, rowKey, reloadFlag);
-        } catch (Throwable t) {
-          exceptions.add(t);
-        }
-        if (location != null) {
-          break;
-        }
-        reloadFlag = true;
-        tries++;
-        if (tries >= params.getNumRetries()) {
-          // Out of attempts: pausing again would only delay the failure.
-          break;
-        }
-        try {
-          Thread.sleep(params.getPauseTime(tries));
-        } catch (InterruptedException e) {
-          // Do not swallow the interrupt: restore the flag and abort, the
-          // same way the retry loop for server calls does.
-          Thread.currentThread().interrupt();
-          throw new InterruptedIOException(
-              "Interrupted while waiting to retry region location lookup");
-        }
-      }
-      if (location == null) {
-        throw new RetriesExhaustedException(
-            " -- nothing found, no 'location' returned," + " tableName="
-                + Bytes.toString(tableName) + ", reload=" + reload + " --",
-            HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions);
-      }
-      return location;
-    }
-
-    /**
-     * Groups the non-null entries of workingList into one MultiAction per
-     * region server, using locateRegion to find the server for each row.
-     *
-     * @param workingList
-     *          the operations to group; null entries are skipped
-     * @param tableName
-     *          name of the table the operations target
-     * @param isGets
-     *          if true each entry is added as a Get together with its index in
-     *          workingList; otherwise it is tracked via trackMutationsToTable
-     *          and added as a Mutation
-     * @return the actions keyed by server address
-     * @throws IOException
-     *           if it propagates from a call made here, such as locateRegion
-     */
-    private <R extends Row> Map<HServerAddress, MultiAction> splitRequestsByRegionServer(
-        List<R> workingList, final byte[] tableName, boolean isGets)
-        throws IOException {
-      Map<HServerAddress, MultiAction> grouped = new HashMap<HServerAddress, MultiAction>();
-      for (int idx = 0; idx < workingList.size(); idx++) {
-        Row op = workingList.get(idx);
-        if (op == null) {
-          continue;
-        }
-
-        HRegionLocation where = locateRegion(tableName, op.getRow(), true);
-        byte[] regionName = where.getRegionInfo().getRegionName();
-
-        HServerAddress host = where.getServerAddress();
-        MultiAction batch = grouped.get(host);
-        if (batch == null) {
-          batch = new MultiAction();
-          grouped.put(host, batch);
-        }
-
-        if (!isGets) {
-          trackMutationsToTable(tableName, where);
-          batch.mutate(regionName, (Mutation) op);
-        } else {
-          batch.addGet(regionName, (Get) op, idx);
-        }
-      }
-      return grouped;
-    }
-
-    private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
-        Map<HServerAddress, MultiAction> actionsByServer,
-        final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
-
-      Map<HServerAddress, Future<MultiResponse>> futures = new HashMap<HServerAddress, Future<MultiResponse>>(
-          actionsByServer.size());
-
-      boolean singleServer = (actionsByServer.size() == 1);
-      for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
-        Callable<MultiResponse> callable = createMultiActionCallable(
-            e.getKey(), e.getValue(), tableName, options);
-        Future<MultiResponse> task;
-        if (singleServer) {
-          task = new FutureTask<MultiResponse>(callable);
-          ((FutureTask<MultiResponse>) task).run();
-        } else {
-          task = pool.submit(callable);
-        }
-        futures.put(e.getKey(), task);
-      }
-      return futures;
-    }
-
-    /**
-     * Collects responses from each of the RegionServers. If there are failures,
-     * a list of failed operations is returned.
-     *
-     * A DoNotRetryIOException reported by a server is rethrown at once. Any
-     * other execution failure is logged and leaves the response null, which
-     * processMutationResponseFromOneRegionServer turns into re-queueing the
-     * operations sent to that server.
-     *
-     * @param tableName
-     *          name of the table the mutations target
-     * @param actionsByServer
-     *          the request that was sent to each server
-     * @param futures
-     *          the pending response of each server
-     * @param failureInfo
-     *          per-region failure records, passed on to the per-server
-     *          processing
-     * @return the operations that need to be tried again, or null if the
-     *         per-server processing never created a list
-     * @throws InterruptedException
-     *           if interrupted while waiting for a response
-     * @throws IOException
-     *           in particular DoNotRetryIOException, as described above
-     */
-    private List<Mutation> collectResponsesForMutateFromAllRS(byte[] tableName,
-        Map<HServerAddress, MultiAction> actionsByServer,
-        Map<HServerAddress, Future<MultiResponse>> futures,
-        Map<String, HRegionFailureInfo> failureInfo)
-        throws InterruptedException, IOException {
-
-      List<Mutation> newWorkingList = null;
-      for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
-          .entrySet()) {
-        HServerAddress address = responsePerServer.getKey();
-        MultiAction request = actionsByServer.get(address);
-
-        Future<MultiResponse> future = responsePerServer.getValue();
-        MultiResponse resp = null;
-
-        try {
-          resp = future.get();
-        } catch (ExecutionException e) {
-          if (e.getCause() instanceof DoNotRetryIOException) {
-            throw (DoNotRetryIOException) e.getCause();
-          }
-          // Keep resp null so the operations are re-queued below, but leave
-          // a trace of why the request failed instead of dropping it.
-          LOG.debug("Request to server " + address
-              + " failed; re-queueing its mutations", e);
-        }
-
-        // If we got a response, go through the responses from each region
-        // and process the Puts and Deletes.
-        // If the response is null, the operations are added to
-        // newWorkingList here.
-        if (request.getDeletes() != null) {
-          newWorkingList = processMutationResponseFromOneRegionServer(
-              tableName, address, resp, request.getDeletes(), newWorkingList,
-              true, failureInfo);
-        }
-        if (request.getPuts() != null) {
-          newWorkingList = processMutationResponseFromOneRegionServer(
-              tableName, address, resp, request.getPuts(), newWorkingList,
-              false, failureInfo);
-        }
-      }
-      return newWorkingList;
-    }
-
-    /**
-     * Collects responses from each of the RegionServers and populates the
-     * result array. If there are failures, a list of failed operations is
-     * returned.
-     *
-     * A DoNotRetryIOException reported by a server is rethrown at once. Any
-     * other execution failure is logged and leaves the response null; the
-     * per-server processing is still invoked with that null response.
-     *
-     * @param tableName
-     *          name of the table the gets target
-     * @param actionsByServer
-     *          the request that was sent to each server
-     * @param futures
-     *          the pending response of each server
-     * @param orig_list
-     *          the original list of gets, passed on to the per-server
-     *          processing
-     * @param results
-     *          the result array, passed on to the per-server processing
-     * @param failureInfo
-     *          per-region failure records, passed on to the per-server
-     *          processing
-     * @return the list returned by the last per-server processing call, or
-     *         null if there were no futures
-     * @throws IOException
-     *           in particular DoNotRetryIOException, as described above
-     * @throws InterruptedException
-     *           if interrupted while waiting for a response
-     */
-    private List<Get> collectResponsesForGetFromAllRS(byte[] tableName,
-        Map<HServerAddress, MultiAction> actionsByServer,
-        Map<HServerAddress, Future<MultiResponse>> futures,
-        List<Get> orig_list, Result[] results,
-        Map<String, HRegionFailureInfo> failureInfo) throws IOException,
-        InterruptedException {
-
-      List<Get> newWorkingList = null;
-      for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
-          .entrySet()) {
-        HServerAddress address = responsePerServer.getKey();
-        MultiAction request = actionsByServer.get(address);
-
-        Future<MultiResponse> future = responsePerServer.getValue();
-        MultiResponse resp = null;
-
-        try {
-          resp = future.get();
-        } catch (ExecutionException e) {
-          if (e.getCause() instanceof DoNotRetryIOException) {
-            throw (DoNotRetryIOException) e.getCause();
-          }
-          // Report through the logger rather than printing to stderr.
-          LOG.warn("Request to server " + address + " failed", e);
-        }
-
-        if (resp == null) {
-          // Entire server failed
-          LOG.debug("Failed all for server: " + address
-              + ", removing from cache");
-        }
-
-        newWorkingList = processGetResponseFromOneRegionServer(tableName,
-            address, request, resp, orig_list, newWorkingList, results,
-            failureInfo);
-      }
-      return newWorkingList;
-    }
-
-    /**
-     * Walks the per-region operations sent to one server and appends those
-     * that still need to be applied to the working list.
-     *
-     * For each region the put or delete result is read from the response. A
-     * result other than -1 is used as the index of the first operation to
-     * re-queue. If reading the result throws (a null response ends up here
-     * through a NullPointerException), the failure is recorded in failureInfo,
-     * all of the region's operations are re-queued and the cached location
-     * for the region's first row is deleted.
-     *
-     * @param tableName
-     *          name of the table the mutations target
-     * @param address
-     *          the server the request was sent to
-     * @param resp
-     *          the server's response; may be null
-     * @param map
-     *          the operations sent, keyed by region name
-     * @param newWorkingList
-     *          the list to append to; created on demand when null
-     * @param isDelete
-     *          true to read delete results, false to read put results
-     * @param failureInfo
-     *          per-region failure records, keyed by printable region name
-     * @return the working list, possibly newly created, possibly still null
-     * @throws IOException
-     *           if it propagates from the handling of a failed region
-     */
-    private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
-        byte[] tableName, HServerAddress address, MultiResponse resp,
-        Map<byte[], List<R>> map, List<Mutation> newWorkingList,
-        boolean isDelete, Map<String, HRegionFailureInfo> failureInfo)
-        throws IOException {
-      for (Map.Entry<byte[], List<R>> perRegion : map.entrySet()) {
-        byte[] regionName = perRegion.getKey();
-        List<R> regionOps = perRegion.getValue();
-
-        long result = 0;
-        try {
-          if (isDelete) {
-            result = resp.getDeleteResult(regionName);
-          } else {
-            result = resp.getPutResult(regionName);
-          }
-
-          if (result != -1) {
-            if (newWorkingList == null) {
-              newWorkingList = new ArrayList<Mutation>();
-            }
-            List<R> remaining = regionOps.subList((int) result,
-                regionOps.size());
-            newWorkingList.addAll(remaining);
-          }
-        } catch (Exception ex) {
-          String serverName = address.getHostname();
-          String regName = Bytes.toStringBinary(regionName);
-          // If response is null, we will catch a NPE here.
-          translateException(ex);
-
-          if (!failureInfo.containsKey(regName)) {
-            failureInfo.put(regName, new HRegionFailureInfo(regName));
-          }
-          HRegionFailureInfo regionFailure = failureInfo.get(regName);
-          regionFailure.addException(ex);
-          regionFailure.setServerName(serverName);
-
-          if (newWorkingList == null) {
-            newWorkingList = new ArrayList<Mutation>();
-          }
-          newWorkingList.addAll(regionOps);
-
-          // enough to remove from cache one of the rows from the region
-          deleteCachedLocation(tableName, regionOps.get(0).getRow(), address);
-        }
-      }
-
-      return newWorkingList;
-    }
-
-    private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
-        HServerAddress address, MultiAction request, MultiResponse resp,
-        List<Get> orig_list, List<Get> newWorkingList, Result[] results,
-        Map<String, HRegionFailureInfo> failureInfo) throws IOException {
-
-      for (Map.Entry<byte[], List<Get>> e : request.getGets().entrySet()) {
-        byte[] regionName = e.getKey();
-        List<Get> regionGets = request.getGets().get(regionName);
-        List<Integer> origIndices = request.getOriginalIndex().get(regionName);
-
-        Result[] getResult;
-        try {
-          getResult = resp.getGetResult(regionName);
-
-          // fill up the result[] array accordingly
-          assert (origIndices.size() == getResult.length);

[... 1271 lines stripped ...]