You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:01 UTC

[15/49] git commit: HBASE-10356 Failover RPC's for multi-get

HBASE-10356 Failover RPC's for multi-get

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569559 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25b6103d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25b6103d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25b6103d

Branch: refs/heads/master
Commit: 25b6103dadba16e85db0a8c5f2fc44ecf9fc3f2a
Parents: d6f603a
Author: sershe <se...@unknown>
Authored: Tue Feb 18 23:37:17 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:37 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HRegionInfo.java    |   2 +-
 .../apache/hadoop/hbase/RegionLocations.java    |   5 +
 .../org/apache/hadoop/hbase/client/Action.java  |  23 +-
 .../hadoop/hbase/client/AsyncProcess.java       | 580 ++++++++++++++++---
 .../hadoop/hbase/client/ClusterConnection.java  |   6 +
 .../hadoop/hbase/client/ConnectionAdapter.java  |   6 +
 .../hadoop/hbase/client/ConnectionManager.java  |  13 +-
 .../apache/hadoop/hbase/client/MultiAction.java |  11 -
 .../hbase/client/MultiServerCallable.java       |   6 +
 .../hadoop/hbase/client/RegionReplicaUtil.java  |   5 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 332 +++++++++--
 .../hbase/client/CoprocessorHConnection.java    |  13 +-
 .../hbase/client/HConnectionTestingUtility.java |   3 +
 13 files changed, 852 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 0f846b5..59a3248 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -143,7 +143,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
   public static final byte REPLICA_ID_DELIMITER = (byte)'_';
 
   private static final int MAX_REPLICA_ID = 0xFFFF;
-  private static final int DEFAULT_REPLICA_ID = 0;
+  static final int DEFAULT_REPLICA_ID = 0;
   /**
    * Does region name contain its encoded name?
    * @param regionName region name

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index cdf1180..b5db549 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
 import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -261,6 +262,10 @@ public class RegionLocations {
     return locations;
   }
 
+  public HRegionLocation getDefaultRegionLocation() {
+    return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+  }
+
   /**
    * Returns the first not-null region location in the list
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index c3e2104..5147c25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 
 /**
  * A Get, Put, Increment, Append, or Delete associated with it's region.  Used internally by  
@@ -27,18 +28,34 @@ import org.apache.hadoop.hbase.HConstants;
  * the index from the original request. 
  */
 @InterfaceAudience.Private
