You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/10/31 11:36:53 UTC

svn commit: r1537431 - in /hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase: client/AsyncProcess.java client/MultiResponse.java client/MultiServerCallable.java protobuf/ResponseConverter.java

Author: nkeywal
Date: Thu Oct 31 10:36:53 2013
New Revision: 1537431

URL: http://svn.apache.org/r1537431
Log:
HBASE-9862 manage error per server and per region in the protobuffed client

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Thu Oct 31 10:36:53 2013
@@ -512,14 +512,23 @@ class AsyncProcess<CResult> {
             try {
               res = createCaller(callable).callWithoutRetries(callable);
             } catch (IOException e) {
-              LOG.warn("#" + id + ", call to " + loc.getServerName() +
-                  " failed numAttempt=" + numAttempt +
-                ", resubmitting all since not sure where we are at", e);
-              resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
+              // The service itself failed . It may be an error coming from the communication
+              //   layer, but, as well, a functional error raised by the server.
+              receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
+                  errorsByServer);
+              return;
+            } catch (Throwable t) {
+              // This should not happen. Let's log & retry anyway.
+              LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+                  " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
+              receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
+                  errorsByServer);
               return;
             }
 
+            // Nominal case: we received an answer from the server, and it's not an exception.
             receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
+
           } finally {
             decTaskCounters(multiAction.getRegions(), loc.getServerName());
           }
@@ -536,7 +545,7 @@ class AsyncProcess<CResult> {
             " Server is " + loc.getServerName(), ree);
         // We're likely to fail again, but this will increment the attempt counter, so it will
         //  finish.
-        resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
+        receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
       }
     }
   }
@@ -603,14 +612,15 @@ class AsyncProcess<CResult> {
    * @param numAttempt the number of attempts so far
    * @param t the throwable (if any) that caused the resubmit
    */
-  private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
-                           HRegionLocation location, int numAttempt, Throwable t,
-                           HConnectionManager.ServerErrorTracker errorsByServer) {
+  private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
+                                    HRegionLocation location, int numAttempt, Throwable t,
+                                    HConnectionManager.ServerErrorTracker errorsByServer) {
     // Do not use the exception for updating cache because it might be coming from
     // any of the regions in the MultiAction.
     hConnection.updateCachedLocations(tableName,
       rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
     errorsByServer.reportServerError(location);
+
     List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
     for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
       for (Action<Row> action : e.getValue()) {
@@ -620,34 +630,77 @@ class AsyncProcess<CResult> {
       }
     }
 
+    logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
+        t, errorsByServer);
+  }
+
+  /**
+   * Log as many info as possible, and, if there is something to replay, submit it again after
+   *  a back off sleep.
+   */
+  private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
+                              List<Action<Row>> toReplay, int numAttempt, int failureCount,
+                              Throwable throwable,
+                              HConnectionManager.ServerErrorTracker errorsByServer){
+
     if (toReplay.isEmpty()) {
-      LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all " +
-        initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
-    } else {
-      submit(initialActions, toReplay, numAttempt, errorsByServer);
+      // it's either a success or a last failure
+      if (failureCount != 0) {
+        // We have a failure but nothing to retry. We're done, it's a final failure..
+        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+            oldLocation.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
+      } else if (numAttempt > startLogErrorsCnt + 1) {
+        // The operation was successful, but needed several attempts. Let's log this.
+        LOG.info(createLog(numAttempt, failureCount, 0,
+            oldLocation.getServerName(), throwable, -1, false,
+            errorsByServer.getStartTrackingTime()));
+      }
+      return;
     }
+
+    // We have something to replay. We're going to sleep a little before.
+
+    // We have two contradicting needs here:
+    //  1) We want to get the new location after having slept, as it may change.
+    //  2) We want to take into account the location when calculating the sleep time.
+    // It should be possible to have some heuristics to take the right decision. Short term,
+    //  we go for one.
+    long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
+
+    if (numAttempt > startLogErrorsCnt) {
+      // We use this value to have some logs when we have multiple failures, but not too many
+      //  logs, as errors are to be expected when a region moves, splits and so on
+      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+          oldLocation.getServerName(), throwable, backOffTime, true,
+          errorsByServer.getStartTrackingTime()));
+    }
+
+    try {
+      Thread.sleep(backOffTime);
+    } catch (InterruptedException e) {
+      LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
+      Thread.interrupted();
+      return;
+    }
+
+    submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
   }
 
   /**
    * Called when we receive the result of a server query.
    *
    * @param initialActions - the whole action list
-   * @param rsActions      - the actions for this location
+   * @param multiAction    - the multiAction we sent
    * @param location       - the location. It's used as a server name.
    * @param responses      - the response, if any
    * @param numAttempt     - the attempt
    */
-  private void receiveMultiAction(List<Action<Row>> initialActions,
-                                  MultiAction<Row> rsActions, HRegionLocation location,
+  private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
+                                  HRegionLocation location,
                                   MultiResponse responses, int numAttempt,
                                   HConnectionManager.ServerErrorTracker errorsByServer) {
-
-    if (responses == null) {
-      LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
-          " failed all ops, trying resubmit," + location);
-      resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
-      return;
-    }
+     assert responses != null;
 
     // Success or partial success
     // Analyze detailed results. We can still have individual failures to be redo.
@@ -657,9 +710,9 @@ class AsyncProcess<CResult> {
 
     List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
     Throwable throwable = null;
-
     int failureCount = 0;
     boolean canRetry = true;
+
     for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
         responses.getResults().entrySet()) {
 
@@ -699,43 +752,36 @@ class AsyncProcess<CResult> {
       }
     }
 
