You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlinked word "here") was not preserved in this copy.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:17 UTC

[31/49] git commit: HBASE-10634 Multiget doesn't fully work

HBASE-10634 Multiget doesn't fully work

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1586184 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61bce903
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61bce903
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61bce903

Branch: refs/heads/master
Commit: 61bce903626cfe775e7b2df32b46568bbc6fe6bd
Parents: d313103
Author: sershe <se...@unknown>
Authored: Thu Apr 10 00:50:45 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 160 ++++++++++++-------
 1 file changed, 102 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/61bce903/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 252e808..3c6f5be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -136,12 +136,14 @@ class AsyncProcess {
 
     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
     int callCount;
-    /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
-     * is not last, adds error to list. If all calls fail the last one sets this to list. */
-    Object result = null;
     /** Errors for which it is not decided whether we will report them to user. If one of the
      * calls succeeds, we will discard the errors that may have happened in the other calls. */
     BatchErrors replicaErrors = null;
+
+    @Override
+    public String toString() {
+      return "[call count " + callCount + "; errors " + replicaErrors + "]";
+    }
   }
 
 
@@ -622,6 +624,12 @@ class AsyncProcess {
           replicaCount += (locs[i] != null) ? 1 : 0;
         }
         if (replicaCount == 0) {
+          // we could have got the replica back (if the server went down and the replica moved)
+          try {
+            loc = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true);
+          } catch (IOException e) {
+            manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
+          }
           LOG.warn("No replicas found for " + action.getAction());
           return;
         }
