You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:17 UTC
[31/49] git commit: HBASE-10634 Multiget doesn't fully work
HBASE-10634 Multiget doesn't fully work
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586184 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61bce903
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61bce903
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61bce903
Branch: refs/heads/master
Commit: 61bce903626cfe775e7b2df32b46568bbc6fe6bd
Parents: d313103
Author: sershe <se...@unknown>
Authored: Thu Apr 10 00:50:45 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 160 ++++++++++++-------
1 file changed, 102 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/61bce903/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 252e808..3c6f5be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -136,12 +136,14 @@ class AsyncProcess {
/** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
int callCount;
- /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
- * is not last, adds error to list. If all calls fail the last one sets this to list. */
- Object result = null;
/** Errors for which it is not decided whether we will report them to user. If one of the
* calls succeeds, we will discard the errors that may have happened in the other calls. */
BatchErrors replicaErrors = null;
+
+ @Override
+ public String toString() {
+ return "[call count " + callCount + "; errors " + replicaErrors + "]";
+ }
}
@@ -622,6 +624,12 @@ class AsyncProcess {
replicaCount += (locs[i] != null) ? 1 : 0;
}
if (replicaCount == 0) {
+ // we could have got the replica back (if the server went down and the replica moved)
+ try {
+ loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true);
+ } catch (IOException e) {
+ manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
+ }
LOG.warn("No replicas found for " + action.getAction());
return;
}
@@ -813,12 +821,23 @@ class AsyncProcess {
isReplica = isReplicaAction;
HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
if (loc == null || loc.getServerName() == null) {
- // On retry, we couldn't find location for some replica we saw before.
- String str = "Cannot find location for replica " + action.getReplicaId();
- LOG.error(str);
- manageError(action.getOriginalIndex(), action.getAction(),
- false, new IOException(str), null);
- continue;
+ try {
+ locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId());
+ loc = locs.getRegionLocation(action.getReplicaId());
+ } catch (IOException e) {
+ // There are multiple retries in locateRegion already. No need to add new.
+ // We can't continue with this row, hence it's the last retry.
+ manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
+ continue;
+ }
+ if (loc == null || loc.getServerName() == null) {
+ // On retry, we couldn't find location for some replica we saw before.
+ String str = "Cannot find location for replica " + action.getReplicaId();
+ LOG.error(str);
+ manageError(action.getOriginalIndex(), action.getAction(),
+ false, new IOException(str), null);
+ continue;
+ }
}
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
@@ -1136,20 +1155,37 @@ class AsyncProcess {
* @param result The result.
*/
private void setResult(Action<Row> action, Object result) {
+ if (result == null) {
+ throw new RuntimeException("Result cannot be null");
+ }
ReplicaResultState state = null;
boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
- if (results == null || ((state = trySetResultSimple(
- action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
- decActionCounter();
+ int index = action.getOriginalIndex();
+ if (results == null) {
+ decActionCounter(index);
+ return; // Simple case, no replica requests.
+ } else if ((state = trySetResultSimple(
+ index, action.getAction(), result, isStale)) == null) {
return; // Simple case, no replica requests.
}
+ assert state != null;
+ // At this point we know that state is set to replica tracking class.
+ // It could be that someone else is also looking at it; however, we know there can
+ // only be one state object, and only one thread can set callCount to 0. Other threads
+ // will either see state with callCount 0 after locking it; or will not see state at all
+ // we will replace it with the result.
synchronized (state) {
if (state.callCount == 0) return; // someone already set the result
- state.result = result;
state.callCount = 0;
- state.replicaErrors = null; // no longer matters
}
- decActionCounter();
+ synchronized (replicaResultLock) {
+ if (results[index] != state) {
+ throw new AssertionError("We set the callCount but someone else replaced the result");
+ }
+ results[index] = result;
+ }
+
+ decActionCounter(index);
}
/**
@@ -1161,19 +1197,24 @@ class AsyncProcess {
*/
private void setError(int index, Row row, Throwable throwable, ServerName server) {
ReplicaResultState state = null;
- if (results == null
- || ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
+ if (results == null) {
+ // Note that we currently cannot have replica requests with null results. So it shouldn't
+ // happen that multiple replica calls will call dAC for same actions with results == null.
+ // Only one call per action should be present in this case.
+ errors.add(throwable, row, server);
+ decActionCounter(index);
+ return; // Simple case, no replica requests.
+ } else if ((state = trySetResultSimple(index, row, throwable, false)) == null) {
errors.add(throwable, row, server);
- decActionCounter();
return; // Simple case, no replica requests.
}
+ assert state != null;
BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
boolean isActionDone = false;
synchronized (state) {
switch (state.callCount) {
case 0: return; // someone already set the result
case 1: { // All calls failed, we are the last error.
- state.result = throwable;
target = errors;
isActionDone = true;
break;
@@ -1190,12 +1231,19 @@ class AsyncProcess {
--state.callCount;
}
target.add(throwable, row, server);
- if (!isActionDone) return;
- if (state.replicaErrors != null) { // last call, no need to lock
- errors.merge(state.replicaErrors);
- state.replicaErrors = null;
+ if (isActionDone) {
+ if (state.replicaErrors != null) { // last call, no need to lock
+ errors.merge(state.replicaErrors);
+ }
+ // See setResult for explanations.
+ synchronized (replicaResultLock) {
+ if (results[index] != state) {
+ throw new AssertionError("We set the callCount but someone else replaced the result");
+ }
+ results[index] = throwable;
+ }
+ decActionCounter(index);
}
- decActionCounter();
}
/**
@@ -1234,47 +1282,43 @@ class AsyncProcess {
}
}
}
- return (resObj == null || !(resObj instanceof ReplicaResultState))
- ? null : (ReplicaResultState)resObj;
+ if (resObj == null) {
+ decActionCounter(index);
+ return null;
+ }
+ return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
}
- private void decActionCounter() {
- if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
- // Convert replica sync structures to results.
- int staleCount = 0;
- if (replicaGetIndices == null) {
- for (int i = 0; i < results.length; ++i) {
- staleCount += convertReplicaResult(i) ? 1 : 0;
- }
- } else {
- for (int i = 0; i < replicaGetIndices.length; ++i) {
- staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
- }
- }
- if (!actionsInProgress.compareAndSet(1, 0)) {
- throw new AssertionError("Cannot set actions in progress to 0");
- }
- if (staleCount > 0) {
- LOG.trace("Returning " + staleCount + " stale results");
+ private void decActionCounter(int index) {
+ long actionsRemaining = actionsInProgress.decrementAndGet();
+ if (actionsRemaining < 0) {
+ String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
+ throw new AssertionError(error);
+ } else if (actionsRemaining == 0) {
+ synchronized (actionsInProgress) {
+ actionsInProgress.notifyAll();
}
- } else {
- actionsInProgress.decrementAndGet();
- }
- synchronized (actionsInProgress) {
- actionsInProgress.notifyAll();
}
}
- private boolean convertReplicaResult(int index) {
- if (!(results[index] instanceof ReplicaResultState)) return false;
- ReplicaResultState state = (ReplicaResultState)results[index];
- // We know that noone will touch state with 0 callCount, no need to lock
- if (state.callCount != 0) {
- throw new AssertionError("Actions are done but callcount is " + state.callCount);
+ private String buildDetailedErrorMsg(String string, int index) {
+ String error = string + "; called for " + index +
+ ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
+ if (replicaGetIndices != null) {
+ for (int i = 0; i < replicaGetIndices.length; ++i) {
+ error += replicaGetIndices[i] + ", ";
+ }
+ } else {
+ error += (hasAnyReplicaGets ? "all" : "none");
+ }
+ error += "; results ";
+ if (results != null) {
+ for (int i = 0; i < results.length; ++i) {
+ Object o = results[i];
+ error += ((o == null) ? "null" : o.toString()) + ", ";
+ }
}
- // TODO: we expect the Result coming from server to already have "isStale" specified.
- Object res = results[index] = state.result;
- return (res instanceof Result) && ((Result)res).isStale();
+ return error;
}
@Override