Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:09 UTC

svn commit: r1584162 [4/5] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1584162&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java Wed Apr  2 20:49:09 2014
@@ -0,0 +1,3512 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.SyncFailedException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/** Encapsulates finding the servers for an HBase instance. */
+public class TableServers implements ServerConnection {
+  static final Log LOG = LogFactory.getLog(TableServers.class);
+  private final Class<? extends HRegionInterface> serverInterfaceClass;
+  private final int prefetchRegionLimit;
+
+  private final Object masterLock = new Object();
+  private volatile boolean closed;
+  private volatile HMasterInterface master;
+  private volatile boolean masterChecked;
+
+  private final Object rootRegionLock = new Object();
+  private final Object metaRegionLock = new Object();
+  private final Object userRegionLock = new Object();
+
+  private volatile Configuration conf;
+  private final HConnectionParams params;
+
+  // Used by master and region servers during safe mode only
+  private volatile HRegionLocation rootRegionLocation;
+
+  private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations = new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
+
+  // Amount of time a server has to keep failing before we consider it to be
+  // in fast fail mode.
+  protected long fastFailThresholdMilliSec;
+  // Keeps track of failures when we cannot talk to a server. Helps in
+  // fast failing clients if the server is down for a long time.
+  protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<HServerAddress, FailureInfo>();
+  // We populate repeatedFailuresMap every time there is a failure, so to keep
+  // it from growing unbounded we garbage collect the failure information
+  // every cleanupInterval.
+  protected final long failureMapCleanupIntervalMilliSec;
+  protected volatile long lastFailureMapCleanupTimeMilliSec;
+  // Amount of time that has to pass, while in fast fail mode, before we clear
+  // the region -> regionserver cache again. This is used to clean unused
+  // entries.
+  protected long cacheClearingTimeoutMilliSec;
+  // Time after which all failure info is cleared. A safety valve, in case the
+  // client does not exit fast fail mode for any reason.
+  private long fastFailClearingTimeMilliSec;
+  private final boolean recordClientContext;
+
+  private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
+
+  @Override
+  public void resetOperationContext() {
+    if (!recordClientContext || this.operationContextPerThread == null) {
+      return;
+    }
+
+    List<OperationContext> currContext = this.operationContextPerThread.get();
+
+    if (currContext != null) {
+      currContext.clear();
+    }
+  }
+
+  @Override
+  public List<OperationContext> getAndResetOperationContext() {
+    if (!recordClientContext || this.operationContextPerThread == null) {
+      return null;
+    }
+
+    List<OperationContext> currContext = this.operationContextPerThread.get();
+
+    if (currContext == null) {
+      return null;
+    }
+
+    ArrayList<OperationContext> context = new ArrayList<OperationContext>(
+        currContext);
+
+    // Made a copy, clear the context
+    currContext.clear();
+
+    return context;
+  }
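
The two methods above implement a copy-then-clear snapshot over a per-thread list. The following is a minimal, self-contained sketch of that pattern (plain Java, not part of this patch; List<String> stands in for List<OperationContext>):

    import java.util.ArrayList;
    import java.util.List;

    public class CopyThenClearExample {
      private static final ThreadLocal<List<String>> CTX =
          new ThreadLocal<List<String>>();

      public static void main(String[] args) {
        CTX.set(new ArrayList<String>());
        CTX.get().add("op-1");
        CTX.get().add("op-2");

        // Snapshot-and-reset, mirroring getAndResetOperationContext() above:
        // copy the per-thread list, then clear it so later operations start
        // from an empty context.
        List<String> snapshot = new ArrayList<String>(CTX.get());
        CTX.get().clear();

        System.out.println(snapshot);  // [op-1, op-2]
        System.out.println(CTX.get()); // []
      }
    }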
+
+  /**
+   * Keeps track of repeated failures to any region server.
+   *
+   * @author amitanand.s
+   *
+   */
+  protected class FailureInfo {
+    // The number of consecutive failures.
+    private final AtomicLong numConsecutiveFailures = new AtomicLong();
+    // The time when the server started to become unresponsive
+    // Once set, this would never be updated.
+    private long timeOfFirstFailureMilliSec;
+    // The time when the client last tried to contact the server.
+    // This is only updated by one client at a time
+    private volatile long timeOfLatestAttemptMilliSec;
+    // The time when the client last cleared cache for regions assigned
+    // to the server. Used to ensure we don't clearCache too often.
+    private volatile long timeOfLatestCacheClearMilliSec;
+    // Used to keep track of concurrent attempts to contact the server.
+    // In fast fail mode, we want just one client thread to try to connect;
+    // the rest of the client threads will fail fast.
+    private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
+        false);
+
+    @Override
+    public String toString() {
+      return "FailureInfo: numConsecutiveFailures = "
+          + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
+          + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
+          + timeOfLatestAttemptMilliSec
+          + " timeOfLatestCacheClearMilliSec = "
+          + timeOfLatestCacheClearMilliSec
+          + " exclusivelyRetringInspiteOfFastFail  = "
+          + exclusivelyRetringInspiteOfFastFail.get();
+    }
+
+    FailureInfo(long firstFailureTime) {
+      this.timeOfFirstFailureMilliSec = firstFailureTime;
+    }
+  }
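
For orientation, fast-fail decisions are time-threshold checks over these fields; inFastFailMode(...) itself is defined later in this class and is not shown in this hunk. A rough, hypothetical sketch of that kind of check (illustration only, not the actual implementation):

    // Hypothetical helper, for illustration only: treat a server as fast-fail
    // once its first recorded failure is older than the configured threshold.
    private boolean pastFastFailThreshold(FailureInfo fInfo) {
      return (System.currentTimeMillis() - fInfo.timeOfFirstFailureMilliSec)
          > fastFailThresholdMilliSec;
    }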
+
+  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<MutableBoolean>();
+
+  // For TESTING purposes only.
+  public Map<HServerAddress, FailureInfo> getFailureMap() {
+    return repeatedFailuresMap;
+  }
+
+  // The presence of a server in this map implies it's likely that there is an
+  // entry in cachedRegionLocations that maps to this server; but the absence
+  // of a server in this map guarantees that there is no entry in the cache
+  // that maps to the absent server.
+  private final Set<String> cachedServers =
+      new HashSet<String>();
+
+  // Region cache prefetch is enabled by default. This set contains all
+  // tables whose region cache prefetch is disabled.
+  private final Set<Integer> regionCachePrefetchDisabledTables =
+    new CopyOnWriteArraySet<Integer>();
+  // Keeps track of the regions (and their locations) that have been updated
+  // for batched load, keyed by table name.
+  Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
+  private int batchedUploadSoftFlushRetries;
+  private long batchedUploadSoftFlushTimeoutMillis;
+  private final boolean useThrift;
+
+  private ConcurrentSkipListSet<byte[]> initializedTableSet =
+    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+  /**
+   * constructor
+   * @param conf Configuration object
+   */
+  @SuppressWarnings("unchecked")
+  public TableServers(Configuration conf) {
+    this.conf = conf;
+    params = HConnectionParams.getInstance(conf);
+    this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
+      HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
+
+    String serverClassName =
+      conf.get(HConstants.REGION_SERVER_CLASS,
+          HConstants.DEFAULT_REGION_SERVER_CLASS);
+
+    this.closed = false;
+
+    try {
+      this.serverInterfaceClass =
+        (Class<? extends HRegionInterface>) Class.forName(serverClassName);
+
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find region server interface " + serverClassName, e);
+    }
+
+    // TODO move parameters below into HConnectionParams
+    this.cacheClearingTimeoutMilliSec = conf.getLong(
+      "hbase.client.fastfail.cache.clear.interval",
+        10000); // 10 sec
+    this.fastFailThresholdMilliSec = conf.getLong(
+      "hbase.client.fastfail.threshold",
+        60000); // 1 min
+    this.failureMapCleanupIntervalMilliSec = conf.getLong(
+        "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
+    this.fastFailClearingTimeMilliSec = conf.getLong(
+        "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins
+
+    this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
+        10);
+
+    this.master = null;
+    this.masterChecked = false;
+    this.batchedUploadSoftFlushRetries =
+        conf.getInt("hbase.client.batched-upload.softflush.retries", 10);
+    this.batchedUploadSoftFlushTimeoutMillis =
+        conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
+    batchedUploadUpdatesMap  = new ConcurrentHashMap<String,
+        ConcurrentMap<HRegionInfo, HRegionLocation>>();
+
+    this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
+
+  }
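
The constructor reads its tuning knobs straight from the Configuration. A minimal sketch of how a caller might override them before the connection is created (key names are taken verbatim from the constructor above; the values are only examples):

    import org.apache.hadoop.conf.Configuration;

    public class FastFailTuningExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fast-fail behaviour (all read in the TableServers constructor).
        conf.setLong("hbase.client.fastfail.threshold", 60000L);             // 1 min
        conf.setLong("hbase.client.fastfail.cache.clear.interval", 10000L);  // 10 sec
        conf.setLong("hbase.client.fastfail.cleanup.map.interval.millisec", 600000L);
        conf.setLong("hbase.client.fastfail.cleanup.all.millisec", 900000L);
        // Batched upload soft-flush and per-operation context recording.
        conf.setInt("hbase.client.batched-upload.softflush.retries", 10);
        conf.setLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L);
        conf.setBoolean("hbase.client.record.context", true);
        // The connection itself is normally obtained through HConnectionManager,
        // which constructs TableServers internally.
      }
    }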
+
+  // Used by master and region servers during safe mode only
+  @Override
+  public void unsetRootRegionLocation() {
+    this.rootRegionLocation = null;
+  }
+
+  // Used by master and region servers during safe mode only
+  @Override
+  public void setRootRegionLocation(HRegionLocation rootRegion) {
+    if (rootRegion == null) {
+      throw new IllegalArgumentException(
+          "Cannot set root region location to null.");
+    }
+    this.rootRegionLocation = rootRegion;
+  }
+
+  @Override
+  public HMasterInterface getMaster() throws MasterNotRunningException {
+    ZooKeeperWrapper zk;
+    try {
+      zk = getZooKeeperWrapper();
+    } catch (IOException e) {
+      throw new MasterNotRunningException(e);
+    }
+
+    HServerAddress masterLocation = null;
+    synchronized (this.masterLock) {
+      for (int tries = 0; !this.closed && !this.masterChecked
+          && this.master == null && tries < params.getNumRetries(); tries++) {
+
+        try {
+          masterLocation = zk.readMasterAddress(zk);
+
+          if (masterLocation != null) {
+            HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy(
+                  HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
+                  masterLocation.getInetSocketAddress(), this.conf,
+                  params.getRpcTimeout(), HBaseRPCOptions.DEFAULT);
+
+            if (tryMaster.isMasterRunning()) {
+              this.master = tryMaster;
+              this.masterLock.notifyAll();
+              break;
+            }
+          }
+
+        } catch (IOException e) {
+          if (tries == params.getNumRetries() - 1) {
+            // This was our last chance - don't bother sleeping
+            LOG.info(
+                "getMaster attempt " + tries + " of "
+                    + params.getNumRetries() + " failed; no more retrying.",
+                e);
+            break;
+          }
+          LOG.info(
+              "getMaster attempt " + tries + " of " + params.getNumRetries()
+                  + " failed; retrying after sleep of "
+                  + params.getPauseTime(tries), e);
+        }
+
+        // Cannot connect to master or it is not running. Sleep & retry
+        try {
+          this.masterLock.wait(params.getPauseTime(tries));
+        } catch (InterruptedException e) {
+          // continue
+        }
+      }
+      this.masterChecked = true;
+    }
+    if (this.master == null) {
+      if (masterLocation == null) {
+        throw new MasterNotRunningException();
+      }
+      throw new MasterNotRunningException(masterLocation.toString());
+    }
+    return this.master;
+  }
+
+  @Override
+  public boolean isMasterRunning() {
+    if (this.master == null) {
+      try {
+        getMaster();
+
+      } catch (MasterNotRunningException e) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean tableExists(final byte[] tableName)
+      throws MasterNotRunningException {
+    getMaster();
+    if (tableName == null) {
+      throw new IllegalArgumentException("Table name cannot be null");
+    }
+    if (isMetaTableName(tableName)) {
+      return true;
+    }
+    boolean exists = false;
+    try {
+      HTableDescriptor[] tables = listTables();
+      for (HTableDescriptor table : tables) {
+        if (Bytes.equals(table.getName(), tableName)) {
+          exists = true;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Testing for table existence threw exception", e);
+    }
+    return exists;
+  }
+
+  /*
+   * @param n
+   *
+   * @return True if the passed tablename <code>n</code> is equal to the name
+   * of a catalog table.
+   */
+  private static boolean isMetaTableName(final byte[] n) {
+    return MetaUtils.isMetaTableName(n);
+  }
+
+  @Override
+  public HRegionLocation getRegionLocation(final byte[] name,
+      final byte[] row, boolean reload) throws IOException {
+    return reload ? relocateRegion(name, row) : locateRegion(name, row);
+  }
+
+  @Override
+  public HTableDescriptor[] listTables() throws IOException {
+    getMaster();
+    final TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
+    MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
+      public boolean processRow(Result result) throws IOException {
+        try {
+          byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.REGIONINFO_QUALIFIER);
+          HRegionInfo info = null;
+          if (value != null) {
+            info = Writables.getHRegionInfo(value);
+          }
+          // Only examine the rows where the startKey is zero length
+          if (info != null && info.getStartKey().length == 0) {
+            uniqueTables.add(info.getTableDesc());
+          }
+          return true;
+        } catch (RuntimeException e) {
+          LOG.error("Result=" + result);
+          throw e;
+        }
+      }
+    };
+    MetaScanner.metaScan(conf, visitor);
+
+    return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
+  }
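
listTables() is one instance of the MetaScannerVisitor callback pattern that recurs throughout this class. As a further illustration, a sketch of a visitor that counts the regions of one table (the table name is hypothetical; the APIs are the ones already used above):

    final byte[] wantedTable = Bytes.toBytes("my_table"); // hypothetical table
    final int[] regionCount = new int[1];
    MetaScannerVisitor counter = new MetaScannerVisitor() {
      @Override
      public boolean processRow(Result result) throws IOException {
        byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
            HConstants.REGIONINFO_QUALIFIER);
        HRegionInfo info = Writables.getHRegionInfoOrNull(value);
        if (info != null
            && Bytes.equals(info.getTableDesc().getName(), wantedTable)) {
          regionCount[0]++;
        }
        return true; // keep scanning .META.
      }
    };
    MetaScanner.metaScan(conf, counter);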
+
+  @Override
+  public boolean isTableEnabled(byte[] tableName) throws IOException {
+    return testTableOnlineState(tableName, true);
+  }
+
+  @Override
+  public boolean isTableDisabled(byte[] tableName) throws IOException {
+    return testTableOnlineState(tableName, false);
+  }
+
+  @Override
+  public boolean isTableAvailable(final byte[] tableName) throws IOException {
+    final AtomicBoolean available = new AtomicBoolean(true);
+    MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
+      public boolean processRow(Result row) throws IOException {
+        byte[] value = row.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        HRegionInfo info = Writables.getHRegionInfoOrNull(value);
+        if (info != null) {
+          if (Bytes.equals(tableName, info.getTableDesc().getName())) {
+            value = row.getValue(HConstants.CATALOG_FAMILY,
+                HConstants.SERVER_QUALIFIER);
+            if (value == null) {
+              available.set(false);
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    };
+    MetaScanner.metaScan(conf, visitor);
+    return available.get();
+  }
+
+  /*
+   * If online == true: returns true if all regions are online, and false in
+   * any other case.
+   * If online == false: returns true if all regions are offline, and false in
+   * any other case.
+   */
+  private boolean testTableOnlineState(byte[] tableName, boolean online)
+      throws IOException {
+    if (!tableExists(tableName)) {
+      throw new TableNotFoundException(Bytes.toString(tableName));
+    }
+    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+      // The root region is always enabled
+      return true;
+    }
+    int rowsScanned = 0;
+    int rowsOffline = 0;
+    byte[] startKey = HRegionInfo.createRegionName(tableName, null,
+        HConstants.ZEROES, false);
+    byte[] endKey;
+    HRegionInfo currentRegion;
+    Scan scan = new Scan(startKey);
+    scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+    int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
+    scan.setCaching(rows);
+    ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName,
+        HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME
+        : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT);
+    try {
+      // Open scanner
+      getRegionServerWithRetries(s);
+      do {
+        currentRegion = s.getHRegionInfo();
+        Result r;
+        Result[] rrs;
+        while ((rrs = getRegionServerWithRetries(s)) != null
+            && rrs.length > 0) {
+          r = rrs[0];
+          byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.REGIONINFO_QUALIFIER);
+          if (value != null) {
+            HRegionInfo info = Writables.getHRegionInfoOrNull(value);
+            if (info != null) {
+              if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
+                rowsScanned += 1;
+                rowsOffline += info.isOffline() ? 1 : 0;
+              }
+            }
+          }
+        }
+        endKey = currentRegion.getEndKey();
+      } while (!(endKey == null || Bytes.equals(endKey,
+          HConstants.EMPTY_BYTE_ARRAY)));
+    } finally {
+      s.setClose();
+      // Doing below will call 'next' again and this will close the scanner
+      // Without it we leave scanners open.
+      getRegionServerWithRetries(s);
+    }
+    LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
+    boolean onOffLine = online ? rowsOffline == 0
+        : rowsOffline == rowsScanned;
+    return rowsScanned > 0 && onOffLine;
+  }
+
+  private static class HTableDescriptorFinder implements
+      MetaScanner.MetaScannerVisitor {
+    byte[] tableName;
+    HTableDescriptor result;
+
+    protected HTableDescriptorFinder(byte[] tableName) {
+      this.tableName = tableName;
+    }
+
+    @Override
+    public boolean processRow(Result rowResult) throws IOException {
+      HRegionInfo info = Writables.getHRegionInfo(rowResult.getValue(
+          HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      HTableDescriptor desc = info.getTableDesc();
+      if (Bytes.compareTo(desc.getName(), tableName) == 0) {
+        result = desc;
+        return false;
+      }
+      return true;
+    }
+
+    HTableDescriptor getResult() {
+      return result;
+    }
+  }
+
+  @Override
+  public HTableDescriptor getHTableDescriptor(final byte[] tableName)
+      throws IOException {
+    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+      return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
+    }
+    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+      return HTableDescriptor.META_TABLEDESC;
+    }
+    TableServers.HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName);
+    MetaScanner.metaScan(conf, finder, tableName);
+    HTableDescriptor result = finder.getResult();
+    if (result == null) {
+      throw new TableNotFoundException(Bytes.toString(tableName));
+    }
+    return result;
+  }
+
+  @Override
+  public HRegionLocation locateRegion(final byte[] tableName, final byte[] row)
+      throws IOException {
+    return locateRegion(tableName, row, true);
+  }
+
+  @Override
+  public HRegionLocation relocateRegion(final byte[] tableName,
+      final byte[] row) throws IOException {
+    return locateRegion(tableName, row, false);
+  }
+
+  private HRegionLocation locateRegion(final byte[] tableName,
+      final byte[] row, boolean useCache) throws IOException {
+    if (tableName == null || tableName.length == 0) {
+      throw new IllegalArgumentException(
+          "table name cannot be null or zero length");
+    }
+    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+      return locateMetaInRoot(row, useCache, metaRegionLock);
+    } else if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+      synchronized (rootRegionLock) {
+        // This block guards against two threads trying to find the root
+        // region at the same time. One does the lookup while the second
+        // waits; the second then uses the cached value and skips the lookup.
+
+        if (!useCache || rootRegionLocation == null
+            || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+          this.rootRegionLocation = locateRootRegion();
+          LOG.info("Updated rootRegionLocation from ZK to : " + this.rootRegionLocation);
+        }
+        return this.rootRegionLocation;
+      }
+    } else {
+      // Region not in the cache - have to go to the meta RS
+      return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
+          useCache, userRegionLock);
+    }
+  }
+
+  private HRegionLocation prefetchRegionCache(final byte[] tableName,
+                                   final byte[] row) {
+    return prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
+  }
+
+  /*
+   * Search .META. for the HRegionLocation info that contains the table and
+   * row we're seeking. It prefetches a certain number of region entries and
+   * saves them to the global region cache.
+   */
+  private HRegionLocation prefetchRegionCache(final byte[] tableName,
+      final byte[] row, int prefetchRegionLimit) {
+    // Implement a new visitor for MetaScanner, and use it to walk through
+    // the .META.
+    MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
+      public boolean processRow(Result result) throws IOException {
+        try {
+          byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.REGIONINFO_QUALIFIER);
+          HRegionInfo regionInfo = null;
+
+          if (value != null) {
+            // convert the row result into the HRegionLocation we need!
+            regionInfo = Writables.getHRegionInfo(value);
+
+            // possible we got a region of a different table...
+            if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+              return false; // stop scanning
+            }
+            if (regionInfo.isOffline()) {
+              // don't cache offline regions
+              return true;
+            }
+            value = result.getValue(HConstants.CATALOG_FAMILY,
+                HConstants.SERVER_QUALIFIER);
+            if (value == null) {
+              return true; // don't cache it
+            }
+            final String serverAddress = Bytes.toString(value);
+
+            value = result.getValue(HConstants.CATALOG_FAMILY,
+                HConstants.STARTCODE_QUALIFIER);
+            long serverStartCode = -1;
+            if (value != null) {
+              serverStartCode = Bytes.toLong(value);
+            }
+
+            // instantiate the location
+            HRegionLocation loc = new HRegionLocation(regionInfo,
+                new HServerAddress(serverAddress), serverStartCode);
+            // cache this meta entry
+            cacheLocation(tableName, loc);
+          }
+          return true;
+        } catch (RuntimeException e) {
+          throw new IOException(e);
+        }
+      }
+    };
+    try {
+      // Pre-fetch a certain number of region entries into the region cache.
+      MetaScanner.metaScan(conf, visitor, tableName, row, prefetchRegionLimit);
+      return getCachedLocation(tableName, row);
+    } catch (IOException e) {
+      LOG.warn("Encounted problems when prefetch META table: ", e);
+    }
+    return null;
+  }
+
+  /**
+   * Search the meta table (.META.) for the HRegionLocation info that
+   * contains the table and row we're seeking.
+   */
+  private HRegionLocation locateRegionInMeta(final byte [] parentTable,
+    final byte [] tableName, final byte [] row, boolean useCache,
+    Object regionLockObject)
+  throws IOException {
+    HRegionLocation location;
+    if (useCache) {
+      location = getCachedLocation(tableName, row);
+      if (location != null) {
+        return location;
+      }
+    }
+
+    // If we are supposed to be using the cache, look in the cache to see if
+    // we already have the region.
+
+    // build the key of the meta region we should be looking for.
+    // the extra 9's on the end are necessary to allow "exact" matches
+    // without knowing the precise region names.
+    byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
+      HConstants.NINES, false);
+    for (int tries = 0; true; tries++) {
+      if (tries >= params.getNumRetries()) {
+        throw new NoServerForRegionException("Unable to find region for "
+          + Bytes.toStringBinary(row) + " after " + params.getNumRetries() +
+          " tries.");
+      }
+
+      FailureInfo fInfo = null;
+      HServerAddress server = null;
+      boolean didTry = false;
+      boolean couldNotCommunicateWithServer = false;
+      boolean retryDespiteFastFailMode = false;
+      try {
+        // locate the root or meta region
+        HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
+
+        server = metaLocation.getServerAddress();
+        fInfo = repeatedFailuresMap.get(server);
+
+        // Handle the case where .META. is on an unresponsive server.
+        if (inFastFailMode(server) &&
+            !this.currentThreadInFastFailMode()) {
+          // In Fast-fail mode, all but one thread will fast fail. Check
+          // if we are that one chosen thread.
+
+          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+          if (retryDespiteFastFailMode == false) { // we don't have to retry
+            throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+                fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+          }
+        }
+        didTry = true;
+
+        HBaseThriftRPC.isMeta.get().push(true);
+        Result regionInfoRow = null;
+        try {
+          // This block guards against two threads trying to load the meta
+          // region at the same time. The first will load the meta region and
+          // the second will use the value that the first one found.
+          synchronized (regionLockObject) {
+            // Check the cache again for a hit in case some other thread made the
+            // same query while we were waiting on the lock. If not supposed to
+            // be using the cache, delete any existing cached location so it won't
+            // interfere.
+            if (useCache) {
+              location = getCachedLocation(tableName, row);
+              if (location != null) {
+                return location;
+              }
+            } else {
+              LOG.debug("Deleting the client location cache.");
+              deleteCachedLocation(tableName, row, null);
+            }
+
+            // If the parent table is META, we may want to pre-fetch some
+            // region info into the global region cache for this table.
+            if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
+              (getRegionCachePrefetch(tableName)) )  {
+              LOG.debug("Prefetching the client location cache.");
+              location = prefetchRegionCache(tableName, row);
+              if (location != null) {
+                return location;
+              }
+            }
+
+            HRegionInterface serverInterface =
+              getHRegionConnection(metaLocation.getServerAddress());
+
+            // Query the root or meta region for the location of the meta region
+            regionInfoRow = serverInterface.getClosestRowBefore(
+              metaLocation.getRegionInfo().getRegionName(), metaKey,
+              HConstants.CATALOG_FAMILY);
+          }
+        } catch (Exception e) {
+          throw e;
+        } finally {
+          HBaseThriftRPC.isMeta.get().pop();
+        }
+        location = getLocationFromRow(regionInfoRow, tableName,
+          parentTable, row);
+        cacheLocation(tableName, location);
+        return location;
+      } catch (TableNotFoundException e) {
+        // if we got this error, probably means the table just plain doesn't
+        // exist. rethrow the error immediately. this should always be coming
+        // from the HTable constructor.
+        throw e;
+      } catch (PreemptiveFastFailException e) {
+        // already processed this. Don't process this again.
+        throw e;
+      } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler
+            .decodeRemoteException((RemoteException) e);
+        } else if (isNetworkException(e)) {
+          couldNotCommunicateWithServer = true;
+          handleFailureToServer(server, e);
+        }
+        if (tries < params.getNumRetries() - 1) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("locateRegionInMeta attempt " + tries + " of "
+              + params.getNumRetries()
+              + " failed; retrying after sleep of "
+              + params.getPauseTime(tries) + " because: " + e.getMessage());
+          }
+        } else {
+          throw e;
+        }
+        // Only relocate the parent region if necessary
+        if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
+          relocateRegion(parentTable, row);
+        }
+      } catch (Exception e) {
+        couldNotCommunicateWithServer = true;
+        handleFailureToServer(server, e);
+        if (tries < params.getNumRetries() - 1) {
+          LOG.debug("locateRegionInMeta attempt " + tries + " of "
+            + params.getNumRetries() + " failed; retrying after sleep of "
+            + params.getPauseTime(tries) + " because: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      } finally {
+        updateFailureInfoForServer(server, fInfo, didTry,
+          couldNotCommunicateWithServer, retryDespiteFastFailMode);
+      }
+      try {
+        Thread.sleep(params.getPauseTime(tries));
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+  }
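
The lookup key built above, HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false), ends in a run of nines so that it sorts after every real region name for the same row; that is what lets getClosestRowBefore() on the parent catalog table land on the entry for the region containing the row. A self-contained sketch of that ordering trick, using TreeMap.floorKey() in place of getClosestRowBefore() (the keys below are simplified illustrations, not real .META. row keys):

    import java.util.TreeMap;

    public class ClosestRowBeforeSketch {
      public static void main(String[] args) {
        // .META. rows are keyed roughly "<table>,<startKey>,<regionId>".
        TreeMap<String, String> meta = new TreeMap<String, String>();
        meta.put("t1,,1000000001",  "region [ , f)");
        meta.put("t1,f,1000000002", "region [f, p)");
        meta.put("t1,p,1000000003", "region [p, )");

        // The lookup key for row "hello" ends in nines, so it sorts after any
        // real region name whose start key is <= "hello".
        String metaKey = "t1,hello,99999999999999";

        // Greatest .META. key <= metaKey: the entry for region [f, p),
        // which is the region containing row "hello".
        System.out.println(meta.floorKey(metaKey));
      }
    }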
+
+  private HRegionLocation getLocationFromRow(Result regionInfoRow,
+      byte[] tableName, byte[] parentTable, byte[] row) throws IOException {
+    if (regionInfoRow == null) {
+      throw new TableNotFoundException(Bytes.toString(tableName));
+    }
+    byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.REGIONINFO_QUALIFIER);
+    if (value == null || value.length == 0) {
+      throw new IOException("HRegionInfo was null or empty in "
+          + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
+    }
+    // convert the row result into the HRegionLocation we need!
+    HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(value,
+        new HRegionInfo());
+    // possible we got a region of a different table...
+    if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+      throw new TableNotFoundException("Table '" + Bytes.toString(tableName)
+          + "' was not found.");
+    }
+    if (regionInfo.isOffline()) {
+      throw new RegionOfflineException("region offline: "
+          + regionInfo.getRegionNameAsString());
+    }
+
+    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.SERVER_QUALIFIER);
+    String serverAddress = "";
+    if (value != null) {
+      serverAddress = Bytes.toString(value);
+    }
+    if (serverAddress.equals("")) {
+      throw new NoServerForRegionException("No server address listed "
+          + "in " + Bytes.toString(parentTable) + " for region "
+          + regionInfo.getRegionNameAsString() + " containing row "
+          + Bytes.toStringBinary(row));
+    }
+
+    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.STARTCODE_QUALIFIER);
+    long serverStartCode = -1;
+    if (value != null) {
+      serverStartCode = Bytes.toLong(value);
+    }
+    // instantiate the location
+    HRegionLocation location = new HRegionLocation(regionInfo,
+        new HServerAddress(serverAddress), serverStartCode);
+    return location;
+  }
+
+  /**
+   * TODO: WARNING!!! This looks like a lot of duplicated code with
+   * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)};
+   * please fix this! Searches the root table for the meta region. Retries a
+   * fixed number of times and throws if the region is not found.
+   */
+  private HRegionLocation locateMetaInRoot(final byte[] row,
+      boolean useCache, Object regionLockObject) throws IOException {
+    HRegionLocation location;
+    final byte[] parentTable = HConstants.ROOT_TABLE_NAME;
+    final byte[] tableName = HConstants.META_TABLE_NAME;
+    if (useCache) {
+      location = getCachedLocation(tableName, row);
+      if (location != null) {
+        return location;
+      }
+    }
+    // If we are supposed to be using the cache, look in the cache to see if
+    // we already have the region.
+
+    // build the key of the meta region we should be looking for.
+    // the extra 9's on the end are necessary to allow "exact" matches
+    // without knowing the precise region names.
+    byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
+      HConstants.NINES, false);
+    for (int tries = 0; true; tries++) {
+      if (tries >= params.getNumRetries()) {
+        throw new NoServerForRegionException("Unable to find region for "
+          + Bytes.toStringBinary(row) + " after " + params.getNumRetries()
+          + " tries.");
+      }
+
+      TableServers.FailureInfo fInfo = null;
+      HServerAddress server = null;
+      boolean didTry = false;
+      boolean couldNotCommunicateWithServer = false;
+      boolean retryDespiteFastFailMode = false;
+      try {
+        // locate the root or meta region
+        HRegionLocation metaLocation = null;
+        if (useCache && rootRegionLocation != null
+          && !inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+          metaLocation = rootRegionLocation;
+        } else {
+          synchronized (rootRegionLock) {
+            // This block guards against two threads trying to find the root
+            // region at the same time. One does the lookup while the second
+            // waits; the second then uses the cached value and skips the
+            // lookup.
+
+            if (!useCache || rootRegionLocation == null
+              || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+              HBaseThriftRPC.isMeta.get().push(true);
+              try {
+                this.rootRegionLocation = locateRootRegion();
+              } catch (Exception e) {
+                throw e;
+              } finally {
+                HBaseThriftRPC.isMeta.get().pop();
+              }
+              LOG.info("Updated rootRegionLocation from ZK to : "
+                + this.rootRegionLocation);
+            }
+            metaLocation = this.rootRegionLocation;
+          }
+        }
+        Preconditions.checkNotNull(metaLocation);
+        server = metaLocation.getServerAddress();
+        fInfo = repeatedFailuresMap.get(server);
+
+        // Handle the case where .META. is on an unresponsive server.
+        if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) {
+          // In Fast-fail mode, all but one thread will fast fail. Check
+          // if we are that one chosen thread.
+
+          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+          if (retryDespiteFastFailMode == false) { // we don't have to retry
+            throw new PreemptiveFastFailException(
+              fInfo.numConsecutiveFailures.get(),
+              fInfo.timeOfFirstFailureMilliSec,
+              fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+          }
+        }
+        didTry = true;
+        HBaseThriftRPC.isMeta.get().push(true);
+        Result regionInfoRow = null;
+        // This block guards against two threads trying to load the meta
+        // region at the same time. The first will load the meta region and
+        // the second will use the value that the first one found.
+        synchronized (metaRegionLock) {
+          if (useCache) {
+            location = getCachedLocation(tableName, row);
+            if (location != null) {
+              return location;
+            }
+          } else {
+            LOG.debug("Deleting the client location cache.");
+            deleteCachedLocation(tableName, row, null);
+          }
+          HRegionInterface serverInterface = getHRegionConnection(metaLocation
+            .getServerAddress());
+
+          // Query the root for the location of the meta region
+          regionInfoRow = serverInterface.getClosestRowBefore(metaLocation
+            .getRegionInfo().getRegionName(), metaKey,
+            HConstants.CATALOG_FAMILY);
+          location = getLocationFromRow(regionInfoRow, tableName,
+            parentTable, row);
+          cacheLocation(tableName, location);
+        }
+        return location;
+      } catch (TableNotFoundException e) {
+        // if we got this error, probably means the table just plain doesn't
+        // exist. rethrow the error immediately. this should always be coming
+        // from the HTable constructor.
+        throw e;
+      } catch (PreemptiveFastFailException e) {
+        // already processed this. Don't process this again.
+        throw e;
+      } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler
+            .decodeRemoteException((RemoteException) e);
+        } else if (isNetworkException(e)) {
+          couldNotCommunicateWithServer = true;
+          handleFailureToServer(server, e);
+        }
+        if (tries < params.getNumRetries() - 1) {
+          LOG.debug("IOException locateMetaInRoot attempt " + tries
+            + " of " + params.getNumRetries()
+            + " failed; retrying after sleep of "
+            + params.getPauseTime(tries) + " because: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        couldNotCommunicateWithServer = true;
+        handleFailureToServer(server, e);
+        if (tries < params.getNumRetries() - 1) {
+          LOG.debug("Exception locateMetaInRoot attempt " + tries + " of "
+            + params.getNumRetries() + " failed; retrying after sleep of "
+            + params.getPauseTime(tries) + " because: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      } finally {
+        HBaseThriftRPC.isMeta.get().pop();
+        updateFailureInfoForServer(server, fInfo, didTry,
+          couldNotCommunicateWithServer, retryDespiteFastFailMode);
+      }
+      try {
+        Thread.sleep(params.getPauseTime(tries));
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+  }
+
+  /**
+   * Check if the exception is something that indicates that we cannot
+   * contact/communicate with the server.
+   *
+   * @param e the exception to examine
+   * @return true if the exception indicates a network/connectivity problem
+   */
+  private boolean isNetworkException(Throwable e) {
+    // This list covers most connectivity exceptions but not all.
+    // For example, in SocketOutputStream a plain IOException is thrown
+    // at times when the channel is closed.
+    return (e instanceof SocketTimeoutException ||
+            e instanceof ConnectException ||
+            e instanceof ClosedChannelException ||
+            e instanceof SyncFailedException ||
+            e instanceof EOFException ||
+            e instanceof TTransportException);
+  }
+
+  @Override
+  public Collection<HRegionLocation> getCachedHRegionLocations(final byte [] tableName,
+                                                               boolean forceRefresh) {
+    if (forceRefresh || !initializedTableSet.contains(tableName)) {
+      prefetchRegionCache(tableName, null, Integer.MAX_VALUE);
+      initializedTableSet.add(tableName);
+    }
+
+    ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
+      getTableLocations(tableName);
+    return tableLocations.values();
+  }
+
+  /*
+   * Search the cache for a location that fits our table and row key.
+   * Return null if no suitable region is located. TODO: synchronization note
+   *
+   * <p>TODO: This method during writing consumes 15% of CPU doing lookup
+   * into the Soft Reference SortedMap.  Improve.
+   *
+   * @param tableName
+   * @param row
+   * @return Null or region location found in cache.
+   */
+  HRegionLocation getCachedLocation(final byte [] tableName,
+      final byte [] row) {
+    ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
+      getTableLocations(tableName);
+
+    // start to examine the cache. we can only do cache actions
+    // if there's something in the cache for this table.
+    if (tableLocations.isEmpty()) {
+      return null;
+    }
+
+    HRegionLocation rl = tableLocations.get(row);
+    if (rl != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row)
+            + "> in tableName " + Bytes.toString(tableName)
+            + ": location server " + rl.getServerAddress()
+            + ", location region name "
+            + rl.getRegionInfo().getRegionNameAsString());
+      }
+      return rl;
+    }
+
+    // get the matching region for the row
+    Entry<byte[], HRegionLocation> entry = tableLocations.floorEntry(row);
+    HRegionLocation possibleRegion = (entry == null) ? null : entry
+        .getValue();
+
+    // we need to examine the cached location to verify that it is
+    // a match by end key as well.
+    if (possibleRegion != null) {
+      byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
+
+      // make sure that the end key is greater than the row we're looking
+      // for, otherwise the row actually belongs in the next region, not
+      // this one. the exception case is when the endkey is
+      // HConstants.EMPTY_START_ROW, signifying that the region we're
+      // checking is actually the last region in the table.
+      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
+          || KeyValue.getRowComparator(tableName).compareRows(endKey, 0,
+              endKey.length, row, 0, row.length) > 0) {
+        return possibleRegion;
+      }
+    }
+
+    // Passed all the way through, so we got nothing - complete cache miss
+    return null;
+  }
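
The cache lookup above relies on ConcurrentSkipListMap.floorEntry(): find the greatest start key less than or equal to the row, then verify the end key. A runnable sketch of just that step (String values stand in for HRegionLocation; not part of this patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FloorEntryLookupExample {
      public static void main(String[] args) {
        // Regions keyed by start key, mirroring the per-table cache above.
        ConcurrentSkipListMap<byte[], String> regions =
            new ConcurrentSkipListMap<byte[], String>(Bytes.BYTES_COMPARATOR);
        regions.put(Bytes.toBytes(""), "region-A [ , f)");
        regions.put(Bytes.toBytes("f"), "region-B [f, p)");
        regions.put(Bytes.toBytes("p"), "region-C [p, )");

        byte[] row = Bytes.toBytes("hello");
        Map.Entry<byte[], String> e = regions.floorEntry(row);
        // Prints region-B: the greatest start key <= "hello". The caller must
        // still check the end key, exactly as getCachedLocation() does.
        System.out.println(e.getValue());
      }
    }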
+
+  /*
+   * Delete a cached location for the specified table name and row if it is
+   * located on the (optionally) specified old location.
+   */
+  @Override
+  public void deleteCachedLocation(final byte[] tableName, final byte[] row,
+      HServerAddress oldServer) {
+    synchronized (this.cachedRegionLocations) {
+      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
+
+      // start to examine the cache. we can only do cache actions
+      // if there's something in the cache for this table.
+      if (!tableLocations.isEmpty()) {
+        HRegionLocation rl = getCachedLocation(tableName, row);
+        if (rl != null) {
+          // If oldServer is specified, delete the location only if it is the
+          // same.
+          if (oldServer != null && !oldServer.equals(rl.getServerAddress()))
+            return; // perhaps somebody else cleared and repopulated.
+
+          tableLocations.remove(rl.getRegionInfo().getStartKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString()
+                + " for tableName=" + Bytes.toString(tableName)
+                + " from cache " + "because of " + Bytes.toStringBinary(row));
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Delete all cached region locations that map to a specific server.
+   *
+   * @param server
+   */
+  protected void clearCachedLocationForServer(final String server) {
+    boolean deletedSomething = false;
+    synchronized (this.cachedRegionLocations) {
+      if (!cachedServers.contains(server)) {
+        return;
+      }
+      for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations
+          .values()) {
+        for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
+          if (e.getValue().getServerAddress().toString().equals(server)) {
+            tableLocations.remove(e.getKey());
+            deletedSomething = true;
+          }
+        }
+      }
+      cachedServers.remove(server);
+    }
+    if (deletedSomething && LOG.isDebugEnabled()) {
+      LOG.debug("Removed all cached region locations that map to " + server);
+    }
+  }
+
+  /*
+   * @param tableName
+   *
+   * @return Map of cached locations for passed <code>tableName</code>
+   */
+  private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
+      final byte[] tableName) {
+    // find the map of cached locations for this table
+    Integer key = Bytes.mapKey(tableName);
+    ConcurrentSkipListMap<byte[], HRegionLocation> result = this.cachedRegionLocations
+        .get(key);
+    if (result == null) {
+      synchronized (this.cachedRegionLocations) {
+        result = this.cachedRegionLocations.get(key);
+        if (result == null) {
+          // if tableLocations for this table isn't built yet, make one
+          result = new ConcurrentSkipListMap<byte[], HRegionLocation>(
+              Bytes.BYTES_COMPARATOR);
+          this.cachedRegionLocations.put(key, result);
+        }
+      }
+    }
+    return result;
+  }
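
getTableLocations() uses a synchronized double-check to create the per-table map on demand. For comparison, a sketch of the same check-then-create step written with ConcurrentMap.putIfAbsent(); this is illustrative only, not a drop-in change, since other methods in this class also synchronize on cachedRegionLocations for compound updates (String values stand in for HRegionLocation):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutIfAbsentSketch {
      private final ConcurrentMap<Integer, ConcurrentSkipListMap<byte[], String>> cache =
          new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], String>>();

      ConcurrentSkipListMap<byte[], String> tableLocations(byte[] tableName) {
        Integer key = Bytes.mapKey(tableName);
        ConcurrentSkipListMap<byte[], String> created =
            new ConcurrentSkipListMap<byte[], String>(Bytes.BYTES_COMPARATOR);
        // putIfAbsent returns the previously mapped value, or null if our new
        // map won the race; either way every caller sees the same instance.
        ConcurrentSkipListMap<byte[], String> prior = cache.putIfAbsent(key, created);
        return prior != null ? prior : created;
      }
    }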
+
+  /**
+   * Allows flushing the region cache.
+   */
+  @Override
+  public void clearRegionCache() {
+    synchronized (this.cachedRegionLocations) {
+      cachedRegionLocations.clear();
+      cachedServers.clear();
+    }
+  }
+
+  /**
+   * Put a newly discovered HRegionLocation into the cache.
+   */
+  private void cacheLocation(final byte [] tableName,
+      final HRegionLocation location) {
+    byte [] startKey = location.getRegionInfo().getStartKey();
+    boolean hasNewCache;
+    synchronized (this.cachedRegionLocations) {
+      cachedServers.add(location.getServerAddress().toString());
+      hasNewCache = (getTableLocations(tableName).put(startKey, location) == null);
+    }
+    if (hasNewCache) {
+      LOG.debug("Cached location for " +
+        location.getRegionInfo().getRegionNameAsString() +
+        " is " + location.getServerAddress().toString());
+    }
+  }
+
+  @SuppressWarnings("resource")
+  @Override
+  public HRegionInterface getHRegionConnection(
+      HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
+  throws IOException {
+    if (getMaster) {
+      LOG.debug("Getting master");
+      getMaster();
+    }
+    HRegionInterface server = HRegionServer.getMainRS(regionServer);
+    if (server != null && !this.useThrift) {
+      return server;
+    }
+
+    final boolean thriftPortWrittenToMeta = conf.getBoolean(
+       HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
+       HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT);
+    final boolean hadoopPortWrittenToMeta = !thriftPortWrittenToMeta;
+
+    try {
+      // establish an RPC for this RS
+      // set hbase.ipc.client.connect.max.retries to retry connection
+      // attempts
+      if (this.useThrift) {
+        Class<? extends ThriftClientInterface> serverInterface =
+          ThriftHRegionInterface.class;
+        if (thriftPortWrittenToMeta) {
+          try {
+            server = (HRegionInterface) HBaseThriftRPC.getClient(
+              regionServer.getInetSocketAddress(), this.conf,
+              serverInterface, options);
+          } catch (Exception e) {
+            LOG.warn("Exception connecting to the region server on" +
+              "the thrift channel. Retrying on the HadoopRPC port", e);
+            InetSocketAddress addr = new InetSocketAddress(regionServer
+              .getInetSocketAddress().getHostName(), conf.getInt(
+              HConstants.REGIONSERVER_PORT,
+              HConstants.DEFAULT_REGIONSERVER_PORT));
+            server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+              HBaseRPCProtocolVersion.versionID, addr, this.conf,
+              params.getRpcTimeout(), options);
+          }
+        } else {
+          try {
+            InetSocketAddress addr = new InetSocketAddress(regionServer
+              .getInetSocketAddress().getHostName(), conf.getInt(
+              HConstants.REGIONSERVER_SWIFT_PORT,
+              HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT));
+            server = (HRegionInterface) HBaseThriftRPC.getClient(addr,
+              this.conf, serverInterface, options);
+          } catch (Exception e) {
+            LOG.warn("Exception connecting to the region server on" +
+              "the thrift channel. Retrying on the HadoopRPC port", e);
+            server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+              HBaseRPCProtocolVersion.versionID,
+              regionServer.getInetSocketAddress(), this.conf,
+              params.getRpcTimeout(), options);
+          }
+        }
+      } else {
+        if (hadoopPortWrittenToMeta) {
+          server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+            HBaseRPCProtocolVersion.versionID,
+            regionServer.getInetSocketAddress(), this.conf,
+            params.getRpcTimeout(), options);
+        } else {
+          // The hadoop port is no longer written to Meta (this will happen
+          // when we are reasonably confident about the thrift service, and
+          // will soon deprecate RPC). So, try to connect to the default port.
+          // TODO gauravm: Verify that this works (t2830553)
+          InetSocketAddress addr = new InetSocketAddress(regionServer
+            .getInetSocketAddress().getHostName(), conf.getInt(
+            HConstants.REGIONSERVER_PORT,
+            HConstants.DEFAULT_REGIONSERVER_PORT));
+          server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+            HBaseRPCProtocolVersion.versionID, addr, this.conf,
+            params.getRpcTimeout(), options);
+        }
+      }
+    } catch (RemoteException e) {
+      throw RemoteExceptionHandler.decodeRemoteException(e);
+    }
+
+    return server;
+  }
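
Which channel and port getHRegionConnection() tries first is driven by two boolean flags read from the Configuration. A minimal sketch of setting them (the constant names are the ones referenced above; the chosen values are just one possible deployment):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    public class RpcChannelConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flags consulted by getHRegionConnection() above; values here describe
        // one example deployment (thrift client, thrift port written to .META.).
        conf.setBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT, true);
        conf.setBoolean(HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META, true);
      }
    }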
+
+  @Override
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+      HBaseRPCOptions options) throws IOException {
+    return getHRegionConnection(regionServer, false, options);
+  }
+
+  @Override
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+      boolean getMaster) throws IOException {
+    return getHRegionConnection(regionServer, getMaster,
+        HBaseRPCOptions.DEFAULT);
+  }
+
+  @Override
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer)
+      throws IOException {
+    return getHRegionConnection(regionServer, false);
+  }
+
+  @Override
+  public synchronized ZooKeeperWrapper getZooKeeperWrapper()
+      throws IOException {
+    return HConnectionManager.getClientZKConnection(conf)
+        .getZooKeeperWrapper();
+  }
+
+  /*
+   * Repeatedly try to find the root region in ZK
+   *
+   * @return HRegionLocation for root region if found
+   *
+   * @throws NoServerForRegionException - if the root region can not be
+   * located after retrying
+   *
+   * @throws IOException
+   */
+  private HRegionLocation locateRootRegion() throws IOException {
+
+    // We lazily instantiate the ZooKeeper object because we don't want to
+    // make the constructor have to throw IOException or handle it itself.
+    ZooKeeperWrapper zk = getZooKeeperWrapper();
+
+    HServerAddress rootRegionAddress = null;
+    for (int tries = 0; tries < params.getNumRetries(); tries++) {
+      int localTimeouts = 0;
+      // ask the master which server has the root region
+      while (rootRegionAddress == null
+          && localTimeouts < params.getNumRetries()) {
+        // Don't read root region until we're out of safe mode so we know
+        // that the meta regions have been assigned.
+        rootRegionAddress = zk.readRootRegionLocation();
+        if (rootRegionAddress == null) {
+          try {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sleeping " + params.getPauseTime(tries)
+                  + "ms, waiting for root region.");
+            }
+            Thread.sleep(params.getPauseTime(tries));
+          } catch (InterruptedException iex) {
+            // continue
+          }
+          localTimeouts++;
+        }
+      }
+
+      if (rootRegionAddress == null) {
+        throw new NoServerForRegionException(
+            "Timed out trying to locate root region");
+      }
+
+      LOG.debug("Trying to get root region at : " + rootRegionAddress);
+      try {
+        // Get a connection to the region server
+        HRegionInterface server = getHRegionConnection(rootRegionAddress);
+        // if this works, then we're good, and we have an acceptable address,
+        // so we can stop doing retries and return the result.
+        server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found ROOT at " + rootRegionAddress);
+        }
+        break;
+      } catch (Throwable t) {
+        t = translateException(t);
+
+        if (tries == params.getNumRetries() - 1) {
+          throw new NoServerForRegionException("Timed out trying to locate "+
+              "root region because: " + t.getMessage());
+        }
+
+        // Sleep and retry finding root region.
+        try {
+          LOG.debug("Root region location changed. Sleeping.", t);
+          Thread.sleep(params.getPauseTime(tries));
+          LOG.debug("Wake. Retry finding root region.");
+        } catch (InterruptedException iex) {
+          // continue
+        }
+      }
+
+      rootRegionAddress = null;
+    }
+
+    // If the address is still null at this point, all of the retries have
+    // failed and the root region cannot be located.
+    if (rootRegionAddress == null) {
+      throw new NoServerForRegionException(
+          "unable to locate root region server");
+    }
+
+    // return the region location
+    return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
+  }
+
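For orientation, locateRootRegion() above boils down to a poll-pause-retry pattern: ask ZK for the location, sleep params.getPauseTime(tries) when nothing is found, and give up after a bounded number of attempts. A minimal, self-contained sketch of that pattern follows; the class and method names are illustrative only, not part of the patch.

    import java.util.concurrent.Callable;

    // Minimal sketch: retry a lookup with a bounded number of attempts and a
    // pause between them, mirroring the structure of locateRootRegion().
    // All names here are illustrative, not part of the HBase client API.
    public final class RetryingLookup {
      public static <T> T lookup(Callable<T> attempt, int maxTries, long pauseMs)
          throws Exception {
        Exception last = null;
        for (int tries = 0; tries < maxTries; tries++) {
          try {
            T result = attempt.call();
            if (result != null) {
              return result;          // found: stop retrying
            }
          } catch (Exception e) {
            last = e;                 // remember the failure and retry
          }
          Thread.sleep(pauseMs);      // back off before the next attempt
        }
        throw new Exception("lookup failed after " + maxTries + " tries", last);
      }
    }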
+  @Override
+  public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
+      throws IOException {
+    List<Throwable> exceptions = new ArrayList<Throwable>();
+    RegionOverloadedException roe = null;
+
+    long callStartTime;
+    int serverRequestedRetries = 0;
+
+    callStartTime = System.currentTimeMillis();
+    long serverRequestedWaitTime = 0;
+    // do not retry if region cannot be located. There are enough retries
+    // within instantiateRegionLocation.
+    callable.instantiateRegionLocation(false /* reload cache? */);
+
+    for (int tries = 0;; tries++) {
+      // If the server requested a wait, wait for that long and then start
+      // over. Do not count this time/tries against the client retries.
+      if (serverRequestedWaitTime > 0) {
+        serverRequestedRetries++;
+
+        if (serverRequestedRetries > params.getMaxServerRequestedRetries()) {
+          throw RegionOverloadedException.create(roe, exceptions,
+              serverRequestedWaitTime);
+        }
+
+        long pauseTime = serverRequestedWaitTime + callStartTime
+            - System.currentTimeMillis();
+        LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for "
+            + pauseTime + "ms. serverRequestedRetries = "
+            + serverRequestedRetries);
+        try {
+          Thread.sleep(pauseTime);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new InterruptedIOException();
+        }
+
+        serverRequestedWaitTime = 0;
+        tries = 0;
+        callStartTime = System.currentTimeMillis();
+      }
+
+      try {
+        return getRegionServerWithoutRetries(callable, false);
+      } catch (DoNotRetryIOException ioe) {
+        // clear cache if needed
+        if (ioe.getCause() instanceof NotServingRegionException) {
+          HRegionLocation prevLoc = callable.location;
+          if (prevLoc.getRegionInfo() != null) {
+            deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+                .getStartKey(), prevLoc.getServerAddress());
+          }
+        }
+
+        // If we are not supposed to retry, let it pass through.
+        throw ioe;
+      } catch (RegionOverloadedException ex) {
+        roe = ex;
+        serverRequestedWaitTime = roe.getBackoffTimeMillis();
+        continue;
+      } catch (ClientSideDoNotRetryException exp) {
+        // Bail out of the retry loop, immediately
+        throw exp;
+      } catch (PreemptiveFastFailException pfe) {
+        // Bail out of the retry loop, if the host has been consistently
+        // unreachable.
+        throw pfe;
+      } catch (Throwable t) {
+        exceptions.add(t);
+
+        if (tries == params.getNumRetries() - 1) {
+          throw new RetriesExhaustedException(callable.getServerName(),
+              callable.getRegionName(), callable.getRow(), tries, exceptions);
+        }
+
+        HRegionLocation prevLoc = callable.location;
+        if (prevLoc.getRegionInfo() != null) {
+          deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+              .getStartKey(), prevLoc.getServerAddress());
+        }
+
+        try {
+          // do not retry if getting the location throws exception
+          callable.instantiateRegionLocation(false /* reload cache ? */);
+        } catch (IOException e) {
+          exceptions.add(e);
+          throw new RetriesExhaustedException(callable.getServerName(),
+              callable.getRegionName(), callable.getRow(), tries, exceptions);
+        }
+        if (prevLoc.getServerAddress().equals(
+            callable.location.getServerAddress())) {
+          // Bail out of the retry loop if we have to wait too long
+          long pauseTime = params.getPauseTime(tries);
+          if ((System.currentTimeMillis() - callStartTime + pauseTime) > params
+              .getRpcRetryTimeout()) {
+            throw new RetriesExhaustedException(callable.getServerName(),
+                callable.getRegionName(), callable.getRow(), tries,
+                exceptions);
+          }
+          LOG.debug("getRegionServerWithRetries failed, sleeping for "
+              + pauseTime + "ms. tries = " + tries, t);
+          try {
+            Thread.sleep(pauseTime);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+          }
+          // Do not reload the cache; hopefully it has been re-populated
+          // while we were sleeping.
+          callable.instantiateRegionLocation(false);
+        } else {
+          LOG.debug("getRegionServerWithRetries failed, "
+              + "region moved from " + prevLoc + " to " + callable.location
+              + "retrying immediately tries=" + tries, t);
+        }
+      }
+    }
+  }
+
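The retry loop above makes two key decisions after a failure: retry immediately if the region has moved to a different server, otherwise sleep before retrying, and stop retrying once the elapsed time plus the next pause would exceed the RPC retry timeout. A small sketch of those two checks, using hypothetical names:

    // Illustrative helpers only; not part of the patch.
    final class RetryDecisions {
      // Same server: the failure is likely still present, so back off.
      // Different server: the region moved, so retry immediately.
      static boolean shouldSleepBeforeRetry(String previousServer, String currentServer) {
        return previousServer.equals(currentServer);
      }

      // Give up rather than sleep when the total elapsed time would pass the timeout.
      static boolean wouldExceedTimeout(long callStartMs, long pauseMs, long timeoutMs) {
        return System.currentTimeMillis() - callStartMs + pauseMs > timeoutMs;
      }
    }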
+  /**
+   * Pass in a ServerCallable with your particular bit of logic defined and
+   * this method will run it against the region server it targets, without
+   * retrying on failure.
+   *
+   * @param <T>
+   *          the type of the return value
+   * @param callable
+   *          callable to run
+   * @return an object of type T
+   * @throws IOException
+   *           if a remote or network exception occurs
+   * @throws RuntimeException
+   *           other unspecified error
+   * @throws PreemptiveFastFailException
+   *           if the remote host has been known to be unreachable for more
+   *           than this.fastFailThresholdMilliSec.
+   */
+  @Override
+  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
+      boolean instantiateRegionLocation) throws IOException,
+      RuntimeException, PreemptiveFastFailException {
+    FailureInfo fInfo = null;
+    HServerAddress server = null;
+    boolean didTry = false;
+    MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
+    boolean retryDespiteFastFailMode = false;
+    try {
+      if (instantiateRegionLocation) {
+        callable.instantiateRegionLocation(false);
+      }
+      // Logic to fast fail requests to unreachable servers.
+      server = callable.getServerAddress();
+      fInfo = repeatedFailuresMap.get(server);
+
+      if (inFastFailMode(server) && !currentThreadInFastFailMode()) {
+        // In Fast-fail mode, all but one thread will fast fail. Check
+        // if we are that one chosen thread.
+        retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+        if (retryDespiteFastFailMode == false) { // not the chosen thread; fail fast
+          throw new PreemptiveFastFailException(
+              fInfo.numConsecutiveFailures.get(),
+              fInfo.timeOfFirstFailureMilliSec,
+              fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+        }
+      }
+      didTry = true;
+      callable.instantiateServer();
+      return callable.call();
+    } catch (PreemptiveFastFailException pfe) {
+      throw pfe;
+    } catch (ClientSideDoNotRetryException exp) {
+      throw exp;
+    } catch (Throwable t1) {
+      handleThrowable(t1, callable, couldNotCommunicateWithServer);
+      return null;
+    } finally {
+      updateFailureInfoForServer(server, fInfo, didTry,
+          couldNotCommunicateWithServer.booleanValue(),
+          retryDespiteFastFailMode);
+    }
+  }
+
+  @Override
+  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+      throws IOException, RuntimeException, PreemptiveFastFailException {
+    return this.getRegionServerWithoutRetries(callable, true);
+  }
+
+  private <T> void updateClientContext(final ServerCallable<T> callable,
+      final Throwable t) {
+    if (!recordClientContext) {
+      return;
+    }
+
+    List<OperationContext> currContext = this.operationContextPerThread.get();
+    if (currContext == null) {
+      currContext = new ArrayList<OperationContext>();
+      this.operationContextPerThread.set(currContext);
+    }
+
+    currContext.add(new OperationContext(callable.location, t));
+  }
+
+  /**
+   * Handles failures encountered when communicating with a server.
+   *
+   * Updates the FailureInfo in repeatedFailuresMap to reflect the failure
+   * and clears the cached locations for the server (in fast-fail mode, at
+   * most once per cacheClearingTimeoutMilliSec).
+   *
+   * @param server
+   *          the server the failure was observed against
+   * @param t
+   *          the throwable to be handled
+   */
+  private void handleFailureToServer(HServerAddress server, Throwable t) {
+    if (server == null || t == null)
+      return;
+
+    long currentTime = System.currentTimeMillis();
+    FailureInfo fInfo = repeatedFailuresMap.get(server);
+    if (fInfo == null) {
+      fInfo = new FailureInfo(currentTime);
+      FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo);
+
+      if (oldfInfo != null) {
+        fInfo = oldfInfo;
+      }
+    }
+    fInfo.timeOfLatestAttemptMilliSec = currentTime;
+    fInfo.numConsecutiveFailures.incrementAndGet();
+
+    if (inFastFailMode(server)) {
+      // In FastFail mode, do not clear out the cache if it was done recently.
+      if (currentTime > fInfo.timeOfLatestCacheClearMilliSec
+          + cacheClearingTimeoutMilliSec) {
+        fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+        clearCachedLocationForServer(server.toString());
+      }
+      LOG.error("Exception in FastFail mode : " + t.toString());
+      return;
+    }
+
+    // if thrown these exceptions, we clear all the cache entries that
+    // map to that slow/dead server; otherwise, let cache miss and ask
+    // .META. again to find the new location
+    fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+    clearCachedLocationForServer(server.toString());
+  }
+
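handleFailureToServer relies on ConcurrentMap.putIfAbsent so that concurrent callers create at most one FailureInfo per server and then bump the same counter. A stripped-down sketch of that idiom, with hypothetical names standing in for FailureInfo and repeatedFailuresMap:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the putIfAbsent idiom: record a failure for a server key,
    // creating the per-server record at most once even under concurrency.
    final class FailureTracker {
      static final class FailureCount {
        final long firstFailureMs;
        final AtomicInteger consecutive = new AtomicInteger();
        FailureCount(long firstFailureMs) { this.firstFailureMs = firstFailureMs; }
      }

      private final ConcurrentMap<String, FailureCount> failures =
          new ConcurrentHashMap<String, FailureCount>();

      void recordFailure(String server) {
        FailureCount info = failures.get(server);
        if (info == null) {
          FailureCount fresh = new FailureCount(System.currentTimeMillis());
          FailureCount existing = failures.putIfAbsent(server, fresh);
          info = (existing != null) ? existing : fresh;  // keep whichever won the race
        }
        info.consecutive.incrementAndGet();
      }
    }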
+  /**
+   * Occasionally cleans up unused information in repeatedFailuresMap.
+   *
+   * repeatedFailuresMap stores the failure information for all remote hosts
+   * that had failures. To keep it from growing indefinitely,
+   * occasionallyCleanupFailureInformation() clears out stale entries once
+   * every failureMapCleanupIntervalMilliSec ms.
+   */
+  private void occasionallyCleanupFailureInformation() {
+    long now = System.currentTimeMillis();
+    if (!(now > lastFailureMapCleanupTimeMilliSec
+        + failureMapCleanupIntervalMilliSec))
+      return;
+
+    // Remove entries that haven't been attempted in a while.
+    // No synchronization needed: it is okay if multiple threads remove the
+    // same entry from the concurrent hash map.
+    StringBuilder sb = new StringBuilder();
+    for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap
+        .entrySet()) {
+      if (now > entry.getValue().timeOfLatestAttemptMilliSec
+          + failureMapCleanupIntervalMilliSec) { // no recent failures
+        repeatedFailuresMap.remove(entry.getKey());
+      } else if (now > entry.getValue().timeOfFirstFailureMilliSec
+          + this.fastFailClearingTimeMilliSec) { // failing for a long time
+        LOG.error(entry.getKey()
+            + " has been failing for a long time; clearing out. "
+            + entry.getValue().toString());
+        repeatedFailuresMap.remove(entry.getKey());
+      } else {
+        sb.append(entry.getKey().toString() + " failing "
+            + entry.getValue().toString() + "\n");
+      }
+    }
+      // If multiple threads are cleaning up concurrently, try to ensure
+      // that only one of them logs the message.
+    if (sb.length() > 0
+        && now > this.lastFailureMapCleanupTimeMilliSec
+            + this.failureMapCleanupIntervalMilliSec) {
+      LOG.warn("Preemptive failure enabled for : " + sb.toString());
+    }
+    lastFailureMapCleanupTimeMilliSec = now;
+  }
+
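The cleanup above iterates the concurrent map and drops entries whose last attempt is older than the cleanup interval; removing entries while iterating a ConcurrentHashMap's entry set is safe, so no extra locking is needed. A minimal sketch of that pattern, with illustrative names and a plain timestamp map standing in for FailureInfo:

    import java.util.Map;
    import java.util.concurrent.ConcurrentMap;

    // Sketch: drop entries whose last-seen timestamp is older than a TTL.
    final class StaleEntryCleaner {
      static void removeStale(ConcurrentMap<String, Long> lastSeenMs, long ttlMs) {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> e : lastSeenMs.entrySet()) {
          if (now > e.getValue() + ttlMs) {
            lastSeenMs.remove(e.getKey());   // no recent activity: forget this server
          }
        }
      }
    }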
+  /**
+   * Checks to see if we are in the Fast fail mode for requests to the server.
+   *
+   * If a client is unable to contact a server for more than
+   * fastFailThresholdMilliSec the client will get into fast fail mode.
+   *
+   * @param server
+   * @return true if the client is in fast fail mode for the server.
+   */
+  private boolean inFastFailMode(HServerAddress server) {
+    FailureInfo fInfo = repeatedFailuresMap.get(server);
+    // If fInfo is null, the server is considered good. Otherwise, enter
+    // fast-fail mode only after the failures have persisted for longer than
+    // the threshold.
+    return (fInfo != null && System.currentTimeMillis() > fInfo.timeOfFirstFailureMilliSec
+        + this.fastFailThresholdMilliSec);
+  }
+
+  /**
+   * Checks to see if the current thread is already in FastFail mode for
+   * *some* server.
+   *
+   * @return true, if the thread is already in FF mode.
+   */
+  private boolean currentThreadInFastFailMode() {
+    return (this.threadRetryingInFastFailMode.get() != null && this.threadRetryingInFastFailMode
+        .get().booleanValue() == true);
+  }
+
+  /**
+   * Check to see if the client should try to connect to the server, in spite
+   * of knowing that it is in fast fail mode.
+   *
+   * The idea here is that we want just one client thread to be actively
+   * trying to reconnect, while all the other threads trying to reach the
+   * server will short circuit.
+   *
+   * @param fInfo
+   * @return true if the client should try to connect to the server.
+   */
+  private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+    // We believe that the server is down, but we want to have just one
+    // client actively trying to connect. If we are the chosen one, we will
+    // retry and not throw an exception.
+    if (fInfo != null
+        && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false,
+            true)) {
+      MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
+          .get();
+      if (threadAlreadyInFF == null) {
+        threadAlreadyInFF = new MutableBoolean();
+        this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
+      }
+      threadAlreadyInFF.setValue(true);
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
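The compareAndSet call above implements a single-prober election: among all threads that see the server in fast-fail mode, exactly one wins the flag and is allowed to probe the server, while the rest fail fast. A small sketch of the same gate, with hypothetical names:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the election: only the thread that flips false -> true probes.
    final class SingleProberGate {
      private final AtomicBoolean probing = new AtomicBoolean(false);

      boolean tryBecomeProber() {
        // Exactly one caller wins the CAS and becomes the prober.
        return probing.compareAndSet(false, true);
      }

      void releaseProber() {
        probing.set(false);   // done probing; let another thread try next time
      }
    }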
+  /**
+   * Updates the failure information for the server.
+   *
+   * @param server
+   * @param fInfo
+   * @param didTry
+   * @param couldNotCommunicate
+   * @param retryDespiteFastFailMode
+   */
+  private void updateFailureInfoForServer(HServerAddress server,
+      FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
+      boolean retryDespiteFastFailMode) {
+    if (server == null || fInfo == null || didTry == false)
+      return;
+
+    // If we were able to connect to the server, reset the failure
+    // information.
+    if (couldNotCommunicate == false) {
+      LOG.info("Clearing out PFFE for server " + server.getHostname());
+      repeatedFailuresMap.remove(server);
+    } else {
+      // update time of last attempt
+      long currentTime = System.currentTimeMillis();
+      fInfo.timeOfLatestAttemptMilliSec = currentTime;
+
+      // Release the lock if we were retrying in spite of FastFail
+      if (retryDespiteFastFailMode) {
+        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+        threadRetryingInFastFailMode.get().setValue(false);
+      }
+    }
+
+    occasionallyCleanupFailureInformation();
+  }
+
+  public void updateFailureInfoForServer(HServerAddress server,
+      boolean didTry, boolean couldNotCommunicate) {
+    FailureInfo fInfo = repeatedFailuresMap.get(server);
+    boolean retryDespiteFastFailMode = false;
+    if (inFastFailMode(server) && !currentThreadInFastFailMode()) {
+      // In Fast-fail mode, all but one thread will fast fail. Check
+      // if we are that one chosen thread.
+      retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+    }
+
+    updateFailureInfoForServer(server, fInfo, didTry, couldNotCommunicate, retryDespiteFastFailMode);
+  }
+
+  public void handleThrowable(Throwable t1, ServerCallable<?> callable,
+                              MutableBoolean couldNotCommunicateWithServer)
+      throws IOException {
+    Throwable t2 = translateException(t1);
+    boolean isLocalException = !(t2 instanceof RemoteException);
+    // translateException throws DoNotRetryException or any
+    // non-IOException.
+    if (isLocalException && isNetworkException(t2)) {
+      couldNotCommunicateWithServer.setValue(true);
+      handleFailureToServer(callable.getServerAddress(), t2);
+    }
+
+    updateClientContext(callable, t2);
+    if (t2 instanceof IOException) {
+      throw (IOException) t2;
+    } else {
+      throw new RuntimeException(t2);
+    }
+  }
+
+  private Callable<Long> createGetServerStartCodeCallable(
+      final HServerAddress address, final HBaseRPCOptions options) {
+    final HConnection connection = this;
+    return new Callable<Long>() {
+      @Override
+      public Long call() throws IOException {
+        return getRegionServerWithoutRetries(new ServerCallableForBatchOps<Long>(
+            connection, address, options) {
+          @Override
+          public Long call() throws IOException {
+            return server.getStartCode();
+          }
+        });
+      }
+    };
+  }
+
+  private Callable<Long> createCurrentTimeCallable(
+      final HServerAddress address, final HBaseRPCOptions options) {
+    final HConnection connection = this;
+    return new Callable<Long>() {
+      @Override
+      public Long call() throws IOException {
+        return getRegionServerWithoutRetries(new ServerCallableForBatchOps<Long>(
+            connection, address, options) {
+          @Override
+          public Long call() throws IOException {
+            return server.getCurrentTimeMillis();
+          }
+        });
+      }
+    };
+  }
+
+  private Callable<MapWritable> createGetLastFlushTimesCallable(
+      final HServerAddress address, final HBaseRPCOptions options) {
+    final HConnection connection = this;
+    return new Callable<MapWritable>() {
+      @Override
+      public MapWritable call() throws IOException {
+        return getRegionServerWithoutRetries(new ServerCallableForBatchOps<MapWritable>(
+            connection, address, options) {
+          @Override
+          public MapWritable call() throws IOException {
+            return server.getLastFlushTimes();
+          }
+        });
+      }
+    };
+  }
+
+  private Callable<Void> createFlushCallable(final HServerAddress address,
+      final HRegionInfo region, final long targetFlushTime,
+      final HBaseRPCOptions options) {
+    final HConnection connection = this;
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        return getRegionServerWithoutRetries(new ServerCallableForBatchOps<Void>(
+            connection, address, options) {
+          @Override
+          public Void call() throws IOException {
+            server.flushRegion(region.getRegionName(), targetFlushTime);
+            return null;
+          }
+        });
+      }
+    };
+  }
+
+  private Callable<MultiResponse> createMultiActionCallable(
+      final HServerAddress address, final MultiAction multi,
+      final byte[] tableName, final HBaseRPCOptions options) {
+    final HConnection connection = this;
+    // no need to track mutations here. Done at the caller.
+    return new Callable<MultiResponse>() {
+      @Override
+      public MultiResponse call() throws IOException {
+        return getRegionServerWithoutRetries(
+            new ServerCallableForBatchOps<MultiResponse>(connection, address,
+                options) {
+              @Override
+              public MultiResponse call() throws IOException,
+                  InterruptedException, ExecutionException {
+                return server.multiAction(multi);
+              }
+            }, true);
+      }
+    };
+  }
+
+  private HRegionLocation getRegionLocationForRowWithRetries(
+      byte[] tableName, byte[] rowKey, boolean reload) throws IOException {
+    boolean reloadFlag = reload;
+    List<Throwable> exceptions = new ArrayList<Throwable>();
+    HRegionLocation location = null;
+    int tries = 0;
+    for (; tries < params.getNumRetries();) {
+      try {
+        location = getRegionLocation(tableName, rowKey, reloadFlag);
+      } catch (Throwable t) {
+        exceptions.add(t);
+      }
+      if (location != null) {
+        break;
+      }
+      reloadFlag = true;
+      tries++;
+      try {
+        Thread.sleep(params.getPauseTime(tries));
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    if (location == null) {
+      throw new RetriesExhaustedException(
+          " -- nothing found, no 'location' returned," + " tableName="
+              + Bytes.toString(tableName) + ", reload=" + reload + " --",
+          HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions);
+    }
+    return location;
+  }
+
+  private <R extends Row> Map<HServerAddress, MultiAction> splitRequestsByRegionServer(
+      List<R> workingList, final byte[] tableName, boolean isGets)
+      throws IOException {
+    Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
+    for (int i = 0; i < workingList.size(); i++) {
+      Row row = workingList.get(i);
+      if (row != null) {
+        HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
+
+        byte[] regionName = loc.getRegionInfo().getRegionName();
+
+        MultiAction actions = actionsByServer.get(loc.getServerAddress());
+        if (actions == null) {
+          actions = new MultiAction();
+          actionsByServer.put(loc.getServerAddress(), actions);
+        }
+
+        if (isGets) {
+          actions.addGet(regionName, (Get) row, i);
+        } else {
+          trackMutationsToTable(tableName, loc);
+          actions.mutate(regionName, (Mutation) row);
+        }
+      }
+    }
+    return actionsByServer;
+  }
+
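splitRequestsByRegionServer groups operations by the server hosting each row so that one MultiAction RPC can carry everything bound for the same server. The grouping step reduces to the following sketch, where resolveServer is a hypothetical stand-in for locateRegion and all names are illustrative:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: bucket rows by the server that hosts them.
    final class RequestGrouper {
      interface Locator { String resolveServer(byte[] row); }

      static Map<String, List<byte[]>> groupByServer(List<byte[]> rows, Locator locator) {
        Map<String, List<byte[]>> byServer = new HashMap<String, List<byte[]>>();
        for (byte[] row : rows) {
          String server = locator.resolveServer(row);
          List<byte[]> bucket = byServer.get(server);
          if (bucket == null) {
            bucket = new ArrayList<byte[]>();
            byServer.put(server, bucket);
          }
          bucket.add(row);
        }
        return byServer;
      }
    }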
+  private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
+      Map<HServerAddress, MultiAction> actionsByServer,
+      final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
+
+    Map<HServerAddress, Future<MultiResponse>> futures = new HashMap<HServerAddress, Future<MultiResponse>>(
+        actionsByServer.size());
+
+    boolean singleServer = (actionsByServer.size() == 1);
+    for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
+      Callable<MultiResponse> callable = createMultiActionCallable(
+          e.getKey(), e.getValue(), tableName, options);
+      Future<MultiResponse> task;
+      if (singleServer) {
+        task = new FutureTask<MultiResponse>(callable);
+        ((FutureTask<MultiResponse>) task).run();
+      } else {
+        task = pool.submit(callable);
+      }
+      futures.put(e.getKey(), task);
+    }
+    return futures;
+  }
+
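makeServerRequests submits one callable per server, but when only a single server is involved it runs the task inline on the calling thread via FutureTask instead of paying for a pool handoff. A sketch of that dispatch decision, with illustrative names:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    // Sketch: per-server dispatch with an inline fast path for a single server.
    final class Dispatcher {
      static <T> Map<String, Future<T>> dispatch(Map<String, Callable<T>> tasksByServer,
                                                 ExecutorService pool) {
        Map<String, Future<T>> futures = new HashMap<String, Future<T>>(tasksByServer.size());
        boolean singleServer = tasksByServer.size() == 1;
        for (Map.Entry<String, Callable<T>> e : tasksByServer.entrySet()) {
          Future<T> f;
          if (singleServer) {
            FutureTask<T> inline = new FutureTask<T>(e.getValue());
            inline.run();                     // execute on the calling thread
            f = inline;
          } else {
            f = pool.submit(e.getValue());    // execute on the shared pool
          }
          futures.put(e.getKey(), f);
        }
        return futures;
      }
    }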
+  /*
+   * Collects responses from each of the RegionServers. If there are failures,
+   * a list of failed operations is returned.
+   */
+  private List<Mutation> collectResponsesForMutateFromAllRS(byte[] tableName,
+      Map<HServerAddress, MultiAction> actionsByServer,
+      Map<HServerAddress, Future<MultiResponse>> futures,
+      Map<String, HRegionFailureInfo> failureInfo)
+      throws InterruptedException, IOException {
+
+    List<Mutation> newWorkingList = null;
+    for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+        .entrySet()) {
+      HServerAddress address = responsePerServer.getKey();
+      MultiAction request = actionsByServer.get(address);
+
+      Future<MultiResponse> future = responsePerServer.getValue();
+      MultiResponse resp = null;
+
+      try {
+        resp = future.get();
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof DoNotRetryIOException)
+          throw (DoNotRetryIOException) e.getCause();
+      }
+
+      // If we got a response, go through the responses from each region and
+      // process the Puts and Deletes.
+      // If the response is null, the failed operations will be added to
+      // newWorkingList below.
+      if (request.getDeletes() != null) {
+        newWorkingList = processMutationResponseFromOneRegionServer(
+            tableName, address, resp, request.getDeletes(), newWorkingList,
+            true, failureInfo);
+      }
+      if (request.getPuts() != null) {
+        newWorkingList = processMutationResponseFromOneRegionServer(
+            tableName, address, resp, request.getPuts(), newWorkingList,
+            false, failureInfo);
+      }
+    }
+    return newWorkingList;
+  }
+
+  /*
+   * Collects responses from each of the RegionServers and populates the
+   * result array. If there are failures, a list of failed operations is
+   * returned.
+   */
+  private List<Get> collectResponsesForGetFromAllRS(byte[] tableName,
+      Map<HServerAddress, MultiAction> actionsByServer,
+      Map<HServerAddress, Future<MultiResponse>> futures,
+      List<Get> orig_list, Result[] results,
+      Map<String, HRegionFailureInfo> failureInfo) throws IOException,
+      InterruptedException {
+
+    List<Get> newWorkingList = null;
+    for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+        .entrySet()) {
+      HServerAddress address = responsePerServer.getKey();
+      MultiAction request = actionsByServer.get(address);
+
+      Future<MultiResponse> future = responsePerServer.getValue();
+      MultiResponse resp = null;
+
+      try {
+        resp = future.get();
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof DoNotRetryIOException) {
+          throw (DoNotRetryIOException) e.getCause();
+        }
+        LOG.warn("Error in batched get from server", e.getCause());
+      }
+
+      if (resp == null) {
+        // Entire server failed
+        LOG.debug("Failed all for server: " + address
+            + ", removing from cache");
+      }
+
+      newWorkingList = processGetResponseFromOneRegionServer(tableName,
+          address, request, resp, orig_list, newWorkingList, results,
+          failureInfo);
+    }
+    return newWorkingList;
+  }
+
+  private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
+      byte[] tableName, HServerAddress address, MultiResponse resp,
+      Map<byte[], List<R>> map, List<Mutation> newWorkingList,
+      boolean isDelete, Map<String, HRegionFailureInfo> failureInfo)
+      throws IOException {
+    // If we got a response, go through the responses from each region and
+    // process the Puts and Deletes.
+    for (Map.Entry<byte[], List<R>> e : map.entrySet()) {
+      byte[] regionName = e.getKey();
+      List<R> regionOps = map.get(regionName);
+
+      long result = 0;
+      try {
+        if (isDelete)
+          result = resp.getDeleteResult(regionName);
+        else
+          result = resp.getPutResult(regionName);
+
+        if (result != -1) {
+          if (newWorkingList == null)
+            newWorkingList = new ArrayList<Mutation>();
+
+          newWorkingList.addAll(regionOps.subList((int) result,
+              regionOps.size()));
+        }
+      } catch (Exception ex) {
+        String serverName = address.getHostname();
+        String regName = Bytes.toStringBinary(regionName);
+        // If the response is null, we will catch an NPE here.
+        translateException(ex);
+
+        if (!failureInfo.containsKey(regName)) {
+          failureInfo.put(regName, new HRegionFailureInfo(regName));
+        }
+        failureInfo.get(regName).addException(ex);
+        failureInfo.get(regName).setServerName(serverName);
+
+        if (newWorkingList == null)
+          newWorkingList = new ArrayList<Mutation>();
+
+        newWorkingList.addAll(regionOps);
+        // enough to remove from cache one of the rows from the region
+        deleteCachedLocation(tableName, regionOps.get(0).getRow(), address);
+      }
+    }
+
+    return newWorkingList;
+  }
+
+  private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
+      HServerAddress address, MultiAction request, MultiResponse resp,
+      List<Get> orig_list, List<Get> newWorkingList, Result[] results,
+      Map<String, HRegionFailureInfo> failureInfo) throws IOException {
+
+    for (Map.Entry<byte[], List<Get>> e : request.getGets().entrySet()) {
+      byte[] regionName = e.getKey();
+      List<Get> regionGets = request.getGets().get(regionName);
+      List<Integer> origIndices = request.getOriginalIndex().get(regionName);
+
+      Result[] getResult;
+      try {
+        getResult = resp.getGetResult(regionName);
+
+        // fill up the result[] array accordingly
+        assert (origIndices.size() == getResult.length);
+        for (int i = 0; i < getResult.length; i++) {
+          results[origIndices.get(i)] = getResult[i];
+        }
+      } catch (Exception ex) {
+        // If the response is null, we will catch an NPE here.
+        translateException(ex);
+
+        if (newWorkingList == null)
+          newWorkingList = new ArrayList<Get>(orig_list.size());
+
+        String serverName = address.getHostname();
+        String regName = Bytes.toStringBinary(regionName);
+
+        if (!failureInfo.containsKey(regName)) {
+          failureInfo.put(regName, new HRegionFailureInfo(regName));
+        }
+        failureInfo.get(regName).addException(ex);
+        failureInfo.get(regName).setServerName(serverName);
+
+        // Add the element to the correct position
+        for (int i = 0; i < regionGets.size(); i++) {
+          newWorkingList.add(origIndices.get(i), regionGets.get(i));
+        }
+
+        // enough to clear this once for a region
+        deleteCachedLocation(tableName, regionGets.get(0).getRow(), address);
+      }
+    }
+
+    return newWorkingList;
+  }
+
+  @Override
+  public void processBatchedMutations(List<Mutation> orig_list,
+      final byte[] tableName, ExecutorService pool, List<Mutation> failures,
+      HBaseRPCOptions options) throws IOException, InterruptedException {
+
+    // Keep track of the most recent servers for any given item for better
+    // exception reporting. We keep HRegionLocation to save on parsing.
+    // Later, when we use lastServers, we'll pull what we need from it.
+    // Sort the mutations by row key in order to optimize row lock
+    // acquisition on the server side.
+
+    Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
+    List<Mutation> workingList = orig_list;
+    Collections.sort(workingList);
+
+    for (int tries = 0; workingList != null && !workingList.isEmpty()
+        && tries < params.getNumRetries(); ++tries) {
+
+      if (tries >= 1) {
+        long sleepTime = params.getPauseTime(tries);
+        LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!");
+        Thread.sleep(sleepTime);
+      }
+
+      // step 1: break up into regionserver-sized chunks and build the data
+      // structs
+      Map<HServerAddress, MultiAction> actionsByServer = splitRequestsByRegionServer(
+          workingList, tableName, false);
+
+      // step 2: make the requests
+      Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+          actionsByServer, tableName, pool, options);
+
+      // step 3: collect the failures and successes and prepare for retry
+      workingList = collectResponsesForMutateFromAllRS(tableName,
+          actionsByServer, futures, failureInfo);
+    }
+
+    if (workingList != null && !workingList.isEmpty()) {
+      if (failures != null)
+        failures.addAll(workingList);
+      throw new RetriesExhaustedException(failureInfo, workingList.size()
+          + "mutate operations remaining after " + params.getNumRetries()
+          + " retries");
+    }
+  }
+
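processBatchedMutations drives a retry-until-empty loop: each round re-submits only the operations that failed, with a pause before every round after the first, until the working list drains or retries are exhausted. A compact sketch of that outer loop, with a hypothetical RoundSubmitter standing in for the split/send/collect steps:

    import java.util.List;

    // Illustrative stand-in for one split/send/collect pass; returns only failures.
    interface RoundSubmitter<T> { List<T> submitRound(List<T> pending) throws Exception; }

    final class BatchRetryLoop {
      static <T> List<T> run(List<T> pending, RoundSubmitter<T> submitter,
                             int maxTries, long pauseMs) throws Exception {
        for (int tries = 0; pending != null && !pending.isEmpty() && tries < maxTries; tries++) {
          if (tries >= 1) {
            Thread.sleep(pauseMs);                   // back off before the next round
          }
          pending = submitter.submitRound(pending);  // keep only the failures
        }
        return pending;                              // non-empty means retries were exhausted
      }
    }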
+  @Override
+  public void processBatchedGets(List<Get> orig_list, final byte[] tableName,
+      ExecutorService pool, Result[] results, HBaseRPCOptions options)
+      throws IOException, InterruptedException {
+
+    Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
+    // If results is not null, it must be the same size as the list.
+    if (results != null && (results.length != orig_list.size())) {
+      throw new IllegalArgumentException(
+          "argument results must be the same size as argument list");
+    }
+
+    // Keep track of the most recent servers for any given item for better
+    // exception reporting. We keep HRegionLocation to save on parsing.
+    // Later, when we use lastServers, we'll pull what we need from it.
+    List<Get> workingList = orig_list;
+
+    for (int tries = 0; workingList != null && !workingList.isEmpty()
+        && tries < params.getNumRetries(); ++tries) {
+
+      if (tries >= 1) {

[... 1184 lines stripped ...]