@@ -813,12 +821,23 @@ class AsyncProcess {
         isReplica = isReplicaAction;
         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
         if (loc == null || loc.getServerName() == null) {
-          // On retry, we couldn't find location for some replica we saw before.
-          String str = "Cannot find location for replica " + action.getReplicaId();
-          LOG.error(str);
-          manageError(action.getOriginalIndex(), action.getAction(),
-              false, new IOException(str), null);
-          continue;
+          try {
+            locs = hConnection.locateRegion(tableName, action.getAction().getRow(), false, true, action.getReplicaId());
+            loc = locs.getRegionLocation(action.getReplicaId());
+          } catch (IOException e) {
+            // There are multiple retries in locateRegion already. No need to add new.
+            // We can't continue with this row, hence it's the last retry.
+            manageError(action.getOriginalIndex(), action.getAction(), false, e, null);
+            continue;
+          }
+          if (loc == null || loc.getServerName() == null) {
+            // On retry, we couldn't find location for some replica we saw before.
+            String str = "Cannot find location for replica " + action.getReplicaId();
+            LOG.error(str);
+            manageError(action.getOriginalIndex(), action.getAction(),
+                false, new IOException(str), null);
+            continue;
+          }
         }
         byte[] regionName = loc.getRegionInfo().getRegionName();
         addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
@@ -1136,20 +1155,37 @@ class AsyncProcess {
      * @param result The result.
      */
     private void setResult(Action<Row> action, Object result) {
+      if (result == null) {
+        throw new RuntimeException("Result cannot be null");
+      }
       ReplicaResultState state = null;
       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
-      if (results == null || ((state = trySetResultSimple(
-          action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
-        decActionCounter();
+      int index = action.getOriginalIndex();
+      if (results == null) {
+         decActionCounter(index);
+         return; // Simple case, no replica requests.
+      } else if ((state = trySetResultSimple(
+          index, action.getAction(), result, isStale)) == null) {
         return; // Simple case, no replica requests.
       }
+      assert state != null;
+      // At this point we know that state is set to replica tracking class.
+      // It could be that someone else is also looking at it; however, we know there can
+      // only be one state object, and only one thread can set callCount to 0. Other threads
+      // will either see state with callCount 0 after locking it; or will not see state at all
+      // we will replace it with the result.
       synchronized (state) {
         if (state.callCount == 0) return; // someone already set the result
-        state.result = result;
         state.callCount = 0;
-        state.replicaErrors = null; // no longer matters
       }
-      decActionCounter();
+      synchronized (replicaResultLock) {
+        if (results[index] != state) {
+          throw new AssertionError("We set the callCount but someone else replaced the result");
+        }
+        results[index] = result;
+      }
+
+      decActionCounter(index);
     }
 
     /**
@@ -1161,19 +1197,24 @@ class AsyncProcess {
      */
     private void setError(int index, Row row, Throwable throwable, ServerName server) {
       ReplicaResultState state = null;
-      if (results == null
-          || ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
+      if (results == null) {
+        // Note that we currently cannot have replica requests with null results. So it shouldn't
+        // happen that multiple replica calls will call dAC for same actions with results == null.
+        // Only one call per action should be present in this case.
+        errors.add(throwable, row, server);
+        decActionCounter(index);
+        return; // Simple case, no replica requests.
+      } else if ((state = trySetResultSimple(index, row, throwable, false)) == null) {
         errors.add(throwable, row, server);
-        decActionCounter();
         return; // Simple case, no replica requests.
       }
+      assert state != null;
       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
       boolean isActionDone = false;
       synchronized (state) {
         switch (state.callCount) {
           case 0: return; // someone already set the result
           case 1: { // All calls failed, we are the last error.
-            state.result = throwable;
             target = errors;
             isActionDone = true;
             break;
@@ -1190,12 +1231,19 @@ class AsyncProcess {
         --state.callCount;
       }
       target.add(throwable, row, server);
-      if (!isActionDone) return;
-      if (state.replicaErrors != null) { // last call, no need to lock
-        errors.merge(state.replicaErrors);
-        state.replicaErrors = null;
+      if (isActionDone) {
+        if (state.replicaErrors != null) { // last call, no need to lock
+          errors.merge(state.replicaErrors);
+        }
+        // See setResult for explanations.
+        synchronized (replicaResultLock) {
+          if (results[index] != state) {
+            throw new AssertionError("We set the callCount but someone else replaced the result");
+          }
+          results[index] = throwable;
+        }
+        decActionCounter(index);
       }
-      decActionCounter();
     }
 
     /**
@@ -1234,47 +1282,43 @@ class AsyncProcess {
           }
         }
       }
-      return (resObj == null || !(resObj instanceof ReplicaResultState))
-          ? null : (ReplicaResultState)resObj;
+      if (resObj == null) {
+        decActionCounter(index);
+        return null;
+      }
+      return (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
     }
 
-    private void decActionCounter() {
-      if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
-        // Convert replica sync structures to results.
-        int staleCount = 0;
-        if (replicaGetIndices == null) {
-          for (int i = 0; i < results.length; ++i) {
-            staleCount += convertReplicaResult(i) ? 1 : 0;
-          }
-        } else {
-          for (int i = 0; i < replicaGetIndices.length; ++i) {
-            staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
-          }
-        }
-        if (!actionsInProgress.compareAndSet(1, 0)) {
-          throw new AssertionError("Cannot set actions in progress to 0");
-        }
-        if (staleCount > 0) {
-          LOG.trace("Returning " + staleCount + " stale results");
+    private void decActionCounter(int index) {
+      long actionsRemaining = actionsInProgress.decrementAndGet();
+      if (actionsRemaining < 0) {
+        String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
+        throw new AssertionError(error);
+      } else if (actionsRemaining == 0) {
+        synchronized (actionsInProgress) {
+          actionsInProgress.notifyAll();
         }
-      } else {
-        actionsInProgress.decrementAndGet();
-      }
-      synchronized (actionsInProgress) {
-        actionsInProgress.notifyAll();
       }
     }
 
-    private boolean convertReplicaResult(int index) {
-      if (!(results[index] instanceof ReplicaResultState)) return false;
-      ReplicaResultState state = (ReplicaResultState)results[index];
-      // We know that noone will touch state with 0 callCount, no need to lock
-      if (state.callCount != 0) {
-        throw new AssertionError("Actions are done but callcount is " + state.callCount);
+    private String buildDetailedErrorMsg(String string, int index) {
+      String error = string + "; called for " + index +
+          ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
+      if (replicaGetIndices != null) {
+        for (int i = 0; i < replicaGetIndices.length; ++i) {
+          error += replicaGetIndices[i] + ", ";
+        }
+      } else {
+        error += (hasAnyReplicaGets ? "all" : "none");
+      }
+      error += "; results ";
+      if (results != null) {
+        for (int i = 0; i < results.length; ++i) {
+          Object o = results[i];
+          error += ((o == null) ? "null" : o.toString()) + ", ";
+        }
       }
-      // TODO: we expect the Result coming from server to already have "isStale" specified.
-      Object res = results[index] = state.result;
-      return (res instanceof Result) && ((Result)res).isStale();
+      return error;
     }
 
     @Override