+//TODO: R is never used
 public class Action<R> implements Comparable<R> {
   // TODO: This class should not be visible outside of the client package.
   private Row action;
   private int originalIndex;
   private long nonce = HConstants.NO_NONCE;
+  private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
 
   public Action(Row action, int originalIndex) {
     super();
     this.action = action;
-    this.originalIndex = originalIndex;    
+    this.originalIndex = originalIndex;
   }
 
+  /**
+   * Creates an action for a particular replica from original action.
+   * @param action Original action.
+   * @param replicaId Replica id for the new action.
+   */
+  public Action(Action<R> action, int replicaId) {
+    super();
+    this.action = action.action;
+    this.nonce = action.nonce;
+    this.originalIndex = action.originalIndex;
+    this.replicaId = replicaId;
+  }
+
+
   public void setNonce(long nonce) {
     this.nonce = nonce;
   }
@@ -55,6 +72,10 @@ public class Action<R> implements Comparable<R> {
     return originalIndex;
   }
 
+  public int getReplicaId() {
+    return replicaId;
+  }
+
   @SuppressWarnings("rawtypes")
   @Override
   public int compareTo(Object o) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 714daeb..9419932 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.htrace.Trace;
-
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -89,9 +89,11 @@ import com.google.common.annotations.VisibleForTesting;
  * </p>
  */
 class AsyncProcess {
-  private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+  protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
   protected static final AtomicLong COUNTER = new AtomicLong();
 
+  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
+
   /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@@ -102,7 +104,7 @@ class AsyncProcess {
     public boolean hasError();
     public RetriesExhaustedWithDetailsException getErrors();
     public List<? extends Row> getFailedOperations();
-    public Object[] getResults();
+    public Object[] getResults() throws InterruptedIOException;
     /** Wait until all tasks are executed, successfully or not. */
     public void waitUntilDone() throws InterruptedIOException;
   }
@@ -122,6 +124,27 @@ class AsyncProcess {
     public void waitUntilDone() throws InterruptedIOException {}
   };
 
+  /** Sync point for calls to multiple replicas for the same user request (Get).
+   * Created and put in the results array (we assume replica calls require results) when
+   * the replica calls are launched. See results for details of this process.
+   * POJO, fields are accessed directly. To modify them, the object itself is locked. */
+  private static class ReplicaResultState {
+    public ReplicaResultState(int callCount) {
+      this.callCount = callCount;
+    }
+
+    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
+    int callCount;
+    /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
+     * is not last, adds error to list. If all calls fail the last one sets this to list. */
+    Object result = null;
+    /** Errors for which it is not decided whether we will report them to user. If one of the
+     * calls succeeds, we will discard the errors that may have happened in the other calls. */
+    BatchErrors replicaErrors = null;
+  }
+
+
+  // TODO: many of the fields should be made private
   protected final long id;
 
   protected final ClusterConnection hConnection;
@@ -160,6 +183,7 @@ class AsyncProcess {
   protected int numTries;
   protected int serverTrackerTimeout;
   protected int timeout;
+  protected long primaryCallTimeout;
   // End configuration settings.
 
   protected static class BatchErrors {
@@ -192,6 +216,12 @@ class AsyncProcess {
       actions.clear();
       addresses.clear();
     }
+
+    public synchronized void merge(BatchErrors other) {
+      throwables.addAll(other.throwables);
+      actions.addAll(other.actions);
+      addresses.addAll(other.addresses);
+    }
   }
 
   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
@@ -212,6 +242,7 @@ class AsyncProcess {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
 
     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@@ -270,7 +301,8 @@ class AsyncProcess {
 
   /**
    * Extract from the rows list what we can submit. The rows we can not submit are kept in the
-   * list.
+   * list. Does not send requests to replicas (not currently used for anything other
+   * than streaming puts anyway).
    *
    * @param pool ExecutorService to use.
    * @param tableName The table for which this request is needed.
@@ -311,7 +343,7 @@ class AsyncProcess {
         Row r = it.next();
         HRegionLocation loc;
         try {
-          loc = findDestLocation(tableName, r);
+          loc = findDestLocation(tableName, r, true).getDefaultRegionLocation();
         } catch (IOException ex) {
           locationErrors = new ArrayList<Exception>();
           locationErrorRows = new ArrayList<Integer>();
@@ -329,7 +361,9 @@ class AsyncProcess {
           Action<Row> action = new Action<Row>(r, ++posInList);
           setNonce(ng, r, action);
           retainedActions.add(action);
-          addAction(loc, action, actionsByServer, nonceGroup);
+          // TODO: replica-get is not supported on this path
+          byte[] regionName = loc.getRegionInfo().getRegionName();
+          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
           it.remove();
         }
       }
@@ -347,7 +381,7 @@ class AsyncProcess {
         ars.manageError(originalIndex, row, false, locationErrors.get(i), null);
       }
     }
-    ars.sendMultiAction(actionsByServer, 1);
+    ars.sendMultiAction(actionsByServer, 1, null);
     return ars;
   }
 
@@ -359,13 +393,12 @@ class AsyncProcess {
    * @param actionsByServer the multiaction per server
    * @param nonceGroup Nonce group.
    */
-  private void addAction(HRegionLocation loc, Action<Row> action,
+  private void addAction(ServerName server, byte[] regionName, Action<Row> action,
       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
-    final byte[] regionName = loc.getRegionInfo().getRegionName();
-    MultiAction<Row> multiAction = actionsByServer.get(loc.getServerName());
+    MultiAction<Row> multiAction = actionsByServer.get(server);
     if (multiAction == null) {
       multiAction = new MultiAction<Row>();
-      actionsByServer.put(loc.getServerName(), multiAction);
+      actionsByServer.put(server, multiAction);
     }
     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
       multiAction.setNonceGroup(nonceGroup);
@@ -380,10 +413,12 @@ class AsyncProcess {
    * @param row the row
    * @return the destination.
    */
-  private HRegionLocation findDestLocation(TableName tableName, Row row) throws IOException {
+  private RegionLocations findDestLocation(
+      TableName tableName, Row row, boolean checkPrimary) throws IOException {
     if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
-    HRegionLocation loc = hConnection.locateRegion(tableName, row.getRow());
-    if (loc == null) {
+    RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
+    if (loc == null
+        || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
       throw new IOException("#" + id + ", no location found, aborting submit for" +
           " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
     }
@@ -516,6 +551,144 @@ class AsyncProcess {
    * scheduling children. This is why lots of code doesn't require any synchronization.
    */
   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
+
+    /**
+     * Runnable (that can be submitted to thread pool) that waits for when it's time
+     * to issue replica calls, finds region replicas, groups the requests by replica and
+     * issues the calls (on separate threads, via sendMultiAction).
+     * This is done on a separate thread because we don't want to wait on user thread for
+     * our asynchronous call, and usually we have to wait before making replica calls.
+     */
+    private final class ReplicaCallIssuingRunnable implements Runnable {
+      private final long startTime;
+      private final List<Action<Row>> initialActions;
+
+      public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
+        this.initialActions = initialActions;
+        this.startTime = startTime;
+      }
+
+      @Override
+      public void run() {
+        boolean done = false;
+        if (primaryCallTimeout > 0) {
+          try {
+            done = waitUntilDone(startTime + primaryCallTimeout);
+          } catch (InterruptedException ex) {
+            LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
+            return;
+          }
+        }
+        if (done) return; // Done within primary timeout
+        Map<ServerName, MultiAction<Row>> actionsByServer =
+            new HashMap<ServerName, MultiAction<Row>>();
+        if (replicaGetIndices == null) {
+          for (int i = 0; i < results.length; ++i) {
+            addReplicaActions(i, actionsByServer);
+          }
+        } else {
+          for (int i = 0; i < replicaGetIndices.length; ++i) {
+            addReplicaActions(replicaGetIndices[i], actionsByServer);
+          }
+        }
+        if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
+        sendMultiAction(actionsByServer, 1, null);
+      }
+
+      /**
+       * Add replica actions to action map by server.
+       * @param index Index of the original action.
+       * @param actionsByServer The map by server to add it to.
+       */
+      private void addReplicaActions(
+          int index, Map<ServerName, MultiAction<Row>> actionsByServer) {
+        if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
+        Action<Row> action = initialActions.get(index);
+        RegionLocations loc = null;
+        try {
+          // For perf, we assume this location comes from cache, since we just got the location
+          // from meta for the primary call. If that turns out not to be the case, we'd need a
+          // local cache, since we want as little delay as possible before the replica call.
+          loc = findDestLocation(tableName, action.getAction(), false);
+        } catch (IOException ex) {
+          manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
+          LOG.error("Cannot get location - no replica calls for some actions", ex);
+          return;
+        }
+        HRegionLocation[] locs = loc.getRegionLocations();
+        int replicaCount = 0;
+        for (int i = 1; i < locs.length; ++i) {
+          replicaCount += (locs[i] != null) ? 1 : 0;
+        }
+        if (replicaCount == 0) {
+          LOG.warn("No replicas found for " + action.getAction());
+          return;
+        }
+        synchronized (replicaResultLock) {
+          // Don't run replica calls if the original has finished. We could do it e.g. if
+          // original has already failed before first replica call (unlikely given retries),
+          // but that would require additional synchronization w.r.t. returning to caller.
+          if (results[index] != null) return;
+          // We set the number of calls here. After that any path must call setResult/setError.
+          results[index] = new ReplicaResultState(replicaCount + 1);
+        }
+        for (int i = 1; i < locs.length; ++i) {
+          if (locs[i] == null) continue;
+          addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
+              new Action<Row>(action, i), actionsByServer, nonceGroup);
+        }
+      }
+    }
+
+    /**
+     * Runnable (that can be submitted to thread pool) that submits MultiAction to a
+     * single server. The server call is synchronous, therefore we do it on a thread pool.
+     */
+    private final class SingleServerRequestRunnable implements Runnable {
+      private final MultiAction<Row> multiAction;
+      private final int numAttempt;
+      private final ServerName server;
+
+      private SingleServerRequestRunnable(
+          MultiAction<Row> multiAction, int numAttempt, ServerName server) {
+        this.multiAction = multiAction;
+        this.numAttempt = numAttempt;
+        this.server = server;
+      }
+
+      @Override
+      public void run() {
+        MultiResponse res;
+        try {
+          MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
+          try {
+            res = createCaller(callable).callWithoutRetries(callable, timeout);
+          } catch (IOException e) {
+            // The service itself failed. It may be an error coming from the communication
+            //   layer, but, as well, a functional error raised by the server.
+            receiveGlobalFailure(multiAction, server, numAttempt, e);
+            return;
+          } catch (Throwable t) {
+            // This should not happen. Let's log & retry anyway.
+            LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+                " Retrying. Server is " + server + ", tableName=" + tableName, t);
+            receiveGlobalFailure(multiAction, server, numAttempt, t);
+            return;
+          }
+
+          // Normal case: we received an answer from the server, and it's not an exception.
+          receiveMultiAction(multiAction, server, res, numAttempt);
+        } catch (Throwable t) {
+              // Something really bad happened. We are on the send thread that will now die.
+              LOG.error("Internal AsyncProcess #" + id + " error for "
+                  + tableName + " processing for " + server, t);
+              throw new RuntimeException(t);
+        } finally {
+          decTaskCounters(multiAction.getRegions(), server);
+        }
+      }
+    }
+
     private final Batch.Callback<CResult> callback;
     private final BatchErrors errors;
     private final ConnectionManager.ServerErrorTracker errorsByServer;
@@ -524,7 +697,21 @@ class AsyncProcess {
 
     private final TableName tableName;
     private final AtomicLong actionsInProgress = new AtomicLong(-1);
+    /** The lock controls access to results. It is only held when populating results where
+     * there might be several callers (eventual consistency gets). For other requests,
+     * there's one unique call going on per result index. */
+    private final Object replicaResultLock = new Object();
+    /** Result array.  Null if results are not needed. Otherwise, each index corresponds to
+     * the action index in initial actions submitted. For most request types, has null-s for
+     * requests that are not done, and result/exception for those that are done.
+     * For eventual-consistency gets, initially the same applies; at some point, replica calls
+     * might be started, and ReplicaResultState is put at the corresponding indices. The
+     * returning calls check the type to detect when this is the case. After all calls are done,
+     * ReplicaResultState-s are replaced with results for the user. */
     private final Object[] results;
+    /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
+    private final int[] replicaGetIndices;
+    private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
@@ -545,6 +732,51 @@ class AsyncProcess {
       } else {
         this.results = needResults ? new Object[actions.size()] : null;
       }
+      List<Integer> replicaGetIndices = null;
+      boolean hasAnyReplicaGets = false;
+      if (needResults) {
+        // Check to see if any requests might require replica calls.
+        // We expect that many requests will consist of all or no multi-replica gets; in such
+        // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
+        // store the list of action indexes for which replica gets are possible, and set
+        // hasAnyReplicaGets to true.
+        boolean hasAnyNonReplicaReqs = false;
+        int posInList = 0;
+        for (Action<Row> action : actions) {
+          boolean isReplicaGet = isReplicaGet(action.getAction());
+          if (isReplicaGet) {
+            hasAnyReplicaGets = true;
+            if (hasAnyNonReplicaReqs) { // Mixed case
+              if (replicaGetIndices == null) {
+                replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
+              }
+              replicaGetIndices.add(posInList);
+            }
+          } else if (!hasAnyNonReplicaReqs) {
+            // The first non-multi-replica request in the action list.
+            hasAnyNonReplicaReqs = true;
+            if (posInList > 0) {
+              // Add all the previous requests to the index lists. We know they are all
+              // replica-gets because this is the first non-multi-replica request in the list.
+              replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
+              for (int i = 0; i < posInList; ++i) {
+                replicaGetIndices.add(i);
+              }
+            }
+          }
+          ++posInList;
+        }
+      }
+      this.hasAnyReplicaGets = hasAnyReplicaGets;
+      if (replicaGetIndices != null) {
+        this.replicaGetIndices = new int[replicaGetIndices.size()];
+        int i = 0;
+        for (Integer el : replicaGetIndices) {
+          this.replicaGetIndices[i++] = el;
+        }
+      } else {
+        this.replicaGetIndices = null;
+      }
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
     }
@@ -560,21 +792,40 @@ class AsyncProcess {
       final Map<ServerName, MultiAction<Row>> actionsByServer =
           new HashMap<ServerName, MultiAction<Row>>();
 
-      HRegionLocation loc;
+      boolean isReplica = false;
       for (Action<Row> action : currentActions) {
+        RegionLocations locs = null;
         try {
-          loc = findDestLocation(tableName, action.getAction());
+          locs = findDestLocation(tableName, action.getAction(), false);
         } catch (IOException ex) {
           // There are multiple retries in locateRegion already. No need to add new.
           // We can't continue with this row, hence it's the last retry.
           manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
           continue;
         }
-        addAction(loc, action, actionsByServer, nonceGroup);
-      }
 
+        boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+        if (isReplica && !isReplicaAction) {
+          // This is the property of the current implementation, not a requirement.
+          throw new AssertionError("Replica and non-replica actions in the same retry");
+        }
+        isReplica = isReplicaAction;
+        HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
+        if (loc == null || loc.getServerName() == null) {
+          // On retry, we couldn't find location for some replica we saw before.
+          String str = "Cannot find location for replica " + action.getReplicaId();
+          LOG.error(str);
+          manageError(action.getOriginalIndex(), action.getAction(),
+              false, new IOException(str), null);
+          continue;
+        }
+        byte[] regionName = loc.getRegionInfo().getRegionName();
+        addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
+      }
+      // On the first, non-replica attempt to group and send, we may need the replica thread.
       if (!actionsByServer.isEmpty()) {
-        sendMultiAction(actionsByServer, numAttempt);
+        boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
+        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null);
       }
     }
 
@@ -584,51 +835,22 @@ class AsyncProcess {
      *
      * @param actionsByServer the actions structured by regions
      * @param numAttempt      the attempt number.
+     * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
      */
-    private void sendMultiAction(
-        Map<ServerName, MultiAction<Row>> actionsByServer, final int numAttempt) {
+    private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
+        int numAttempt, List<Action<Row>> actionsForReplicaThread) {
       // Run the last item on the same thread if we are already on a send thread.
       // We hope most of the time it will be the only item, so we can cut down on threads.
-      int reuseThreadCountdown = (numAttempt > 1) ? actionsByServer.size() : Integer.MAX_VALUE;
+      int actionsRemaining = actionsByServer.size();
+      // This iteration is by server (the HRegionLocation comparator is by server portion only).
       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
         final ServerName server = e.getKey();
         final MultiAction<Row> multiAction = e.getValue();
         incTaskCounters(multiAction.getRegions(), server);
-        Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
-          @Override
-          public void run() {
-            MultiResponse res;
-            try {
-              MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
-              try {
-                res = createCaller(callable).callWithoutRetries(callable, timeout);
-              } catch (IOException e) {
-                // The service itself failed . It may be an error coming from the communication
-                //   layer, but, as well, a functional error raised by the server.
-                receiveGlobalFailure(multiAction, server, numAttempt, e);
-                return;
-              } catch (Throwable t) {
-                // This should not happen. Let's log & retry anyway.
-                LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
-                    " Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t);
-                receiveGlobalFailure(multiAction, server, numAttempt, t);
-                return;
-              }
-
-              // Normal case: we received an answer from the server, and it's not an exception.
-              receiveMultiAction(multiAction, server, res, numAttempt);
-            } catch (Throwable t) {
-              // Something really bad happened. We are on the send thread that will now die.
-              LOG.error("Internal AsyncProcess #" + id + " error for "
-                  + tableName + " processing for " + server, t);
-              throw new RuntimeException(t);
-            } finally {
-              decTaskCounters(multiAction.getRegions(), server);
-            }
-          }
-        });
-        --reuseThreadCountdown;
-        if (reuseThreadCountdown == 0) {
+        Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
+            new SingleServerRequestRunnable(multiAction, numAttempt, server));
+        --actionsRemaining;
+        if ((numAttempt > 1) && actionsRemaining == 0) {
           runnable.run();
         } else {
           try {
@@ -645,6 +867,30 @@ class AsyncProcess {
           }
         }
       }
+      if (actionsForReplicaThread != null) {
+        startWaitingForReplicaCalls(actionsForReplicaThread);
+      }
+    }
+
+    /**
+     * Starts waiting to issue replica calls on a different thread; or issues them immediately.
+     */
+    private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
+      long startTime = EnvironmentEdgeManager.currentTimeMillis();
+      ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
+          actionsForReplicaThread, startTime);
+      if (primaryCallTimeout == 0) {
+        // Start replica calls immediately.
+        replicaRunnable.run();
+      } else {
+        // Start the thread that may kick off replica gets.
+        // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
+        try {
+          pool.submit(replicaRunnable);
+        } catch (RejectedExecutionException ree) {
+          LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
+        }
+      }
     }
 
     /**
@@ -665,11 +911,11 @@ class AsyncProcess {
 
       if (!canRetry) {
         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
-        errors.add(throwable, row, server);
-        if (results != null) {
-          setResult(originalIndex, row, throwable);
-        }
-        decActionCounter();
+        setError(originalIndex, row, throwable, server);
+      } else {
+        // See if we are dealing with a replica action that was completed from other server.
+        // Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
+        canRetry = !isActionComplete(originalIndex, row);
       }
 
       return canRetry;
@@ -685,15 +931,17 @@ class AsyncProcess {
      */
     private void receiveGlobalFailure(
         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
-      // Do not use the exception for updating cache because it might be coming from
-      // any of the regions in the MultiAction.
-      byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
-      hConnection.updateCachedLocations(tableName, null, row, null, server);
       errorsByServer.reportServerError(server);
       boolean canRetry = errorsByServer.canRetryMore(numAttempt);
 
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
+        byte[] regionName = e.getKey();
+        byte[] row = e.getValue().iterator().next().getAction().getRow();
+        // Do not use the exception for updating cache because it might be coming from
+        // any of the regions in the MultiAction.
+        // TODO: depending on type of exception we might not want to update cache at all?
+        hConnection.updateCachedLocations(tableName, regionName, row, null, server);
         for (Action<Row> action : e.getValue()) {
           if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
             toReplay.add(action);
@@ -791,14 +1039,16 @@ class AsyncProcess {
           // Failure: retry if it's make sense else update the errors lists
           if (result == null || result instanceof Throwable) {
             Row row = sentAction.getAction();
-            if (!regionFailureRegistered) { // We're doing this once per location.
+            // Register corresponding failures once per server/once per region.
+            if (!regionFailureRegistered) {
               regionFailureRegistered = true;
-              // The location here is used as a server name.
-              hConnection.updateCachedLocations(tableName, regionName, row.getRow(), result, server);
-              if (failureCount == 0) {
-                errorsByServer.reportServerError(server);
-                canRetry = errorsByServer.canRetryMore(numAttempt);
-              }
+              hConnection.updateCachedLocations(
+                  tableName, regionName, row.getRow(), result, server);
+            }
+            if (failureCount == 0) {
+              errorsByServer.reportServerError(server);
+              // We determine canRetry only once for all calls, after reporting server failure.
+              canRetry = errorsByServer.canRetryMore(numAttempt);
             }
             ++failureCount;
             if (manageError(
@@ -809,16 +1059,14 @@ class AsyncProcess {
             if (callback != null) {
               try {
                 //noinspection unchecked
+                // TODO: would callback expect a replica region name if it gets one?
                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
               } catch (Throwable t) {
                 LOG.error("User callback threw an exception for "
                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
               }
             }
-            if (results != null) {
-              setResult(sentAction.getOriginalIndex(), sentAction.getAction(), result);
-            }
-            decActionCounter();
+            setResult(sentAction, result);
           }
         }
       }
@@ -881,38 +1129,185 @@ class AsyncProcess {
       return sb.toString();
     }
 
-    private void setResult(int index, Row row, Object result) {
-      if (result == null) throw new RuntimeException("Result cannot be set to null");
-      if (results[index] != null) throw new RuntimeException("Result was already set");
-      results[index] = result;
+    /**
+     * Sets the non-error result from a particular action.
+     * @param action Action (request) that the server responded to.
+     * @param result The result.
+     */
+    private void setResult(Action<Row> action, Object result) {
+      ReplicaResultState state = null;
+      boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+      if (results == null || ((state = trySetResultSimple(
+          action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
+        decActionCounter();
+        return; // Simple case, no replica requests.
+      }
+      synchronized (state) {
+        if (state.callCount == 0) return; // someone already set the result
+        state.result = result;
+        state.callCount = 0;
+        state.replicaErrors = null; // no longer matters
+      }
+      decActionCounter();
+    }
+
+    /**
+     * Sets the error from a particular action.
+     * @param index Original action index.
+     * @param row Original request.
+     * @param throwable The resulting error.
+     * @param server The source server.
+     */
+    private void setError(int index, Row row, Throwable throwable, ServerName server) {
+      ReplicaResultState state = null;
+      if (results == null
+          || ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
+        errors.add(throwable, row, server);
+        decActionCounter();
+        return; // Simple case, no replica requests.
+      }
+      BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
+      boolean isActionDone = false;
+      synchronized (state) {
+        switch (state.callCount) {
+          case 0: return; // someone already set the result
+          case 1: { // All calls failed, we are the last error.
+            state.result = throwable;
+            target = errors;
+            isActionDone = true;
+            break;
+          }
+          default: {
+            assert state.callCount > 1;
+            if (state.replicaErrors == null) {
+              state.replicaErrors = new BatchErrors();
+            }
+            target = state.replicaErrors;
+            break;
+          }
+        }
+        --state.callCount;
+      }
+      target.add(throwable, row, server);
+      if (!isActionDone) return;
+      if (state.replicaErrors != null) { // last call, no need to lock
+        errors.merge(state.replicaErrors);
+        state.replicaErrors = null;
+      }
+      decActionCounter();
+    }
+
+    /**
+     * Checks if the action is complete; used on error to prevent needless retries.
+     * Does not synchronize, assuming element index/field accesses are atomic.
+     * This is an opportunistic optimization check, doesn't have to be strict.
+     * @param index Original action index.
+     * @param row Original request.
+     */
+    private boolean isActionComplete(int index, Row row) {
+      if (!isReplicaGet(row)) return false;
+      Object resObj = results[index];
+      return (resObj != null) && (!(resObj instanceof ReplicaResultState)
+          || ((ReplicaResultState)resObj).callCount == 0);
+    }
+
+    /**
+     * Tries to set the result or error for a particular action as if there were no replica calls.
+     * @return null if successful; replica state if there were in fact replica calls.
+     */
+    private ReplicaResultState trySetResultSimple(
+        int index, Row row, Object result, boolean isFromReplica) {
+      Object resObj = null;
+      if (!isReplicaGet(row)) {
+        if (isFromReplica) {
+          throw new AssertionError("Unexpected stale result for " + row);
+        }
+        results[index] = result;
+      } else {
+        synchronized (replicaResultLock) {
+          if ((resObj = results[index]) == null) {
+            if (isFromReplica) {
+              throw new AssertionError("Unexpected stale result for " + row);
+            }
+            results[index] = result;
+          }
+        }
+      }
+      return (resObj == null || !(resObj instanceof ReplicaResultState))
+          ? null : (ReplicaResultState)resObj;
     }
 
     private void decActionCounter() {
-      actionsInProgress.decrementAndGet();
+      if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
+        // Convert replica sync structures to results.
+        int staleCount = 0;
+        if (replicaGetIndices == null) {
+          for (int i = 0; i < results.length; ++i) {
+            staleCount += convertReplicaResult(i) ? 1 : 0;
+          }
+        } else {
+          for (int i = 0; i < replicaGetIndices.length; ++i) {
+            staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
+          }
+        }
+        if (!actionsInProgress.compareAndSet(1, 0)) {
+          throw new AssertionError("Cannot set actions in progress to 0");
+        }
+        if (staleCount > 0) {
+          LOG.trace("Returning " + staleCount + " stale results");
+        }
+      } else {
+        actionsInProgress.decrementAndGet();
+      }
       synchronized (actionsInProgress) {
         actionsInProgress.notifyAll();
       }
     }
 
+    private boolean convertReplicaResult(int index) {
+      if (!(results[index] instanceof ReplicaResultState)) return false;
+      ReplicaResultState state = (ReplicaResultState)results[index];
+      // We know that no one will touch state with 0 callCount, no need to lock
+      if (state.callCount != 0) {
+        throw new AssertionError("Actions are done but callcount is " + state.callCount);
+      }
+      // TODO: we expect the Result coming from server to already have "isStale" specified.
+      Object res = results[index] = state.result;
+      return (res instanceof Result) && ((Result)res).isStale();
+    }
+
     @Override
     public void waitUntilDone() throws InterruptedIOException {
-      long lastLog = EnvironmentEdgeManager.currentTimeMillis();
-      long currentInProgress;
       try {
-        while (0 != (currentInProgress = actionsInProgress.get())) {
-          long now = EnvironmentEdgeManager.currentTimeMillis();
+        waitUntilDone(Long.MAX_VALUE);
+      } catch (InterruptedException iex) {
+        throw new InterruptedIOException(iex.getMessage());
+      }
+    }
+
+    private boolean waitUntilDone(long cutoff) throws InterruptedException {
+      boolean hasWait = cutoff != Long.MAX_VALUE;
+      long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
+      long currentInProgress;
+      while (0 != (currentInProgress = actionsInProgress.get())) {
+        long now = 0;
+        if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
+          return false;
+        }
+        if (!hasWait) {
+          // Only log if wait is infinite.
+          now = EnvironmentEdgeManager.currentTimeMillis();
           if (now > lastLog + 10000) {
             lastLog = now;
             LOG.info("#" + id + ", waiting for " + currentInProgress + "  actions to finish");
           }
           synchronized (actionsInProgress) {
             if (actionsInProgress.get() == 0) break;
-            actionsInProgress.wait(100);
+            actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
           }
         }
-      } catch (InterruptedException iex) {
-        throw new InterruptedIOException(iex.getMessage());
       }
+      return true;
     }
 
     @Override
@@ -931,7 +1326,8 @@ class AsyncProcess {
     }
 
     @Override
-    public Object[] getResults() {
+    public Object[] getResults() throws InterruptedIOException {
+      waitUntilDone();
       return results;
     }
   }
@@ -1080,4 +1476,8 @@ class AsyncProcess {
     return new ConnectionManager.ServerErrorTracker(
         this.serverTrackerTimeout, this.numTries);
   }
+
+  private static boolean isReplicaGet(Row row) {
+    return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 0c4776d..779d15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -231,4 +232,9 @@ interface ClusterConnection extends HConnection {
    * @return Default AsyncProcess associated with this connection.
    */
   AsyncProcess getAsyncProcess();
+
+  /**
+   * @return All locations of the region containing the given row.
+   */
+  RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index dfc6d00..3038fb2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -203,6 +204,11 @@ class ConnectionAdapter implements ClusterConnection {
   }
 
   @Override
+  public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+    return wrappedConnection.locateRegionAll(tableName, row);
+  }
+
+  @Override
   public void clearRegionCache() {
     wrappedConnection.clearRegionCache();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 15f78c5..d2b58ae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -946,10 +946,15 @@ class ConnectionManager {
     }
 
     @Override
-    public HRegionLocation locateRegion(final TableName tableName,
-        final byte [] row)
-    throws IOException{
-      RegionLocations locations = locateRegion(tableName, row, true, true);
+    public RegionLocations locateRegionAll(
+        final TableName tableName, final byte[] row) throws IOException{
+      return locateRegion(tableName, row, true, true);
+    }
+
+    @Override
+    public HRegionLocation locateRegion(
+        final TableName tableName, final byte[] row) throws IOException{
+      RegionLocations locations = locateRegionAll(tableName, row);
       return locations == null ? null : locations.getRegionLocation();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 2ca24dc..cad521a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -84,17 +84,6 @@ public final class MultiAction<R> {
     return actions.keySet();
   }
 
-  /**
-   * @return All actions from all regions in this container
-   */
-  public List<Action<R>> allActions() {
-    List<Action<R>> res = new ArrayList<Action<R>>();
-    for (List<Action<R>> lst : actions.values()) {
-      res.addAll(lst);
-    }
-    return res;
-  }
-
   public boolean hasNonceGroup() {
     return nonceGroup != HConstants.NO_NONCE;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index cc26ecf..20cf766 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -153,4 +154,9 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
     // Use the location we were given in the constructor rather than go look it up.
     setStub(getConnection().getClient(this.location.getServerName()));
   }
+
+  @VisibleForTesting
+  ServerName getServerName() {
+    return location.getServerName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
index abe9bf5..6b1465d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
@@ -30,7 +30,7 @@ public class RegionReplicaUtil {
   /**
    * The default replicaId for the region
    */
-  private static final int DEFAULT_REPLICA_ID = 0;
+  static final int DEFAULT_REPLICA_ID = 0;
 
   /**
    * Returns the HRegionInfo for the given replicaId. HRegionInfo's correspond to
@@ -62,4 +62,7 @@ public class RegionReplicaUtil {
     return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
   }
 
+  public static boolean isDefaultReplica(int replicaId) {
+    return DEFAULT_REPLICA_ID == replicaId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index edffd18..575827f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -43,8 +45,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
@@ -53,6 +58,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 @Category(MediumTests.class)
 public class TestAsyncProcess {
@@ -64,8 +70,9 @@ public class TestAsyncProcess {
   private static final byte[] FAILS = "FAILS".getBytes();
   private static final Configuration conf = new Configuration();
 
-  private static ServerName sn = ServerName.valueOf("localhost:10,1254");
-  private static ServerName sn2 = ServerName.valueOf("localhost:140,12540");
+  private static ServerName sn = ServerName.valueOf("s1:1,1");
+  private static ServerName sn2 = ServerName.valueOf("s2:2,2");
+  private static ServerName sn3 = ServerName.valueOf("s3:3,3");
   private static HRegionInfo hri1 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
   private static HRegionInfo hri2 =
@@ -76,6 +83,16 @@ public class TestAsyncProcess {
   private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
   private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
 
+  // Replica stuff
+  private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+      hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
+  private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+  private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+      new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
+  private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+      new HRegionLocation(hri2r1, sn3));
+  private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
 
@@ -139,6 +156,7 @@ public class TestAsyncProcess {
         ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
               new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
+        @Override
         public void execute(Runnable command) {
           throw new RejectedExecutionException("test under failure");
         }
@@ -158,7 +176,17 @@ public class TestAsyncProcess {
     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
       callsCt.incrementAndGet();
       final MultiResponse mr = createMultiResponse(
-          callable.getMulti(), nbMultiResponse, nbActions);
+          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+            @Override
+            public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
+              if (Arrays.equals(FAILS, a.getAction().getRow())) {
+                mr.add(regionName, a.getOriginalIndex(), failure);
+              } else {
+                mr.add(regionName, a.getOriginalIndex(), success);
+              }
+            }
+          });
+
       return new RpcRetryingCaller<MultiResponse>(100, 10) {
         @Override
         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
@@ -204,23 +232,106 @@ public class TestAsyncProcess {
     }
   }
 
-  static MultiResponse createMultiResponse(
-      final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
+  class MyAsyncProcessWithReplicas extends MyAsyncProcess {
+    private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
+    private long primarySleepMs = 0, replicaSleepMs = 0;
+    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
+    private final AtomicLong replicaCalls = new AtomicLong(0);
+
+    public void addFailures(HRegionInfo... hris) {
+      for (HRegionInfo hri : hris) {
+        failures.add(hri.getRegionName());
+      }
+    }
+
+    public long getReplicaCallCount() {
+      return replicaCalls.get();
+    }
+
+    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
+      customPrimarySleepMs.put(server, primaryMs);
+    }
+
+    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
+      super(hc, conf);
+    }
+
+    public void setCallDelays(long primaryMs, long replicaMs) {
+      this.primarySleepMs = primaryMs;
+      this.replicaSleepMs = replicaMs;
+    }
+
+    @Override
+    protected RpcRetryingCaller<MultiResponse> createCaller(
+        MultiServerCallable<Row> callable) {
+      final MultiResponse mr = createMultiResponse(
+          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+            @Override
+            public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
+              if (failures.contains(regionName)) {
+                mr.add(regionName, a.getOriginalIndex(), failure);
+              } else {
+                boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
+                mr.add(regionName, a.getOriginalIndex(),
+                    Result.create(new Cell[0], null, isStale));
+              }
+            }
+          });
+      // Currently AsyncProcess either sends all-replica, or all-primary request.
+      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
+          callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
+      final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
+      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
+          + callable.getMulti().actions.size() + " entries: ";
+      for (byte[] region : callable.getMulti().actions.keySet()) {
+        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
+      }
+      LOG.debug(debugMsg);
+      if (!isDefault) {
+        replicaCalls.incrementAndGet();
+      }
+
+      return new RpcRetryingCaller<MultiResponse>(100, 10) {
+        @Override
+        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+        throws IOException, RuntimeException {
+          long sleep = -1;
+          if (isDefault) {
+            Long customSleep = customPrimarySleepMs.get(server);
+            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
+          } else {
+            sleep = replicaSleepMs;
+          }
+          if (sleep != 0) {
+            try {
+              Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+            }
+          }
+          return mr;
+        }
+      };
+    }
+  }
+
+  static MultiResponse createMultiResponse(final MultiAction<Row> multi,
+      AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
     final MultiResponse mr = new MultiResponse();
     nbMultiResponse.incrementAndGet();
     for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
       byte[] regionName = entry.getKey();
       for (Action<Row> a : entry.getValue()) {
         nbActions.incrementAndGet();
-        if (Arrays.equals(FAILS, a.getAction().getRow())) {
-          mr.add(regionName, a.getOriginalIndex(), failure);
-        } else {
-          mr.add(regionName, a.getOriginalIndex(), success);
-        }
+        gen.addResponse(mr, regionName, a);
       }
     }
     return mr;
   }
+
+  private static interface ResponseGenerator {
+    void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
+  }
+
   /**
    * Returns our async process.
    */
@@ -233,9 +344,8 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public HRegionLocation locateRegion(final TableName tableName,
-                                        final byte[] row) {
-      return loc1;
+    public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+      return new RegionLocations(loc1);
     }
   }
 
@@ -253,18 +363,18 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public HRegionLocation locateRegion(final TableName tableName,
-                                        final byte[] row) {
+    public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
       int i = 0;
-      for (HRegionLocation hr:hrl){
-        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
-            usedRegions[i] = true;
-          return hr;
+      for (HRegionLocation hr : hrl){
+        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
+          usedRegions[i] = true;
+          return new RegionLocations(hr);
         }
         i++;
       }
       return null;
     }
+
   }
 
   @Test
@@ -284,6 +394,7 @@ public class TestAsyncProcess {
     ClusterConnection hc = createHConnection();
     final AtomicInteger updateCalled = new AtomicInteger(0);
     Batch.Callback<Object> cb = new Batch.Callback<Object>() {
+      @Override
       public void update(byte[] region, byte[] row, Object result) {
         updateCalled.incrementAndGet();
       }
@@ -458,6 +569,7 @@ public class TestAsyncProcess {
     final Thread myThread = Thread.currentThread();
 
     Thread t = new Thread() {
+      @Override
       public void run() {
         Threads.sleep(2000);
         myThread.interrupt();
@@ -478,6 +590,7 @@ public class TestAsyncProcess {
     final long sleepTime = 2000;
 
     Thread t2 = new Thread() {
+      @Override
       public void run() {
         Threads.sleep(sleepTime);
         while (ap.tasksInProgress.get() > 0) {
@@ -496,32 +609,33 @@ public class TestAsyncProcess {
   }
 
   private static ClusterConnection createHConnection() throws IOException {
-    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
-
-    Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
-    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1);
-
-    Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2);
-    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
+    ClusterConnection hc = createHConnectionCommon();
+    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
+    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
+    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
+    setMockLocation(hc, FAILS, new RegionLocations(loc2));
+    return hc;
+  }
 
-    Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
-    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
+  private static ClusterConnection createHConnectionWithReplicas() throws IOException {
+    ClusterConnection hc = createHConnectionCommon();
+    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
+    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
+    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
+    return hc;
+  }
 
-    Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
-    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
-        Mockito.eq(FAILS))).thenReturn(loc2);
+  private static void setMockLocation(ClusterConnection hc, byte[] row,
+      RegionLocations result) throws IOException {
+    Mockito.when(hc.locateRegionAll(
+        Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result);
+  }
 
+  private static ClusterConnection createHConnectionCommon() {
+    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
     Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
     Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
-
     return hc;
   }
 
@@ -756,7 +870,124 @@ public class TestAsyncProcess {
     Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
   }
 
-  private void verifyResult(AsyncRequestFuture ars, boolean... expected) {
+  @Test
+  public void testReplicaReplicaSuccess() throws Exception {
+    // Main calls take too long, so the replica calls succeed first, except for the one
+    // region that has no replicas; for that region the main call succeeds.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
+    Assert.assertEquals(2, ap.getReplicaCallCount());
+  }
+
+  @Test
+  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
+    // Main call succeeds before replica calls are kicked off.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
+    Assert.assertEquals(0, ap.getReplicaCallCount());
+  }
+
+  @Test
+  public void testReplicaParallelCallsSucceed() throws Exception {
+    // Either main or replica can succeed.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
+    long replicaCalls = ap.getReplicaCallCount();
+    Assert.assertTrue(replicaCalls >= 0);
+    Assert.assertTrue(replicaCalls <= 2);
+  }
+
+  @Test
+  public void testReplicaPartialReplicaCall() throws Exception {
+    // One server is slow, so the result for its region comes from a replica, whereas
+    // the result for the other region comes from the primary before replica calls happen.
+    // There should be no replica call for that other region at all.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
+    ap.setPrimaryCallDelay(sn2, 2000);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
+    Assert.assertEquals(1, ap.getReplicaCallCount());
+  }
+
+  @Test
+  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
+    // Main calls fail before replica calls can start - this is currently not handled.
+    // It would probably never happen if we can get location (due to retries),
+    // and handling it would require additional synchronization.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
+    ap.addFailures(hri1, hri2);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
+    Assert.assertEquals(0, ap.getReplicaCallCount());
+  }
+
+  @Test
+  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
+    // Main calls fail after replica calls start. For the two-replica region, one replica call
+    // also fails. Regardless, we get replica results for both regions.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
+    ap.addFailures(hri1, hri1r2, hri2);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
+    Assert.assertEquals(2, ap.getReplicaCallCount());
+  }
+
+  @Test
+  public void testReplicaAllCallsFailForOneRegion() throws Exception {
+    // For one of the regions, all 3 calls (main and both replicas) fail. For the other, a
+    // replica call fails, but its exception should not be visible as the main call succeeded.
+    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
+    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
+    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+    AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
+    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
+    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
+    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
+      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
+    }
+  }
+
+  private MyAsyncProcessWithReplicas createReplicaAp(
+      int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
+    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
+  }
+
+  private MyAsyncProcessWithReplicas createReplicaAp(
+      int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
+    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
+    //       that the replica call has happened and that way control the ordering.
+    Configuration conf = new Configuration();
+    ClusterConnection conn = createHConnectionWithReplicas();
+    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs);
+    if (retries > 0) {
+      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+    }
+    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
+    ap.setCallDelays(primaryMs, replicaMs);
+    return ap;
+  }
+
+  private static List<Get> makeTimelineGets(byte[]... rows) {
+    List<Get> result = new ArrayList<Get>();
+    for (byte[] row : rows) {
+      Get get = new Get(row);
+      get.setConsistency(Consistency.TIMELINE);
+      result.add(get);
+    }
+    return result;
+  }
+
+  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
     Object[] actual = ars.getResults();
     Assert.assertEquals(expected.length, actual.length);
     for (int i = 0; i < expected.length; ++i) {
@@ -764,6 +995,27 @@ public class TestAsyncProcess {
     }
   }
 
+  /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */
+  private enum RR {
+    TRUE,
+    FALSE,
+    DONT_CARE,
+    FAILED
+  }
+
+  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
+    Object[] actuals = ars.getResults();
+    Assert.assertEquals(expecteds.length, actuals.length);
+    for (int i = 0; i < expecteds.length; ++i) {
+      Object actual = actuals[i];
+      RR expected = expecteds[i];
+      Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable);
+      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
+        Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
+      }
+    }
+  }
+
   /**
    * @param regCnt  the region: 1 to 3.
    * @param success if true, the put will succeed.

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index cc0901f..e2be188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -28,12 +28,10 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class CoprocessorHConnection implements ClusterConnection {
+class CoprocessorHConnection implements ClusterConnection {
   private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator();
 
   /**
@@ -60,7 +58,7 @@ public class CoprocessorHConnection implements ClusterConnection {
    * @return an unmanaged {@link HConnection}.
    * @throws IOException if we cannot create the basic connection
    */
-  public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
+  static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
       throws IOException {
     ClusterConnection connection =
         ConnectionManager.createConnectionInternal(env.getConfiguration());
@@ -427,4 +425,9 @@ public class CoprocessorHConnection implements ClusterConnection {
   public AsyncProcess getAsyncProcess() {
     return delegate.getAsyncProcess();
   }
+
+  @Override
+  public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+    return delegate.locateRegionAll(tableName, row);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 5a86ab5..f925fea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -110,6 +111,8 @@ public class HConnectionTestingUtility {
       thenReturn(loc);
     Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
       thenReturn(loc);
+    Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())).
+      thenReturn(new RegionLocations(loc));
     if (admin != null) {
       // If a call to getAdmin, return this implementation.
       Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).