You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:01 UTC
[15/49] git commit: HBASE-10356 Failover RPC's for multi-get
HBASE-10356 Failover RPC's for multi-get
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569559 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25b6103d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25b6103d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25b6103d
Branch: refs/heads/master
Commit: 25b6103dadba16e85db0a8c5f2fc44ecf9fc3f2a
Parents: d6f603a
Author: sershe <se...@unknown>
Authored: Tue Feb 18 23:37:17 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:37 2014 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HRegionInfo.java | 2 +-
.../apache/hadoop/hbase/RegionLocations.java | 5 +
.../org/apache/hadoop/hbase/client/Action.java | 23 +-
.../hadoop/hbase/client/AsyncProcess.java | 580 ++++++++++++++++---
.../hadoop/hbase/client/ClusterConnection.java | 6 +
.../hadoop/hbase/client/ConnectionAdapter.java | 6 +
.../hadoop/hbase/client/ConnectionManager.java | 13 +-
.../apache/hadoop/hbase/client/MultiAction.java | 11 -
.../hbase/client/MultiServerCallable.java | 6 +
.../hadoop/hbase/client/RegionReplicaUtil.java | 5 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 332 +++++++++--
.../hbase/client/CoprocessorHConnection.java | 13 +-
.../hbase/client/HConnectionTestingUtility.java | 3 +
13 files changed, 852 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 0f846b5..59a3248 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -143,7 +143,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
public static final byte REPLICA_ID_DELIMITER = (byte)'_';
private static final int MAX_REPLICA_ID = 0xFFFF;
- private static final int DEFAULT_REPLICA_ID = 0;
+ static final int DEFAULT_REPLICA_ID = 0;
/**
* Does region name contain its encoded name?
* @param regionName region name
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index cdf1180..b5db549 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -261,6 +262,10 @@ public class RegionLocations {
return locations;
}
+ public HRegionLocation getDefaultRegionLocation() {
+ return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+ }
+
/**
* Returns the first not-null region location in the list
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index c3e2104..5147c25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
/**
* A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by
@@ -27,18 +28,34 @@ import org.apache.hadoop.hbase.HConstants;
* the index from the original request.
*/
@InterfaceAudience.Private
+//TODO: R is never used
public class Action<R> implements Comparable<R> {
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
private long nonce = HConstants.NO_NONCE;
+ private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
public Action(Row action, int originalIndex) {
super();
this.action = action;
- this.originalIndex = originalIndex;
+ this.originalIndex = originalIndex;
}
+ /**
+ * Creates an action for a particular replica from original action.
+ * @param action Original action.
+ * @param replicaId Replica id for the new action.
+ */
+ public Action(Action<R> action, int replicaId) {
+ super();
+ this.action = action.action;
+ this.nonce = action.nonce;
+ this.originalIndex = action.originalIndex;
+ this.replicaId = replicaId;
+ }
+
+
public void setNonce(long nonce) {
this.nonce = nonce;
}
@@ -55,6 +72,10 @@ public class Action<R> implements Comparable<R> {
return originalIndex;
}
+ public int getReplicaId() {
+ return replicaId;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 714daeb..9419932 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.htrace.Trace;
-
import com.google.common.annotations.VisibleForTesting;
/**
@@ -89,9 +89,11 @@ import com.google.common.annotations.VisibleForTesting;
* </p>
*/
class AsyncProcess {
- private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+ protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final AtomicLong COUNTER = new AtomicLong();
+ public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
+
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@@ -102,7 +104,7 @@ class AsyncProcess {
public boolean hasError();
public RetriesExhaustedWithDetailsException getErrors();
public List<? extends Row> getFailedOperations();
- public Object[] getResults();
+ public Object[] getResults() throws InterruptedIOException;
/** Wait until all tasks are executed, successfully or not. */
public void waitUntilDone() throws InterruptedIOException;
}
@@ -122,6 +124,27 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException {}
};
+ /** Sync point for calls to multiple replicas for the same user request (Get).
+ * Created and put in the results array (we assume replica calls require results) when
+ * the replica calls are launched. See results for details of this process.
+ * POJO, all fields are public. To modify them, the object itself is locked. */
+ private static class ReplicaResultState {
+ public ReplicaResultState(int callCount) {
+ this.callCount = callCount;
+ }
+
+ /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
+ int callCount;
+ /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
+ * is not last, adds error to list. If all calls fail the last one sets this to list. */
+ Object result = null;
+ /** Errors for which it is not decided whether we will report them to user. If one of the
+ * calls succeeds, we will discard the errors that may have happened in the other calls. */
+ BatchErrors replicaErrors = null;
+ }
+
+
+ // TODO: many of the fields should be made private
protected final long id;
protected final ClusterConnection hConnection;
@@ -160,6 +183,7 @@ class AsyncProcess {
protected int numTries;
protected int serverTrackerTimeout;
protected int timeout;
+ protected long primaryCallTimeout;
// End configuration settings.
protected static class BatchErrors {
@@ -192,6 +216,12 @@ class AsyncProcess {
actions.clear();
addresses.clear();
}
+
+ public synchronized void merge(BatchErrors other) {
+ throwables.addAll(other.throwables);
+ actions.addAll(other.actions);
+ addresses.addAll(other.addresses);
+ }
}
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
@@ -212,6 +242,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@@ -270,7 +301,8 @@ class AsyncProcess {
/**
* Extract from the rows list what we can submit. The rows we can not submit are kept in the
- * list.
+ * list. Does not send requests to replicas (not currently used for anything other
+ * than streaming puts anyway).
*
* @param pool ExecutorService to use.
* @param tableName The table for which this request is needed.
@@ -311,7 +343,7 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc;
try {
- loc = findDestLocation(tableName, r);
+ loc = findDestLocation(tableName, r, true).getDefaultRegionLocation();
} catch (IOException ex) {
locationErrors = new ArrayList<Exception>();
locationErrorRows = new ArrayList<Integer>();
@@ -329,7 +361,9 @@ class AsyncProcess {
Action<Row> action = new Action<Row>(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
- addAction(loc, action, actionsByServer, nonceGroup);
+ // TODO: replica-get is not supported on this path
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
@@ -347,7 +381,7 @@ class AsyncProcess {
ars.manageError(originalIndex, row, false, locationErrors.get(i), null);
}
}
- ars.sendMultiAction(actionsByServer, 1);
+ ars.sendMultiAction(actionsByServer, 1, null);
return ars;
}
@@ -359,13 +393,12 @@ class AsyncProcess {
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
*/
- private void addAction(HRegionLocation loc, Action<Row> action,
+ private void addAction(ServerName server, byte[] regionName, Action<Row> action,
Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
- final byte[] regionName = loc.getRegionInfo().getRegionName();
- MultiAction<Row> multiAction = actionsByServer.get(loc.getServerName());
+ MultiAction<Row> multiAction = actionsByServer.get(server);
if (multiAction == null) {
multiAction = new MultiAction<Row>();
- actionsByServer.put(loc.getServerName(), multiAction);
+ actionsByServer.put(server, multiAction);
}
if (action.hasNonce() && !multiAction.hasNonceGroup()) {
multiAction.setNonceGroup(nonceGroup);
@@ -380,10 +413,12 @@ class AsyncProcess {
* @param row the row
* @return the destination.
*/
- private HRegionLocation findDestLocation(TableName tableName, Row row) throws IOException {
+ private RegionLocations findDestLocation(
+ TableName tableName, Row row, boolean checkPrimary) throws IOException {
if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
- HRegionLocation loc = hConnection.locateRegion(tableName, row.getRow());
- if (loc == null) {
+ RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
+ if (loc == null
+ || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
throw new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
}
@@ -516,6 +551,144 @@ class AsyncProcess {
* scheduling children. This is why lots of code doesn't require any synchronization.
*/
protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
+
+ /**
+ * Runnable (that can be submitted to thread pool) that waits for when it's time
+ * to issue replica calls, finds region replicas, groups the requests by replica and
+ * issues the calls (on separate threads, via sendMultiAction).
+ * This is done on a separate thread because we don't want to wait on user thread for
+ * our asynchronous call, and usually we have to wait before making replica calls.
+ */
+ private final class ReplicaCallIssuingRunnable implements Runnable {
+ private final long startTime;
+ private final List<Action<Row>> initialActions;
+
+ public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
+ this.initialActions = initialActions;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public void run() {
+ boolean done = false;
+ if (primaryCallTimeout > 0) {
+ try {
+ done = waitUntilDone(startTime + primaryCallTimeout);
+ } catch (InterruptedException ex) {
+ LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
+ return;
+ }
+ }
+ if (done) return; // Done within primary timeout
+ Map<ServerName, MultiAction<Row>> actionsByServer =
+ new HashMap<ServerName, MultiAction<Row>>();
+ if (replicaGetIndices == null) {
+ for (int i = 0; i < results.length; ++i) {
+ addReplicaActions(i, actionsByServer);
+ }
+ } else {
+ for (int i = 0; i < replicaGetIndices.length; ++i) {
+ addReplicaActions(replicaGetIndices[i], actionsByServer);
+ }
+ }
+ if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
+ sendMultiAction(actionsByServer, 1, null);
+ }
+
+ /**
+ * Add replica actions to action map by server.
+ * @param index Index of the original action.
+ * @param actionsByServer The map by server to add it to.
+ */
+ private void addReplicaActions(
+ int index, Map<ServerName, MultiAction<Row>> actionsByServer) {
+ if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
+ Action<Row> action = initialActions.get(index);
+ RegionLocations loc = null;
+ try {
+ // For perf, we assume that this location coming from cache, since we just got location
+ // from meta for the primary call. If it turns out to not be the case, we'd need local
+ // cache since we want to keep as little time as possible before replica call.
+ loc = findDestLocation(tableName, action.getAction(), false);
+ } catch (IOException ex) {
+ manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
+ LOG.error("Cannot get location - no replica calls for some actions", ex);
+ return;
+ }
+ HRegionLocation[] locs = loc.getRegionLocations();
+ int replicaCount = 0;
+ for (int i = 1; i < locs.length; ++i) {
+ replicaCount += (locs[i] != null) ? 1 : 0;
+ }
+ if (replicaCount == 0) {
+ LOG.warn("No replicas found for " + action.getAction());
+ return;
+ }
+ synchronized (replicaResultLock) {
+ // Don't run replica calls if the original has finished. We could do it e.g. if
+ // original has already failed before first replica call (unlikely given retries),
+ // but that would require additional synchronization w.r.t. returning to caller.
+ if (results[index] != null) return;
+ // We set the number of calls here. After that any path must call setResult/setError.
+ results[index] = new ReplicaResultState(replicaCount + 1);
+ }
+ for (int i = 1; i < locs.length; ++i) {
+ if (locs[i] == null) continue;
+ addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
+ new Action<Row>(action, i), actionsByServer, nonceGroup);
+ }
+ }
+ }
+
+ /**
+ * Runnable (that can be submitted to thread pool) that submits MultiAction to a
+ * single server. The server call is synchronous, therefore we do it on a thread pool.
+ */
+ private final class SingleServerRequestRunnable implements Runnable {
+ private final MultiAction<Row> multiAction;
+ private final int numAttempt;
+ private final ServerName server;
+
+ private SingleServerRequestRunnable(
+ MultiAction<Row> multiAction, int numAttempt, ServerName server) {
+ this.multiAction = multiAction;
+ this.numAttempt = numAttempt;
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ MultiResponse res;
+ try {
+ MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
+ try {
+ res = createCaller(callable).callWithoutRetries(callable, timeout);
+ } catch (IOException e) {
+ // The service itself failed . It may be an error coming from the communication
+ // layer, but, as well, a functional error raised by the server.
+ receiveGlobalFailure(multiAction, server, numAttempt, e);
+ return;
+ } catch (Throwable t) {
+ // This should not happen. Let's log & retry anyway.
+ LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+ " Retrying. Server is " + server + ", tableName=" + tableName, t);
+ receiveGlobalFailure(multiAction, server, numAttempt, t);
+ return;
+ }
+
+ // Normal case: we received an answer from the server, and it's not an exception.
+ receiveMultiAction(multiAction, server, res, numAttempt);
+ } catch (Throwable t) {
+ // Something really bad happened. We are on the send thread that will now die.
+ LOG.error("Internal AsyncProcess #" + id + " error for "
+ + tableName + " processing for " + server, t);
+ throw new RuntimeException(t);
+ } finally {
+ decTaskCounters(multiAction.getRegions(), server);
+ }
+ }
+ }
+
private final Batch.Callback<CResult> callback;
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
@@ -524,7 +697,21 @@ class AsyncProcess {
private final TableName tableName;
private final AtomicLong actionsInProgress = new AtomicLong(-1);
+ /** The lock controls access to results. It is only held when populating results where
+ * there might be several callers (eventual consistency gets). For other requests,
+ * there's one unique call going on per result index. */
+ private final Object replicaResultLock = new Object();
+ /** Result array. Null if results are not needed. Otherwise, each index corresponds to
+ * the action index in initial actions submitted. For most request types, has null-s for
+ * requests that are not done, and result/exception for those that are done.
+ * For eventual-consistency gets, initially the same applies; at some point, replica calls
+ * might be started, and ReplicaResultState is put at the corresponding indices. The
+ * returning calls check the type to detect when this is the case. After all calls are done,
+ * ReplicaResultState-s are replaced with results for the user. */
private final Object[] results;
+ /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
+ private final int[] replicaGetIndices;
+ private final boolean hasAnyReplicaGets;
private final long nonceGroup;
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
@@ -545,6 +732,51 @@ class AsyncProcess {
} else {
this.results = needResults ? new Object[actions.size()] : null;
}
+ List<Integer> replicaGetIndices = null;
+ boolean hasAnyReplicaGets = false;
+ if (needResults) {
+ // Check to see if any requests might require replica calls.
+ // We expect that many requests will consist of all or no multi-replica gets; in such
+ // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
+ // store the list of action indexes for which replica gets are possible, and set
+ // hasAnyReplicaGets to true.
+ boolean hasAnyNonReplicaReqs = false;
+ int posInList = 0;
+ for (Action<Row> action : actions) {
+ boolean isReplicaGet = isReplicaGet(action.getAction());
+ if (isReplicaGet) {
+ hasAnyReplicaGets = true;
+ if (hasAnyNonReplicaReqs) { // Mixed case
+ if (replicaGetIndices == null) {
+ replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
+ }
+ replicaGetIndices.add(posInList);
+ }
+ } else if (!hasAnyNonReplicaReqs) {
+ // The first non-multi-replica request in the action list.
+ hasAnyNonReplicaReqs = true;
+ if (posInList > 0) {
+ // Add all the previous requests to the index lists. We know they are all
+ // replica-gets because this is the first non-multi-replica request in the list.
+ replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
+ for (int i = 0; i < posInList; ++i) {
+ replicaGetIndices.add(i);
+ }
+ }
+ }
+ ++posInList;
+ }
+ }
+ this.hasAnyReplicaGets = hasAnyReplicaGets;
+ if (replicaGetIndices != null) {
+ this.replicaGetIndices = new int[replicaGetIndices.size()];
+ int i = 0;
+ for (Integer el : replicaGetIndices) {
+ this.replicaGetIndices[i++] = el;
+ }
+ } else {
+ this.replicaGetIndices = null;
+ }
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
}
@@ -560,21 +792,40 @@ class AsyncProcess {
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
- HRegionLocation loc;
+ boolean isReplica = false;
for (Action<Row> action : currentActions) {
+ RegionLocations locs = null;
try {
- loc = findDestLocation(tableName, action.getAction());
+ locs = findDestLocation(tableName, action.getAction(), false);
} catch (IOException ex) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
continue;
}
- addAction(loc, action, actionsByServer, nonceGroup);
- }
+ boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ if (isReplica && !isReplicaAction) {
+ // This is the property of the current implementation, not a requirement.
+ throw new AssertionError("Replica and non-replica actions in the same retry");
+ }
+ isReplica = isReplicaAction;
+ HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
+ if (loc == null || loc.getServerName() == null) {
+ // On retry, we couldn't find location for some replica we saw before.
+ String str = "Cannot find location for replica " + action.getReplicaId();
+ LOG.error(str);
+ manageError(action.getOriginalIndex(), action.getAction(),
+ false, new IOException(str), null);
+ continue;
+ }
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
+ }
+ // If this is a first attempt to group and send, no replicas, we need replica thread.
if (!actionsByServer.isEmpty()) {
- sendMultiAction(actionsByServer, numAttempt);
+ boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
+ sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null);
}
}
@@ -584,51 +835,22 @@ class AsyncProcess {
*
* @param actionsByServer the actions structured by regions
* @param numAttempt the attempt number.
+ * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/
- private void sendMultiAction(
- Map<ServerName, MultiAction<Row>> actionsByServer, final int numAttempt) {
+ private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
+ int numAttempt, List<Action<Row>> actionsForReplicaThread) {
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
- int reuseThreadCountdown = (numAttempt > 1) ? actionsByServer.size() : Integer.MAX_VALUE;
+ int actionsRemaining = actionsByServer.size();
+ // This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
final ServerName server = e.getKey();
final MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server);
- Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
- @Override
- public void run() {
- MultiResponse res;
- try {
- MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
- try {
- res = createCaller(callable).callWithoutRetries(callable, timeout);
- } catch (IOException e) {
- // The service itself failed . It may be an error coming from the communication
- // layer, but, as well, a functional error raised by the server.
- receiveGlobalFailure(multiAction, server, numAttempt, e);
- return;
- } catch (Throwable t) {
- // This should not happen. Let's log & retry anyway.
- LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
- " Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t);
- receiveGlobalFailure(multiAction, server, numAttempt, t);
- return;
- }
-
- // Normal case: we received an answer from the server, and it's not an exception.
- receiveMultiAction(multiAction, server, res, numAttempt);
- } catch (Throwable t) {
- // Something really bad happened. We are on the send thread that will now die.
- LOG.error("Internal AsyncProcess #" + id + " error for "
- + tableName + " processing for " + server, t);
- throw new RuntimeException(t);
- } finally {
- decTaskCounters(multiAction.getRegions(), server);
- }
- }
- });
- --reuseThreadCountdown;
- if (reuseThreadCountdown == 0) {
+ Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
+ new SingleServerRequestRunnable(multiAction, numAttempt, server));
+ --actionsRemaining;
+ if ((numAttempt > 1) && actionsRemaining == 0) {
runnable.run();
} else {
try {
@@ -645,6 +867,30 @@ class AsyncProcess {
}
}
}
+ if (actionsForReplicaThread != null) {
+ startWaitingForReplicaCalls(actionsForReplicaThread);
+ }
+ }
+
+ /**
+ * Starts waiting to issue replica calls on a different thread; or issues them immediately.
+ */
+ private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
+ actionsForReplicaThread, startTime);
+ if (primaryCallTimeout == 0) {
+ // Start replica calls immediately.
+ replicaRunnable.run();
+ } else {
+ // Start the thread that may kick off replica gets.
+ // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
+ try {
+ pool.submit(replicaRunnable);
+ } catch (RejectedExecutionException ree) {
+ LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
+ }
+ }
}
/**
@@ -665,11 +911,11 @@ class AsyncProcess {
if (!canRetry) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this.
- errors.add(throwable, row, server);
- if (results != null) {
- setResult(originalIndex, row, throwable);
- }
- decActionCounter();
+ setError(originalIndex, row, throwable, server);
+ } else {
+ // See if we are dealing with a replica action that was completed from other server.
+ // Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
+ canRetry = !isActionComplete(originalIndex, row);
}
return canRetry;
@@ -685,15 +931,17 @@ class AsyncProcess {
*/
private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
- // Do not use the exception for updating cache because it might be coming from
- // any of the regions in the MultiAction.
- byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
- hConnection.updateCachedLocations(tableName, null, row, null, server);
errorsByServer.reportServerError(server);
boolean canRetry = errorsByServer.canRetryMore(numAttempt);
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ byte[] row = e.getValue().iterator().next().getAction().getRow();
+ // Do not use the exception for updating cache because it might be coming from
+ // any of the regions in the MultiAction.
+ // TODO: depending on type of exception we might not want to update cache at all?
+ hConnection.updateCachedLocations(tableName, regionName, row, null, server);
for (Action<Row> action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
toReplay.add(action);
@@ -791,14 +1039,16 @@ class AsyncProcess {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
- if (!regionFailureRegistered) { // We're doing this once per location.
+ // Register corresponding failures once per server/once per region.
+ if (!regionFailureRegistered) {
regionFailureRegistered = true;
- // The location here is used as a server name.
- hConnection.updateCachedLocations(tableName, regionName, row.getRow(), result, server);
- if (failureCount == 0) {
- errorsByServer.reportServerError(server);
- canRetry = errorsByServer.canRetryMore(numAttempt);
- }
+ hConnection.updateCachedLocations(
+ tableName, regionName, row.getRow(), result, server);
+ }
+ if (failureCount == 0) {
+ errorsByServer.reportServerError(server);
+ // We determine canRetry only once for all calls, after reporting server failure.
+ canRetry = errorsByServer.canRetryMore(numAttempt);
}
++failureCount;
if (manageError(
@@ -809,16 +1059,14 @@ class AsyncProcess {
if (callback != null) {
try {
//noinspection unchecked
+ // TODO: would callback expect a replica region name if it gets one?
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
- if (results != null) {
- setResult(sentAction.getOriginalIndex(), sentAction.getAction(), result);
- }
- decActionCounter();
+ setResult(sentAction, result);
}
}
}
@@ -881,38 +1129,185 @@ class AsyncProcess {
return sb.toString();
}
- private void setResult(int index, Row row, Object result) {
- if (result == null) throw new RuntimeException("Result cannot be set to null");
- if (results[index] != null) throw new RuntimeException("Result was already set");
- results[index] = result;
+ /**
+ * Sets the non-error result from a particular action.
+ * @param action Action (request) that the server responded to.
+ * @param result The result.
+ */
+ private void setResult(Action<Row> action, Object result) {
+ ReplicaResultState state = null;
+ boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ if (results == null || ((state = trySetResultSimple(
+ action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
+ decActionCounter();
+ return; // Simple case, no replica requests.
+ }
+ synchronized (state) {
+ if (state.callCount == 0) return; // someone already set the result
+ state.result = result;
+ state.callCount = 0;
+ state.replicaErrors = null; // no longer matters
+ }
+ decActionCounter();
+ }
+
+ /**
+ * Sets the error from a particular action.
+ * @param index Original action index.
+ * @param row Original request.
+ * @param throwable The resulting error.
+ * @param server The source server.
+ */
+ private void setError(int index, Row row, Throwable throwable, ServerName server) {
+ ReplicaResultState state = null;
+ if (results == null
+ || ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
+ errors.add(throwable, row, server);
+ decActionCounter();
+ return; // Simple case, no replica requests.
+ }
+ BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
+ boolean isActionDone = false;
+ synchronized (state) {
+ switch (state.callCount) {
+ case 0: return; // someone already set the result
+ case 1: { // All calls failed, we are the last error.
+ state.result = throwable;
+ target = errors;
+ isActionDone = true;
+ break;
+ }
+ default: {
+ assert state.callCount > 1;
+ if (state.replicaErrors == null) {
+ state.replicaErrors = new BatchErrors();
+ }
+ target = state.replicaErrors;
+ break;
+ }
+ }
+ --state.callCount;
+ }
+ target.add(throwable, row, server);
+ if (!isActionDone) return;
+ if (state.replicaErrors != null) { // last call, no need to lock
+ errors.merge(state.replicaErrors);
+ state.replicaErrors = null;
+ }
+ decActionCounter();
+ }
+
+ /**
+ * Checks if the action is complete; used on error to prevent needless retries.
+ * Does not synchronize, assuming element index/field accesses are atomic.
+ * This is an opportunistic optimization check, doesn't have to be strict.
+ * @param index Original action index.
+ * @param row Original request.
+ */
+ private boolean isActionComplete(int index, Row row) {
+ if (!isReplicaGet(row)) return false;
+ Object resObj = results[index];
+ return (resObj != null) && (!(resObj instanceof ReplicaResultState)
+ || ((ReplicaResultState)resObj).callCount == 0);
+ }
+
+ /**
+ * Tries to set the result or error for a particular action as if there were no replica calls.
+ * @return null if successful; replica state if there were in fact replica calls.
+ */
+ private ReplicaResultState trySetResultSimple(
+ int index, Row row, Object result, boolean isFromReplica) {
+ Object resObj = null;
+ if (!isReplicaGet(row)) {
+ if (isFromReplica) {
+ throw new AssertionError("Unexpected stale result for " + row);
+ }
+ results[index] = result;
+ } else {
+ synchronized (replicaResultLock) {
+ if ((resObj = results[index]) == null) {
+ if (isFromReplica) {
+ throw new AssertionError("Unexpected stale result for " + row);
+ }
+ results[index] = result;
+ }
+ }
+ }
+ return (resObj == null || !(resObj instanceof ReplicaResultState))
+ ? null : (ReplicaResultState)resObj;
}
private void decActionCounter() {
- actionsInProgress.decrementAndGet();
+ if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
+ // Convert replica sync structures to results.
+ int staleCount = 0;
+ if (replicaGetIndices == null) {
+ for (int i = 0; i < results.length; ++i) {
+ staleCount += convertReplicaResult(i) ? 1 : 0;
+ }
+ } else {
+ for (int i = 0; i < replicaGetIndices.length; ++i) {
+ staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
+ }
+ }
+ if (!actionsInProgress.compareAndSet(1, 0)) {
+ throw new AssertionError("Cannot set actions in progress to 0");
+ }
+ if (staleCount > 0) {
+ LOG.trace("Returning " + staleCount + " stale results");
+ }
+ } else {
+ actionsInProgress.decrementAndGet();
+ }
synchronized (actionsInProgress) {
actionsInProgress.notifyAll();
}
}
+ private boolean convertReplicaResult(int index) {
+ if (!(results[index] instanceof ReplicaResultState)) return false;
+ ReplicaResultState state = (ReplicaResultState)results[index];
+ // We know that no one will touch state with 0 callCount, no need to lock
+ if (state.callCount != 0) {
+ throw new AssertionError("Actions are done but callcount is " + state.callCount);
+ }
+ // TODO: we expect the Result coming from server to already have "isStale" specified.
+ Object res = results[index] = state.result;
+ return (res instanceof Result) && ((Result)res).isStale();
+ }
+
@Override
public void waitUntilDone() throws InterruptedIOException {
- long lastLog = EnvironmentEdgeManager.currentTimeMillis();
- long currentInProgress;
try {
- while (0 != (currentInProgress = actionsInProgress.get())) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
+ waitUntilDone(Long.MAX_VALUE);
+ } catch (InterruptedException iex) {
+ throw new InterruptedIOException(iex.getMessage());
+ }
+ }
+
+ private boolean waitUntilDone(long cutoff) throws InterruptedException {
+ boolean hasWait = cutoff != Long.MAX_VALUE;
+ long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
+ long currentInProgress;
+ while (0 != (currentInProgress = actionsInProgress.get())) {
+ long now = 0;
+ if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
+ return false;
+ }
+ if (!hasWait) {
+ // Only log if wait is infinite.
+ now = EnvironmentEdgeManager.currentTimeMillis();
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
}
synchronized (actionsInProgress) {
if (actionsInProgress.get() == 0) break;
- actionsInProgress.wait(100);
+ actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
}
}
- } catch (InterruptedException iex) {
- throw new InterruptedIOException(iex.getMessage());
}
+ return true;
}
@Override
@@ -931,7 +1326,8 @@ class AsyncProcess {
}
@Override
- public Object[] getResults() {
+ public Object[] getResults() throws InterruptedIOException {
+ waitUntilDone();
return results;
}
}
@@ -1080,4 +1476,8 @@ class AsyncProcess {
return new ConnectionManager.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
+
+ private static boolean isReplicaGet(Row row) {
+ return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 0c4776d..779d15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -231,4 +232,9 @@ interface ClusterConnection extends HConnection {
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
+
+ /**
+ * @return All locations for a particular region.
+ */
+ RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index dfc6d00..3038fb2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -203,6 +204,11 @@ class ConnectionAdapter implements ClusterConnection {
}
@Override
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+ return wrappedConnection.locateRegionAll(tableName, row);
+ }
+
+ @Override
public void clearRegionCache() {
wrappedConnection.clearRegionCache();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 15f78c5..d2b58ae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -946,10 +946,15 @@ class ConnectionManager {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte [] row)
- throws IOException{
- RegionLocations locations = locateRegion(tableName, row, true, true);
+ public RegionLocations locateRegionAll(
+ final TableName tableName, final byte[] row) throws IOException{
+ return locateRegion(tableName, row, true, true);
+ }
+
+ @Override
+ public HRegionLocation locateRegion(
+ final TableName tableName, final byte[] row) throws IOException{
+ RegionLocations locations = locateRegionAll(tableName, row);
return locations == null ? null : locations.getRegionLocation();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 2ca24dc..cad521a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -84,17 +84,6 @@ public final class MultiAction<R> {
return actions.keySet();
}
- /**
- * @return All actions from all regions in this container
- */
- public List<Action<R>> allActions() {
- List<Action<R>> res = new ArrayList<Action<R>>();
- for (List<Action<R>> lst : actions.values()) {
- res.addAll(lst);
- }
- return res;
- }
-
public boolean hasNonceGroup() {
return nonceGroup != HConstants.NO_NONCE;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index cc26ecf..20cf766 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
/**
@@ -153,4 +154,9 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(this.location.getServerName()));
}
+
+ @VisibleForTesting
+ ServerName getServerName() {
+ return location.getServerName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
index abe9bf5..6b1465d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
@@ -30,7 +30,7 @@ public class RegionReplicaUtil {
/**
* The default replicaId for the region
*/
- private static final int DEFAULT_REPLICA_ID = 0;
+ static final int DEFAULT_REPLICA_ID = 0;
/**
* Returns the HRegionInfo for the given replicaId. HRegionInfo's correspond to
@@ -62,4 +62,7 @@ public class RegionReplicaUtil {
return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
}
+ public static boolean isDefaultReplica(int replicaId) {
+ return DEFAULT_REPLICA_ID == replicaId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index edffd18..575827f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -43,8 +45,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
@@ -53,6 +58,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
@Category(MediumTests.class)
public class TestAsyncProcess {
@@ -64,8 +70,9 @@ public class TestAsyncProcess {
private static final byte[] FAILS = "FAILS".getBytes();
private static final Configuration conf = new Configuration();
- private static ServerName sn = ServerName.valueOf("localhost:10,1254");
- private static ServerName sn2 = ServerName.valueOf("localhost:140,12540");
+ private static ServerName sn = ServerName.valueOf("s1:1,1");
+ private static ServerName sn2 = ServerName.valueOf("s2:2,2");
+ private static ServerName sn3 = ServerName.valueOf("s3:3,3");
private static HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
private static HRegionInfo hri2 =
@@ -76,6 +83,16 @@ public class TestAsyncProcess {
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
+ // Replica stuff
+ private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+ hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
+ private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+ private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+ new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
+ private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+ new HRegionLocation(hri2r1, sn3));
+ private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+
private static final String success = "success";
private static Exception failure = new Exception("failure");
@@ -139,6 +156,7 @@ public class TestAsyncProcess {
ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
+ @Override
public void execute(Runnable command) {
throw new RejectedExecutionException("test under failure");
}
@@ -158,7 +176,17 @@ public class TestAsyncProcess {
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
callsCt.incrementAndGet();
final MultiResponse mr = createMultiResponse(
- callable.getMulti(), nbMultiResponse, nbActions);
+ callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+ @Override
+ public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
+ if (Arrays.equals(FAILS, a.getAction().getRow())) {
+ mr.add(regionName, a.getOriginalIndex(), failure);
+ } else {
+ mr.add(regionName, a.getOriginalIndex(), success);
+ }
+ }
+ });
+
return new RpcRetryingCaller<MultiResponse>(100, 10) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
@@ -204,23 +232,106 @@ public class TestAsyncProcess {
}
}
- static MultiResponse createMultiResponse(
- final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
+ class MyAsyncProcessWithReplicas extends MyAsyncProcess {
+ private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
+ private long primarySleepMs = 0, replicaSleepMs = 0;
+ private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
+ private final AtomicLong replicaCalls = new AtomicLong(0);
+
+ public void addFailures(HRegionInfo... hris) {
+ for (HRegionInfo hri : hris) {
+ failures.add(hri.getRegionName());
+ }
+ }
+
+ public long getReplicaCallCount() {
+ return replicaCalls.get();
+ }
+
+ public void setPrimaryCallDelay(ServerName server, long primaryMs) {
+ customPrimarySleepMs.put(server, primaryMs);
+ }
+
+ public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
+ super(hc, conf);
+ }
+
+ public void setCallDelays(long primaryMs, long replicaMs) {
+ this.primarySleepMs = primaryMs;
+ this.replicaSleepMs = replicaMs;
+ }
+
+ @Override
+ protected RpcRetryingCaller<MultiResponse> createCaller(
+ MultiServerCallable<Row> callable) {
+ final MultiResponse mr = createMultiResponse(
+ callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+ @Override
+ public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
+ if (failures.contains(regionName)) {
+ mr.add(regionName, a.getOriginalIndex(), failure);
+ } else {
+ boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
+ mr.add(regionName, a.getOriginalIndex(),
+ Result.create(new Cell[0], null, isStale));
+ }
+ }
+ });
+ // Currently AsyncProcess either sends all-replica, or all-primary request.
+ final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
+ callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
+ final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
+ String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
+ + callable.getMulti().actions.size() + " entries: ";
+ for (byte[] region : callable.getMulti().actions.keySet()) {
+ debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
+ }
+ LOG.debug(debugMsg);
+ if (!isDefault) {
+ replicaCalls.incrementAndGet();
+ }
+
+ return new RpcRetryingCaller<MultiResponse>(100, 10) {
+ @Override
+ public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+ throws IOException, RuntimeException {
+ long sleep = -1;
+ if (isDefault) {
+ Long customSleep = customPrimarySleepMs.get(server);
+ sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
+ } else {
+ sleep = replicaSleepMs;
+ }
+ if (sleep != 0) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ }
+ }
+ return mr;
+ }
+ };
+ }
+ }
+
+ static MultiResponse createMultiResponse(final MultiAction<Row> multi,
+ AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
final MultiResponse mr = new MultiResponse();
nbMultiResponse.incrementAndGet();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
byte[] regionName = entry.getKey();
for (Action<Row> a : entry.getValue()) {
nbActions.incrementAndGet();
- if (Arrays.equals(FAILS, a.getAction().getRow())) {
- mr.add(regionName, a.getOriginalIndex(), failure);
- } else {
- mr.add(regionName, a.getOriginalIndex(), success);
- }
+ gen.addResponse(mr, regionName, a);
}
}
return mr;
}
+
+ private static interface ResponseGenerator {
+ void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
+ }
+
/**
* Returns our async process.
*/
@@ -233,9 +344,8 @@ public class TestAsyncProcess {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte[] row) {
- return loc1;
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+ return new RegionLocations(loc1);
}
}
@@ -253,18 +363,18 @@ public class TestAsyncProcess {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte[] row) {
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
int i = 0;
- for (HRegionLocation hr:hrl){
- if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
- usedRegions[i] = true;
- return hr;
+ for (HRegionLocation hr : hrl){
+ if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
+ usedRegions[i] = true;
+ return new RegionLocations(hr);
}
i++;
}
return null;
}
+
}
@Test
@@ -284,6 +394,7 @@ public class TestAsyncProcess {
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
+ @Override
public void update(byte[] region, byte[] row, Object result) {
updateCalled.incrementAndGet();
}
@@ -458,6 +569,7 @@ public class TestAsyncProcess {
final Thread myThread = Thread.currentThread();
Thread t = new Thread() {
+ @Override
public void run() {
Threads.sleep(2000);
myThread.interrupt();
@@ -478,6 +590,7 @@ public class TestAsyncProcess {
final long sleepTime = 2000;
Thread t2 = new Thread() {
+ @Override
public void run() {
Threads.sleep(sleepTime);
while (ap.tasksInProgress.get() > 0) {
@@ -496,32 +609,33 @@ public class TestAsyncProcess {
}
private static ClusterConnection createHConnection() throws IOException {
- ClusterConnection hc = Mockito.mock(ClusterConnection.class);
-
- Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
- Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1);
-
- Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2);
- Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
+ ClusterConnection hc = createHConnectionCommon();
+ setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
+ setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
+ setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
+ setMockLocation(hc, FAILS, new RegionLocations(loc2));
+ return hc;
+ }
- Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
- Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
+ private static ClusterConnection createHConnectionWithReplicas() throws IOException {
+ ClusterConnection hc = createHConnectionCommon();
+ setMockLocation(hc, DUMMY_BYTES_1, hrls1);
+ setMockLocation(hc, DUMMY_BYTES_2, hrls2);
+ setMockLocation(hc, DUMMY_BYTES_3, hrls3);
+ return hc;
+ }
- Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
- Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
- Mockito.eq(FAILS))).thenReturn(loc2);
+ private static void setMockLocation(ClusterConnection hc, byte[] row,
+ RegionLocations result) throws IOException {
+ Mockito.when(hc.locateRegionAll(
+ Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result);
+ }
+ private static ClusterConnection createHConnectionCommon() {
+ ClusterConnection hc = Mockito.mock(ClusterConnection.class);
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
-
return hc;
}
@@ -756,7 +870,124 @@ public class TestAsyncProcess {
Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
}
- private void verifyResult(AsyncRequestFuture ars, boolean... expected) {
+ @Test
+ public void testReplicaReplicaSuccess() throws Exception {
+ // Main calls take too long, so replica calls succeed for the regions that have replicas.
+ // The third region has no replica, so the main call is the one that succeeds for it.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+ verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
+ Assert.assertEquals(2, ap.getReplicaCallCount());
+ }
+
+ @Test
+ public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
+ // Main call succeeds before replica calls are kicked off.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
+ verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
+ Assert.assertEquals(0, ap.getReplicaCallCount());
+ }
+
+ @Test
+ public void testReplicaParallelCallsSucceed() throws Exception {
+ // Either main or replica can succeed.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+ verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
+ long replicaCalls = ap.getReplicaCallCount();
+ Assert.assertTrue(replicaCalls >= 0);
+ Assert.assertTrue(replicaCalls <= 2);
+ }
+
+ @Test
+ public void testReplicaPartialReplicaCall() throws Exception {
+ // One server is slow, so the result for its region comes from a replica, whereas
+ // the result for the other region comes from the primary before replica calls happen.
+ // There should be no replica call for that region at all.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
+ ap.setPrimaryCallDelay(sn2, 2000);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+ verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
+ Assert.assertEquals(1, ap.getReplicaCallCount());
+ }
+
+ @Test
+ public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
+ // Main calls fail before replica calls can start - this is currently not handled.
+ // It would probably never happen if we can get location (due to retries),
+ // and it would require additional synchronization.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
+ ap.addFailures(hri1, hri2);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+ verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
+ Assert.assertEquals(0, ap.getReplicaCallCount());
+ }
+
+ @Test
+ public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
+ // Main calls fail after replica calls start. For the two-replica region, one replica call
+ // also fails. Regardless, we get replica results for both regions.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
+ ap.addFailures(hri1, hri1r2, hri2);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+ verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
+ Assert.assertEquals(2, ap.getReplicaCallCount());
+ }
+
+ @Test
+ public void testReplicaAllCallsFailForOneRegion() throws Exception {
+ // For one of the regions, all 3 calls (main and both replicas) fail. For the other, the replica
+ // call fails, but its exception should not be visible because the main call succeeded.
+ MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
+ ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
+ List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
+ AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
+ verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
+ // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
+ Assert.assertEquals(3, ars.getErrors().getNumExceptions());
+ for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
+ Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
+ }
+ }
+
+ private MyAsyncProcessWithReplicas createReplicaAp(
+ int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
+ return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
+ }
+
+ private MyAsyncProcessWithReplicas createReplicaAp(
+ int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
+ // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
+ // that the replica call has happened and that way control the ordering.
+ Configuration conf = new Configuration();
+ ClusterConnection conn = createHConnectionWithReplicas();
+ conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs);
+ if (retries > 0) {
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+ }
+ MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
+ ap.setCallDelays(primaryMs, replicaMs);
+ return ap;
+ }
+
+ private static List<Get> makeTimelineGets(byte[]... rows) {
+ List<Get> result = new ArrayList<Get>();
+ for (byte[] row : rows) {
+ Get get = new Get(row);
+ get.setConsistency(Consistency.TIMELINE);
+ result.add(get);
+ }
+ return result;
+ }
+
+ private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
Object[] actual = ars.getResults();
Assert.assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; ++i) {
@@ -764,6 +995,27 @@ public class TestAsyncProcess {
}
}
+ /** Expected per-row outcome for verifyReplicaResult: staleness tri-state plus failure. */
+ private enum RR {
+ TRUE, // non-error Result whose isStale() must be true
+ FALSE, // non-error Result whose isStale() must be false
+ DONT_CARE, // any non-Throwable value; staleness is not checked
+ FAILED // the result slot must hold a Throwable
+ }
+
+ private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
+ Object[] actuals = ars.getResults();
+ Assert.assertEquals(expecteds.length, actuals.length);
+ for (int i = 0; i < expecteds.length; ++i) { // expecteds[i] describes result slot i
+ Object actual = actuals[i];
+ RR expected = expecteds[i];
+ Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable); // error iff FAILED
+ if (expected != RR.FAILED && expected != RR.DONT_CARE) { // only TRUE/FALSE pin isStale()
+ Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
+ }
+ }
+ }
+
/**
* @param regCnt the region: 1 to 3.
* @param success if true, the put will succeed.
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index cc0901f..e2be188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -28,12 +28,10 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class CoprocessorHConnection implements ClusterConnection {
+class CoprocessorHConnection implements ClusterConnection {
private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator();
/**
@@ -60,7 +58,7 @@ public class CoprocessorHConnection implements ClusterConnection {
* @return an unmanaged {@link HConnection}.
* @throws IOException if we cannot create the basic connection
*/
- public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
+ static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
throws IOException {
ClusterConnection connection =
ConnectionManager.createConnectionInternal(env.getConfiguration());
@@ -427,4 +425,9 @@ public class CoprocessorHConnection implements ClusterConnection {
public AsyncProcess getAsyncProcess() {
return delegate.getAsyncProcess();
}
+
+ @Override
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+ return delegate.locateRegionAll(tableName, row); // forwarded to delegate unchanged
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/25b6103d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 5a86ab5..f925fea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -110,6 +111,8 @@ public class HConnectionTestingUtility {
thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc);
+ Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())).
+ thenReturn(new RegionLocations(loc));
if (admin != null) {
// If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.any(ServerName.class))).