You are viewing a plain-text version of this content; the canonical link (a hyperlink in the original page) was not preserved in this rendering.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/10/31 11:36:53 UTC
svn commit: r1537431 - in /hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase: client/AsyncProcess.java client/MultiResponse.java client/MultiServerCallable.java protobuf/ResponseConverter.java
Author: nkeywal
Date: Thu Oct 31 10:36:53 2013
New Revision: 1537431
URL: http://svn.apache.org/r1537431
Log:
HBASE-9862 manage error per server and per region in the protobuffed client
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Thu Oct 31 10:36:53 2013
@@ -512,14 +512,23 @@ class AsyncProcess<CResult> {
try {
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
- LOG.warn("#" + id + ", call to " + loc.getServerName() +
- " failed numAttempt=" + numAttempt +
- ", resubmitting all since not sure where we are at", e);
- resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
+ // The service itself failed. It may be an error coming from the communication
+ // layer, but it may also be a functional error raised by the server.
+ receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
+ errorsByServer);
+ return;
+ } catch (Throwable t) {
+ // This should not happen. Let's log & retry anyway.
+ LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+ " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
+ receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
+ errorsByServer);
return;
}
+ // Nominal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
+
} finally {
decTaskCounters(multiAction.getRegions(), loc.getServerName());
}
@@ -536,7 +545,7 @@ class AsyncProcess<CResult> {
" Server is " + loc.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
- resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
+ receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
}
}
}
@@ -603,14 +612,15 @@ class AsyncProcess<CResult> {
* @param numAttempt the number of attempts so far
* @param t the throwable (if any) that caused the resubmit
*/
- private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
- HRegionLocation location, int numAttempt, Throwable t,
- HConnectionManager.ServerErrorTracker errorsByServer) {
+ private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
+ HRegionLocation location, int numAttempt, Throwable t,
+ HConnectionManager.ServerErrorTracker errorsByServer) {
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
hConnection.updateCachedLocations(tableName,
rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
errorsByServer.reportServerError(location);
+
List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
for (Action<Row> action : e.getValue()) {
@@ -620,34 +630,77 @@ class AsyncProcess<CResult> {
}
}
+ logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
+ t, errorsByServer);
+ }
+
+ /**
+ * Log as much information as possible and, if there is something to replay, submit it again after
+ * a backoff sleep.
+ */
+ private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
+ List<Action<Row>> toReplay, int numAttempt, int failureCount,
+ Throwable throwable,
+ HConnectionManager.ServerErrorTracker errorsByServer){
+
if (toReplay.isEmpty()) {
- LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all " +
- initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
- } else {
- submit(initialActions, toReplay, numAttempt, errorsByServer);
+ // it's either a success or a last failure
+ if (failureCount != 0) {
+ // We have a failure but nothing to retry. We're done, it's a final failure.
+ LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+ oldLocation.getServerName(), throwable, -1, false,
+ errorsByServer.getStartTrackingTime()));
+ } else if (numAttempt > startLogErrorsCnt + 1) {
+ // The operation was successful, but needed several attempts. Let's log this.
+ LOG.info(createLog(numAttempt, failureCount, 0,
+ oldLocation.getServerName(), throwable, -1, false,
+ errorsByServer.getStartTrackingTime()));
+ }
+ return;
}
+
+ // We have something to replay. We're going to sleep a little before.
+
+ // We have two contradicting needs here:
+ // 1) We want to get the new location after having slept, as it may change.
+ // 2) We want to take into account the location when calculating the sleep time.
+ // It should be possible to have some heuristics to take the right decision. Short term,
+ // we go for one.
+ long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
+
+ if (numAttempt > startLogErrorsCnt) {
+ // We use this value to have some logs when we have multiple failures, but not too many
+ // logs, as errors are to be expected when a region moves, splits and so on
+ LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+ oldLocation.getServerName(), throwable, backOffTime, true,
+ errorsByServer.getStartTrackingTime()));
+ }
+
+ try {
+ Thread.sleep(backOffTime);
+ } catch (InterruptedException e) {
+ LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
+ Thread.interrupted();
+ return;
+ }
+
+ submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
}
/**
* Called when we receive the result of a server query.
*
* @param initialActions - the whole action list
- * @param rsActions - the actions for this location
+ * @param multiAction - the multiAction we sent
* @param location - the location. It's used as a server name.
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
- private void receiveMultiAction(List<Action<Row>> initialActions,
- MultiAction<Row> rsActions, HRegionLocation location,
+ private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
+ HRegionLocation location,
MultiResponse responses, int numAttempt,
HConnectionManager.ServerErrorTracker errorsByServer) {
-
- if (responses == null) {
- LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
- " failed all ops, trying resubmit," + location);
- resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
- return;
- }
+ assert responses != null;
// Success or partial success
// Analyze detailed results. We can still have individual failures to be redo.
@@ -657,9 +710,9 @@ class AsyncProcess<CResult> {
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
Throwable throwable = null;
-
int failureCount = 0;
boolean canRetry = true;
+
for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
responses.getResults().entrySet()) {
@@ -699,43 +752,36 @@ class AsyncProcess<CResult> {
}
}
- if (!toReplay.isEmpty()) {
- // We have two contradicting needs here:
- // 1) We want to get the new location after having slept, as it may change.
- // 2) We want to take into account the location when calculating the sleep time.
- // It should be possible to have some heuristics to take the right decision. Short term,
- // we go for one.
- long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
- if (numAttempt > startLogErrorsCnt) {
- // We use this value to have some logs when we have multiple failures, but not too many
- // logs, as errors are to be expected when a region moves, splits and so on
- LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
- location.getServerName(), throwable, backOffTime, true,
- errorsByServer.getStartTrackingTime()));
- }
-
- try {
- Thread.sleep(backOffTime);
- } catch (InterruptedException e) {
- LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location, e);
- Thread.interrupted();
- return;
- }
+ // Failures global to a region: use the multiAction we sent previously to find the
+ // actions to replay.
- submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
- } else {
- if (failureCount != 0) {
- // We have a failure but nothing to retry. We're done, it's a final failure..
- LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
- location.getServerName(), throwable, -1, false,
- errorsByServer.getStartTrackingTime()));
- } else if (numAttempt > startLogErrorsCnt + 1) {
- // The operation was successful, but needed several attempts. Let's log this.
- LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
- location.getServerName(), throwable, -1, false,
- errorsByServer.getStartTrackingTime()));
+ for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
+ throwable = throwableEntry.getValue();
+ byte[] region =throwableEntry.getKey();
+ List<Action<Row>> actions = multiAction.actions.get(region);
+ if (actions == null || actions.isEmpty()) {
+ throw new IllegalStateException("Wrong response for the region: " +
+ HRegionInfo.encodeRegionName(region));
+ }
+
+ if (failureCount == 0) {
+ errorsByServer.reportServerError(location);
+ canRetry = errorsByServer.canRetryMore(numAttempt);
+ }
+ hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
+ throwable, location);
+ failureCount += actions.size();
+
+ for (Action<Row> action : actions) {
+ Row row = action.getAction();
+ if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
+ toReplay.add(action);
+ }
}
}
+
+ logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
+ throwable, errorsByServer);
}
private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Thu Oct 31 10:36:53 2013
@@ -40,6 +40,13 @@ public class MultiResponse {
private Map<byte[], List<Pair<Integer, Object>>> results =
new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
+ /**
+ * The server can send us a failure for the region itself, instead of individual failures.
+ * It's a part of the protobuf definition.
+ */
+ private Map<byte[], Throwable> exceptions =
+ new TreeMap<byte[], Throwable>(Bytes.BYTES_COMPARATOR);
+
public MultiResponse() {
super();
}
@@ -80,4 +87,19 @@ public class MultiResponse {
public Map<byte[], List<Pair<Integer, Object>>> getResults() {
return results;
}
+
+ public void addException(byte []regionName, Throwable ie){
+ exceptions.put(regionName, ie);
+ }
+
+ /**
+ * @return the exception for the region, if any. Null otherwise.
+ */
+ public Throwable getException(byte []regionName){
+ return exceptions.get(regionName);
+ }
+
+ public Map<byte[], Throwable> getExceptions() {
+ return exceptions;
+ }
}
\ No newline at end of file
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Thu Oct 31 10:36:53 2013
@@ -91,28 +91,12 @@ class MultiServerCallable<R> extends Reg
try {
responseProto = getStub().multi(controller, requestProto);
} catch (ServiceException e) {
- return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
+ throw ProtobufUtil.getRemoteException(e);
}
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
- /**
- * @param request
- * @param t
- * @return Return a response that has every action in request failed w/ the passed in
- * exception <code>t</code> -- this will get them all retried after some backoff.
- */
- private static MultiResponse createAllFailedResponse(final ClientProtos.MultiRequest request,
- final Throwable t) {
- MultiResponse massFailedResponse = new MultiResponse();
- for (RegionAction rAction: request.getRegionActionList()) {
- byte [] regionName = rAction.getRegion().getValue().toByteArray();
- for (ClientProtos.Action action: rAction.getActionList()) {
- massFailedResponse.add(regionName, new Pair<Integer, Object>(action.getIndex(), t));
- }
- }
- return massFailedResponse;
- }
+
/**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1537431&r1=1537430&r2=1537431&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Thu Oct 31 10:36:53 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
@@ -94,13 +95,17 @@ public final class ResponseConverter {
for (int i = 0; i < responseRegionActionResultCount; i++) {
RegionAction actions = request.getRegionAction(i);
RegionActionResult actionResult = response.getRegionActionResult(i);
- byte[] regionName = actions.getRegion().toByteArray();
+ HBaseProtos.RegionSpecifier rs = actions.getRegion();
+ if (rs.hasType() &&
+ (rs.getType() != HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)){
+ throw new IllegalArgumentException(
+ "We support only encoded types for protobuf multi response.");
+ }
+ byte[] regionName = rs.getValue().toByteArray();
if (actionResult.hasException()){
Throwable regionException = ProtobufUtil.toException(actionResult.getException());
- for (ClientProtos.Action a : actions.getActionList()){
- results.add(regionName, new Pair<Integer, Object>(a.getIndex(), regionException));
- }
+ results.addException(regionName, regionException);
continue;
}