-    if (!toReplay.isEmpty()) {
-      // We have two contradicting needs here:
-      //  1) We want to get the new location after having slept, as it may change.
-      //  2) We want to take into account the location when calculating the sleep time.
-      // It should be possible to have some heuristics to take the right decision. Short term,
-      //  we go for one.
-      long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
-      if (numAttempt > startLogErrorsCnt) {
-        // We use this value to have some logs when we have multiple failures, but not too many
-        //  logs, as errors are to be expected when a region moves, splits and so on
-        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, backOffTime, true,
-            errorsByServer.getStartTrackingTime()));
-      }
-
-      try {
-        Thread.sleep(backOffTime);
-      } catch (InterruptedException e) {
-        LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location, e);
-        Thread.interrupted();
-        return;
-      }
+    // The failures global to a region. We will use for multiAction we sent previously to find the
+    //   actions to replay.
 
-      submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
-    } else {
-      if (failureCount != 0) {
-        // We have a failure but nothing to retry. We're done, it's a final failure..
-        LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, -1, false,
-            errorsByServer.getStartTrackingTime()));
-      } else if (numAttempt > startLogErrorsCnt + 1) {
-        // The operation was successful, but needed several attempts. Let's log this.
-        LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
-            location.getServerName(), throwable, -1, false,
-            errorsByServer.getStartTrackingTime()));
+    for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
+      throwable = throwableEntry.getValue();
+      byte[] region =throwableEntry.getKey();
+      List<Action<Row>> actions = multiAction.actions.get(region);
+      if (actions == null || actions.isEmpty()) {
+        throw new IllegalStateException("Wrong response for the region: " +
+            HRegionInfo.encodeRegionName(region));
+      }
+
+      if (failureCount == 0) {
+        errorsByServer.reportServerError(location);
+        canRetry = errorsByServer.canRetryMore(numAttempt);
+      }
+      hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
+          throwable, location);
+      failureCount += actions.size();
+
+      for (Action<Row> action : actions) {
+        Row row = action.getAction();
+        if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
+          toReplay.add(action);
+        }
       }
     }
+
+    logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
+        throwable, errorsByServer);
   }
 
   private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Thu Oct 31 10:36:53 2013
@@ -40,6 +40,13 @@ public class MultiResponse {
   private Map<byte[], List<Pair<Integer, Object>>> results =
       new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
 
+  /**
+   * The server can send us a failure for the region itself, instead of individual failure.
+   * It's a part of the protobuf definition.
+   */
+  private Map<byte[], Throwable> exceptions =
+      new TreeMap<byte[], Throwable>(Bytes.BYTES_COMPARATOR);
+
   public MultiResponse() {
     super();
   }
@@ -80,4 +87,19 @@ public class MultiResponse {
   public Map<byte[], List<Pair<Integer, Object>>> getResults() {
     return results;
   }
+
+  public void addException(byte []regionName, Throwable ie){
+    exceptions.put(regionName, ie);
+  }
+
+  /**
+   * @return the exception for the region, if any. Null otherwise.
+   */
+  public Throwable getException(byte []regionName){
+    return exceptions.get(regionName);
+  }
+
+  public Map<byte[], Throwable> getExceptions() {
+    return exceptions;
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Thu Oct 31 10:36:53 2013
@@ -91,28 +91,12 @@ class MultiServerCallable<R> extends Reg
     try {
       responseProto = getStub().multi(controller, requestProto);
     } catch (ServiceException e) {
-      return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
+      throw ProtobufUtil.getRemoteException(e);
     }
     return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
   }
 
-  /**
-   * @param request
-   * @param t
-   * @return Return a response that has every action in request failed w/ the passed in
-   * exception <code>t</code> -- this will get them all retried after some backoff.
-   */
-  private static MultiResponse createAllFailedResponse(final ClientProtos.MultiRequest request,
-      final Throwable t) {
-    MultiResponse massFailedResponse = new MultiResponse();
-    for (RegionAction rAction: request.getRegionActionList()) {
-      byte [] regionName = rAction.getRegion().getValue().toByteArray();
-      for (ClientProtos.Action action: rAction.getActionList()) {
-        massFailedResponse.add(regionName, new Pair<Integer, Object>(action.getIndex(), t));
-      }
-    }
-    return massFailedResponse;
-  }
+
 
   /**
    * @return True if we should send data in cellblocks.  This is an expensive call.  Cache the

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Thu Oct 31 10:36:53 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
@@ -94,13 +95,17 @@ public final class ResponseConverter {
     for (int i = 0; i < responseRegionActionResultCount; i++) {
       RegionAction actions = request.getRegionAction(i);
       RegionActionResult actionResult = response.getRegionActionResult(i);
-      byte[] regionName = actions.getRegion().toByteArray();
+      HBaseProtos.RegionSpecifier rs = actions.getRegion();
+      if (rs.hasType() &&
+          (rs.getType() != HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)){
+        throw new IllegalArgumentException(
+            "We support only encoded types for protobuf multi response.");
+      }
+      byte[] regionName = rs.getValue().toByteArray();
 
       if (actionResult.hasException()){
         Throwable regionException =  ProtobufUtil.toException(actionResult.getException());
-        for (ClientProtos.Action a : actions.getActionList()){
-          results.add(regionName, new Pair<Integer, Object>(a.getIndex(), regionException));
-        }
+        results.addException(regionName, regionException);
         continue;
       }