You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/06/24 20:57:29 UTC
svn commit: r1496159 [1/2] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/test/java/org/apache/hadoop/hbase/client/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/
hbase-server/sr...
Author: nkeywal
Date: Mon Jun 24 18:57:29 2013
New Revision: 1496159
URL: http://svn.apache.org/r1496159
Log:
HBASE-6295 Possible performance improvement in client batch operations: presplit and send in background
Added:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Modified:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java Mon Jun 24 18:57:29 2013
@@ -32,7 +32,6 @@ public class Action<R> implements Compar
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
- private R result;
public Action(Row action, int originalIndex) {
super();
@@ -40,13 +39,6 @@ public class Action<R> implements Compar
this.originalIndex = originalIndex;
}
- public R getResult() {
- return result;
- }
-
- public void setResult(R result) {
- this.result = result;
- }
public Row getAction() {
return action;
Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1496159&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Mon Jun 24 18:57:29 2013
@@ -0,0 +1,728 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class allows a continuous flow of requests. It's written to be compatible with a
+ * synchronous caller such as HTable.
+ * <p>
+ * The caller sends a buffer of operations by calling submit. This class extracts from this list
+ * the operations it can send, i.e. the operations that are on regions that are not considered
+ * as busy. The process is asynchronous, i.e. it returns immediately when it has finished
+ * iterating on the list. If, and only if, the maximum number of current tasks is reached, the
+ * call to submit will block.
+ * </p>
+ * <p>
+ * The class manages the retries internally.
+ * </p>
+ * <p>
+ * The class includes an error marker: it allows to know if an operation has failed or not, and
+ * to get the exception details, i.e. the throwable of each operation that failed for good. This
+ * marker is here to help the backward compatibility in HTable. In most (new) cases, it should be
+ * managed by the callbacks.
+ * </p>
+ * <p>
+ * A callback is available, in order to:
+ * </p>
+ * <ul>
+ * <li>Get the result of the operation (failure or success)</li>
+ * <li>When an operation fails but could be retried, allow or not to retry</li>
+ * <li>When an operation fails for good (can't be retried or already retried the maximum number
+ * of times), be notified of the failure.</li>
+ * </ul>
+ * <p>
+ * This class is not thread safe externally; only one thread should submit operations at a time.
+ * Internally, the class is thread safe enough to manage simultaneously new submission and results
+ * arising from older operations.
+ * </p>
+ * <p>
+ * Internally, this class works with {@link Row}, this means it could theoretically be used for
+ * gets as well.
+ * </p>
+ */
+class AsyncProcess<CResult> {
+  private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+
+  protected final HConnection hConnection;
+  protected final byte[] tableName;
+  protected final ExecutorService pool;
+  protected final AsyncProcessCallback<CResult> callback;
+  /** Operations that failed for good; exposed through getErrors / getFailedOperations. */
+  protected final BatchErrors errors = new BatchErrors();
+  /** Failures that were retried; only filled when TRACE logging is enabled. */
+  protected final BatchErrors retriedErrors = new BatchErrors();
+  protected final AtomicBoolean hasError = new AtomicBoolean(false);
+  /** Number of tasks started (see incTaskCounters). */
+  protected final AtomicLong tasksSent = new AtomicLong(0);
+  /** Number of tasks finished, successfully or not. Also used as the monitor for progress. */
+  protected final AtomicLong tasksDone = new AtomicLong(0);
+  /** Number of tasks currently running, keyed by encoded region name. */
+  protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
+      new ConcurrentHashMap<String, AtomicInteger>();
+  protected final int maxTotalConcurrentTasks;
+  protected final int maxConcurrentTasksPerRegion;
+  protected final long pause;
+  protected int numTries;
+  protected final boolean useServerTrackerForRetries;
+  protected int serverTrackerTimeout;
+
+
+  /**
+   * This interface allows to keep the interface of the previous synchronous interface, that uses
+   * an array of object to return the result.
+   * <p/>
+   * This interface allows the caller to specify the behavior on errors:
+   * <ul>
+   * <li>If we have not yet reached the maximum number of retries, the user can nevertheless
+   * specify if this specific operation should be retried or not.
+   * </li>
+   * <li>If an operation fails (i.e. is not retried or fails after all retries), the user can
+   * specify if we should mark this AsyncProcess as in error or not. NOTE: the current
+   * implementation ignores the value returned by failure and always marks the process as in
+   * error.
+   * </li>
+   * </ul>
+   */
+  static interface AsyncProcessCallback<CResult> {
+
+    /**
+     * Called on success. originalIndex holds the index in the action list.
+     */
+    void success(int originalIndex, byte[] region, Row row, CResult result);
+
+    /**
+     * Called on failure, if we don't retry (i.e. called once per failed operation).
+     *
+     * @return true if we should store the error and tag this async process as being in error.
+     *         false if the failure of this operation can be safely ignored, and does not require
+     *         the current process to be stopped without proceeding with the other operations in
+     *         the queue.
+     */
+    boolean failure(int originalIndex, byte[] region, Row row, Throwable t);
+
+    /**
+     * Called on a failure we plan to retry. This allows the user to stop retrying. Will be
+     * called multiple times for a single action if it fails multiple times.
+     *
+     * @return true if we should retry, false otherwise.
+     */
+    boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
+  }
+
+  /**
+   * Parallel lists of (throwable, row, server address) describing failed operations.
+   */
+  private static class BatchErrors {
+    private List<Throwable> throwables = new ArrayList<Throwable>();
+    private List<Row> actions = new ArrayList<Row>();
+    private List<String> addresses = new ArrayList<String>();
+
+    public void add(Throwable ex, Row row, HRegionLocation location) {
+      throwables.add(ex);
+      actions.add(row);
+      addresses.add(location != null ? location.getHostnamePort() : "null location");
+    }
+
+    /** Builds an exception from a snapshot (copy) of the current lists. */
+    private RetriesExhaustedWithDetailsException makeException() {
+      return new RetriesExhaustedWithDetailsException(
+          new ArrayList<Throwable>(throwables),
+          new ArrayList<Row>(actions), new ArrayList<String>(addresses));
+    }
+
+    public void clear() {
+      throwables.clear();
+      actions.clear();
+      addresses.clear();
+    }
+  }
+
+  public AsyncProcess(HConnection hc, byte[] tableName, ExecutorService pool,
+                      AsyncProcessCallback<CResult> callback, Configuration conf) {
+    this.hConnection = hc;
+    this.tableName = tableName;
+    this.pool = pool;
+    this.callback = callback;
+
+    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+
+    this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 200);
+
+    // With one, we ensure that the ordering of the queries is respected: we don't start
+    //  a set of operations on a region before the previous one is done. As well, this limits
+    //  the pressure we put on the region server.
+    this.maxConcurrentTasksPerRegion = conf.getInt("hbase.client.max.perregion.tasks", 1);
+
+    this.useServerTrackerForRetries =
+        conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
+
+    if (this.useServerTrackerForRetries) {
+      // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+      // However, if we are too useful, we might fail very quickly due to retry count limit.
+      // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+      // retry time if normal retries were used. Then we will retry until this time runs out.
+      // If we keep hitting one server, the net effect will be the incremental backoff, and
+      // essentially the same number of retries as planned. If we have to do faster retries,
+      // we will do more retries in aggregate, but the user will be none the wiser.
+      this.serverTrackerTimeout = 0;
+      for (int i = 0; i < this.numTries; ++i) {
+        serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+      }
+    }
+  }
+
+  /**
+   * Extract from the rows list what we can submit. The rows we can not submit are kept in the
+   * list.
+   *
+   * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
+   * @param atLeastOne true if we should submit at least a subset.
+   * @throws InterruptedIOException if interrupted while waiting for running tasks
+   */
+  public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
+    if (rows.isEmpty()) {
+      return;
+    }
+
+    Map<HRegionLocation, MultiAction<Row>> actionsByServer =
+        new HashMap<HRegionLocation, MultiAction<Row>>();
+    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
+
+    do {
+      // Per-pass cache of the "can this region take new operations" decision, so that all the
+      // rows of a region get the same answer within one pass.
+      Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+      long currentTaskNumber = waitForMaximumCurrentTasks(maxTotalConcurrentTasks);
+      int posInList = -1;
+      Iterator<? extends Row> it = rows.iterator();
+      while (it.hasNext()) {
+        Row r = it.next();
+        HRegionLocation loc = findDestLocation(r, 1, posInList, false, regionIncluded);
+
+        if (loc != null) {   // loc is null if the dest is too busy or there is an error
+          Action<Row> action = new Action<Row>(r, ++posInList);
+          retainedActions.add(action);
+          addAction(loc, action, actionsByServer);
+          it.remove();
+        }
+      }
+
+      if (retainedActions.isEmpty() && atLeastOne && !hasError()) {
+        // Nothing could be taken: wait for a running task to finish, then try again.
+        waitForNextTaskDone(currentTaskNumber);
+      }
+
+    } while (retainedActions.isEmpty() && atLeastOne && !hasError());
+
+    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
+    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
+  }
+
+  /**
+   * Group the actions per region server.
+   *
+   * @param loc - the destination. Must not be null.
+   * @param action - the action to add to the multiaction
+   * @param actionsByServer the multiaction per server
+   */
+  private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
+      MultiAction<Row>> actionsByServer) {
+    final byte[] regionName = loc.getRegionInfo().getRegionName();
+    MultiAction<Row> multiAction = actionsByServer.get(loc);
+    if (multiAction == null) {
+      multiAction = new MultiAction<Row>();
+      actionsByServer.put(loc, multiAction);
+    }
+
+    multiAction.add(regionName, action);
+  }
+
+  /**
+   * Find the destination, if this destination is not considered as busy.
+   * A location failure is reported through manageError as a non-retriable error.
+   *
+   * @param row the row
+   * @param numAttempt the num attempt
+   * @param posInList the position in the list
+   * @param force if we must submit whatever the server load
+   * @param regionStatus cache of the per-region "can take new operations" decision, keyed by
+   *                     encoded region name; filled by this method.
+   * @return null if we should not submit, the destination otherwise.
+   */
+  private HRegionLocation findDestLocation(Row row, int numAttempt,
+                                           int posInList, boolean force,
+                                           Map<String, Boolean> regionStatus) {
+    HRegionLocation loc = null;
+    IOException locationException = null;
+    try {
+      loc = hConnection.locateRegion(this.tableName, row.getRow());
+      if (loc == null) {
+        locationException = new IOException("No location found, aborting submit for" +
+            " tableName=" + Bytes.toString(tableName) +
+            " rowkey=" + Arrays.toString(row.getRow()));
+      }
+    } catch (IOException e) {
+      locationException = e;
+    }
+    if (locationException != null) {
+      // There are multiple retries in locateRegion already. No need to add new.
+      // We can't continue with this row, hence it's the last retry.
+      manageError(numAttempt, posInList, row, false, locationException, null);
+      return null;
+    }
+
+    if (force) {
+      return loc;
+    }
+
+    String regionName = loc.getRegionInfo().getEncodedName();
+    Boolean addIt = regionStatus.get(regionName);
+    if (addIt == null) {
+      addIt = canTakeNewOperations(regionName);
+      regionStatus.put(regionName, addIt);
+    }
+
+    return addIt ? loc : null;
+  }
+
+
+  /**
+   * Check if we should send new operations to this region.
+   *
+   * @param encodedRegionName region name
+   * @return true if this region can take new operations, i.e. it has fewer running tasks than
+   *         maxConcurrentTasksPerRegion; false if it is considered as busy.
+   */
+  protected boolean canTakeNewOperations(String encodedRegionName) {
+    AtomicInteger ct = taskCounterPerRegion.get(encodedRegionName);
+    return ct == null || ct.get() < maxConcurrentTasksPerRegion;
+  }
+
+  /**
+   * Submit immediately the list of rows, whatever the server status. Kept for backward
+   * compatibility: it allows to be used with the batch interface that return an array of objects.
+   *
+   * @param rows the list of rows.
+   */
+  public void submitAll(List<? extends Row> rows) {
+    List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
+
+    // The position will be used by the processBatch to match the object array returned.
+    int posInList = -1;
+    for (Row r : rows) {
+      posInList++;
+      Action<Row> action = new Action<Row>(r, posInList);
+      actions.add(action);
+    }
+    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
+    submit(actions, actions, 1, true, errorsByServer);
+  }
+
+
+  /**
+   * Group a list of actions per region servers, and send them. The created MultiActions are
+   * added to the inProgress list.
+   *
+   * @param initialActions - the full list of the actions in progress
+   * @param currentActions - the list of row to submit
+   * @param numAttempt - the current numAttempt (first attempt is 1)
+   * @param force - true if we submit the rowList without taking into account the server load
+   * @param errorsByServer - the server error tracker, may be null
+   */
+  private void submit(List<Action<Row>> initialActions,
+                      List<Action<Row>> currentActions, int numAttempt, boolean force,
+                      final HConnectionManager.ServerErrorTracker errorsByServer) {
+    // group per location => regions server
+    final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
+        new HashMap<HRegionLocation, MultiAction<Row>>();
+
+    // We have the same policy for a single region per call to submit: we don't want
+    //  to send half of the actions because the status changed in the middle. So we keep the
+    //  status
+    Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+
+    for (Action<Row> action : currentActions) {
+      // Report location failures with the real attempt number, not always the first one.
+      HRegionLocation loc = findDestLocation(
+          action.getAction(), numAttempt, action.getOriginalIndex(), force, regionIncluded);
+
+      if (loc != null) {
+        addAction(loc, action, actionsByServer);
+      }
+    }
+
+    if (!actionsByServer.isEmpty()) {
+      sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer);
+    }
+  }
+
+  /**
+   * Send a multi action structure to the servers, after a delay depending on the attempt
+   * number. Asynchronous.
+   *
+   * @param initialActions the list of the actions, flat.
+   * @param actionsByServer the actions structured by regions
+   * @param numAttempt the attempt number.
+   * @param errorsByServer the server error tracker, may be null
+   */
+  public void sendMultiAction(final List<Action<Row>> initialActions,
+                              Map<HRegionLocation, MultiAction<Row>> actionsByServer,
+                              final int numAttempt,
+                              final HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    // Send the queries and add them to the inProgress list
+    for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
+      final HRegionLocation loc = e.getKey();
+      final MultiAction<Row> multi = e.getValue();
+      final String regionName = loc.getRegionInfo().getEncodedName();
+
+      incTaskCounters(regionName);
+
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          MultiResponse res;
+          try {
+            ServerCallable<MultiResponse> callable = createCallable(loc, multi);
+            try {
+              res = callable.withoutRetries();
+            } catch (IOException e) {
+              LOG.warn("The call to the RS failed, we don't know where we stand. regionName="
+                  + regionName, e);
+              resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
+              return;
+            }
+
+            receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
+          } finally {
+            // Always release the task slot, whatever happened above.
+            decTaskCounters(regionName);
+          }
+        }
+      };
+
+      try {
+        this.pool.submit(runnable);
+      } catch (RejectedExecutionException ree) {
+        // This should never happen. But as the pool is provided by the end user, let's secure
+        //  this a little.
+        decTaskCounters(regionName);
+        LOG.warn("The task was rejected by the pool. This is unexpected. " +
+            "regionName=" + regionName, ree);
+        // We're likely to fail again, but this will increment the attempt counter, so it will
+        //  finish.
+        resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
+      }
+    }
+  }
+
+  /**
+   * Create a callable. Isolated to be easily overridden in the tests.
+   */
+  protected ServerCallable<MultiResponse> createCallable(
+      final HRegionLocation loc, final MultiAction<Row> multi) {
+
+    return new MultiServerCallable<Row>(hConnection, tableName, loc, multi);
+  }
+
+  /**
+   * Checks whether we can retry and acts accordingly: logs, sets the error status, calls the
+   * callbacks.
+   *
+   * @param numAttempt    the number of this attempt
+   * @param originalIndex the position in the list sent
+   * @param row           the row
+   * @param canRetry      if false, we won't retry whatever the settings.
+   * @param throwable     the throwable, if any (can be null)
+   * @param location      the location, if any (can be null)
+   * @return true if the action can be retried, false otherwise.
+   */
+  private boolean manageError(int numAttempt, int originalIndex, Row row, boolean canRetry,
+                              Throwable throwable, HRegionLocation location) {
+    // instanceof is false for a null throwable, so no separate null check is needed.
+    if (canRetry && (numAttempt >= numTries || throwable instanceof DoNotRetryIOException)) {
+      canRetry = false;
+    }
+    byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
+
+    if (canRetry && callback != null) {
+      canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
+    }
+
+    if (canRetry) {
+      if (LOG.isTraceEnabled()) {
+        retriedErrors.add(throwable, row, location);
+      }
+    } else {
+      if (callback != null) {
+        // NOTE(review): the value returned by failure() is ignored; the error is always
+        // recorded and the process marked as in error below.
+        callback.failure(originalIndex, region, row, throwable);
+      }
+      this.hasError.set(true);
+      errors.add(throwable, row, location);
+    }
+
+    return canRetry;
+  }
+
+  /**
+   * Resubmit all the actions from this multiaction after a failure.
+   *
+   * @param initialActions the full initial action list
+   * @param rsActions  the actions still to do from the initial list
+   * @param location   the destination
+   * @param numAttempt the number of attempts so far
+   * @param t the throwable (if any) that caused the resubmit
+   * @param errorsByServer the server error tracker
+   */
+  private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
+                           HRegionLocation location, int numAttempt, Throwable t,
+                           HConnectionManager.ServerErrorTracker errorsByServer) {
+    // Do not use the exception for updating cache because it might be coming from
+    // any of the regions in the MultiAction.
+    hConnection.updateCachedLocations(tableName,
+        rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
+    errorsByServer.reportServerError(location);
+
+    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
+    for (List<Action<Row>> actions : rsActions.actions.values()) {
+      for (Action<Row> action : actions) {
+        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
+            true, t, location)) {
+          toReplay.add(action);
+        }
+      }
+    }
+
+    submit(initialActions, toReplay, numAttempt, true, errorsByServer);
+  }
+
+  /**
+   * Called when we receive the result of a server query.
+   *
+   * @param initialActions - the whole action list
+   * @param rsActions      - the actions for this location
+   * @param location       - the location
+   * @param responses      - the response, if any
+   * @param numAttempt     - the attempt
+   * @param errorsByServer - the server error tracker, may be null
+   */
+  private void receiveMultiAction(List<Action<Row>> initialActions,
+                                  MultiAction<Row> rsActions, HRegionLocation location,
+                                  MultiResponse responses, int numAttempt,
+                                  HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    if (responses == null) {
+      LOG.info("Attempt #" + numAttempt + " failed for all operations on server " +
+          location.getServerName() + " , trying to resubmit.");
+      resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
+      return;
+    }
+
+    // Success or partial success
+    // Analyze detailed results. We can still have individual failures to be redone.
+    // Two specific throwables are managed:
+    //  - DoNotRetryIOException: we continue to retry for other actions
+    //  - RegionMovedException: we update the cache with the new region location
+
+    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
+    Throwable throwable = null;
+
+    int failureCount = 0;
+    boolean canRetry = true;
+    for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
+        responses.getResults().entrySet()) {
+
+      for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
+        Object result = regionResult.getSecond();
+
+        // Failure: retry if it makes sense, else update the error lists
+        if (result == null || result instanceof Throwable) {
+          throwable = (Throwable) result;
+          Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+          Row row = correspondingAction.getAction();
+
+          if (failureCount++ == 0) { // We're doing this once per location.
+            hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
+            if (errorsByServer != null) {
+              errorsByServer.reportServerError(location);
+              canRetry = errorsByServer.canRetryMore();
+            }
+          }
+
+          if (manageError(numAttempt, correspondingAction.getOriginalIndex(), row, canRetry,
+              throwable, location)) {
+            toReplay.add(correspondingAction);
+          }
+        } else { // success
+          if (callback != null) {
+            Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+            Row row = correspondingAction.getAction();
+            //noinspection unchecked
+            this.callback.success(correspondingAction.getOriginalIndex(),
+                resultsForRS.getKey(), row, (CResult) result);
+          }
+        }
+      }
+    }
+
+    if (!toReplay.isEmpty()) {
+      if (numAttempt > 2) {
+        // We use this value to have some logs when we have multiple failures, but not too many
+        //  logs, as errors are to be expected when regions move, split and so on
+        LOG.debug("Attempt #" + numAttempt + " failed for " + failureCount +
+            " operations on server " + location.getServerName() + ", resubmitting " +
+            toReplay.size() + ", tableName=" + Bytes.toString(tableName) +
+            ", last exception was: " + throwable);
+      }
+      long backOffTime = (errorsByServer != null ?
+          errorsByServer.calculateBackoffTime(location, pause) :
+          ConnectionUtils.getPauseTime(pause, numAttempt));
+      try {
+        Thread.sleep(backOffTime);
+      } catch (InterruptedException e) {
+        LOG.warn("Not sent: " + toReplay.size() +
+            " operations, tableName=" + Bytes.toString(tableName), e);
+        // Restore the interrupt status: Thread.interrupted() would have cleared it instead.
+        // NOTE(review): the operations in toReplay are dropped here without being recorded
+        // in the error list, so hasError() is not set for them.
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      submit(initialActions, toReplay, numAttempt + 1, true, errorsByServer);
+    } else if (failureCount != 0) {
+      LOG.warn("Attempt #" + numAttempt + " failed for " + failureCount +
+          " operations on server " + location.getServerName() + " NOT resubmitting." +
+          ", tableName=" + Bytes.toString(tableName));
+    }
+  }
+
+  /**
+   * Waits for another task to finish.
+   * @param currentNumberOfTask - the number of task finished when calling the method.
+   * @throws InterruptedIOException if interrupted while waiting; the cause is the original
+   *                                InterruptedException.
+   */
+  protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
+    while (currentNumberOfTask == tasksDone.get()) {
+      try {
+        synchronized (this.tasksDone) {
+          // Bounded wait, so the condition is re-checked even if a notification is missed.
+          this.tasksDone.wait(100);
+        }
+      } catch (InterruptedException e) {
+        InterruptedIOException iioe = new InterruptedIOException("Interrupted." +
+            " currentNumberOfTask=" + currentNumberOfTask +
+            ",  tableName=" + Bytes.toString(tableName) + ", tasksDone=" + tasksDone.get());
+        iioe.initCause(e);
+        throw iioe;
+      }
+    }
+  }
+
+  /**
+   * Wait until the async does not have more than max tasks in progress.
+   *
+   * @return the number of finished tasks when the wait ended.
+   */
+  private long waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
+    long lastLog = 0;
+    long currentTasksDone = this.tasksDone.get();
+
+    while ((tasksSent.get() - currentTasksDone) > max) {
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      if (now > lastLog + 5000) {
+        lastLog = now;
+        // Report the number of tasks in progress, which is what is compared to max.
+        LOG.info(Bytes.toString(tableName) +
+            ": Waiting for the global number of tasks to be equals or less than " + max +
+            ", currently it's " + (this.tasksSent.get() - this.tasksDone.get()));
+      }
+      waitForNextTaskDone(currentTasksDone);
+      currentTasksDone = this.tasksDone.get();
+    }
+
+    return currentTasksDone;
+  }
+
+  /**
+   * Wait until all tasks are executed, successfully or not.
+   */
+  public void waitUntilDone() throws InterruptedIOException {
+    waitForMaximumCurrentTasks(0);
+  }
+
+
+  /** @return true if at least one operation failed for good since the last clearErrors. */
+  public boolean hasError() {
+    return hasError.get();
+  }
+
+  /** @return the rows that failed for good (the internal list, not a copy). */
+  public List<? extends Row> getFailedOperations() {
+    return errors.actions;
+  }
+
+  /**
+   * Clean the errors stacks. Should be called only when there are no actions in progress.
+   */
+  public void clearErrors() {
+    errors.clear();
+    retriedErrors.clear();
+    hasError.set(false);
+  }
+
+  /** @return an exception describing the operations that failed for good. */
+  public RetriesExhaustedWithDetailsException getErrors() {
+    return errors.makeException();
+  }
+
+  /**
+   * Increments the tasks counters for a given region. MT safe.
+   */
+  protected void incTaskCounters(String encodedRegionName) {
+    tasksSent.incrementAndGet();
+
+    AtomicInteger counterPerRegion = taskCounterPerRegion.get(encodedRegionName);
+    if (counterPerRegion == null) {
+      // putIfAbsent returns the counter another thread may have registered first.
+      AtomicInteger newCounter = new AtomicInteger();
+      AtomicInteger existing = taskCounterPerRegion.putIfAbsent(encodedRegionName, newCounter);
+      counterPerRegion = existing == null ? newCounter : existing;
+    }
+    counterPerRegion.incrementAndGet();
+  }
+
+  /**
+   * Decrements the counters for a given region, and wakes up the threads waiting on tasksDone.
+   * Must only be called after incTaskCounters for the same region.
+   */
+  protected void decTaskCounters(String encodedRegionName) {
+    AtomicInteger counterPerRegion = taskCounterPerRegion.get(encodedRegionName);
+    counterPerRegion.decrementAndGet();
+
+    tasksDone.incrementAndGet();
+    synchronized (tasksDone) {
+      tasksDone.notifyAll();
+    }
+  }
+
+  /**
+   * Creates the server error tracker to use inside process.
+   * Currently, to preserve the main assumption about current retries, and to work well with
+   * the retry-limit-based calculation, the calculation is local per Process object.
+   * We may benefit from connection-wide tracking of server errors.
+   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
+   */
+  protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
+    if (useServerTrackerForRetries) {
+      return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout);
+    } else {
+      return null;
+    }
+  }
+}
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Mon Jun 24 18:57:29 2013
@@ -172,6 +172,17 @@ public interface HConnection extends Abo
throws IOException;
/**
+ * Update the location cache. This is used internally by HBase, in most cases it should not be
+ * used by the client application.
+ * @param tableName the table name
+ * @param rowkey the row
+ * @param exception the exception if any. Can be null.
+ * @param source the previous location
+ */
+ public void updateCachedLocations(byte[] tableName, byte[] rowkey,
+ Object exception, HRegionLocation source);
+
+ /**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
* @return HRegionLocation that describes where to find the region in
@@ -354,7 +365,7 @@ public interface HConnection extends Abo
/**
- * Clear any caches that pertain to server name <code>sn</code>
+ * Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
public void clearCaches(final ServerName sn);
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Jun 24 18:57:29 2013
@@ -26,20 +26,16 @@ import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -141,16 +137,13 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
-import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
-import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -440,8 +433,6 @@ public class HConnectionManager {
private final int numTries;
final int rpcTimeout;
private final int prefetchRegionLimit;
- private final boolean useServerTrackerForRetries;
- private final long serverTrackerTimeout;
private volatile boolean closed;
private volatile boolean aborted;
@@ -490,12 +481,12 @@ public class HConnectionManager {
private int refCount;
// indicates whether this connection's life cycle is managed (by us)
- private final boolean managed;
+ private boolean managed;
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
- final Registry registry;
+ Registry registry;
/**
* constructor
@@ -509,34 +500,8 @@ public class HConnectionManager {
* users of an HConnectionImplementation instance.
*/
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
- this.conf = conf;
+ this(conf);
this.managed = managed;
- this.closed = false;
- this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.rpcTimeout = conf.getInt(
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- this.prefetchRegionLimit = conf.getInt(
- HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
- HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
- this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true);
- long serverTrackerTimeout = 0;
- if (this.useServerTrackerForRetries) {
- // Server tracker allows us to do faster, and yet useful (hopefully), retries.
- // However, if we are too useful, we might fail very quickly due to retry count limit.
- // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
- // retry time if normal retries were used. Then we will retry until this time runs out.
- // If we keep hitting one server, the net effect will be the incremental backoff, and
- // essentially the same number of retries as planned. If we have to do faster retries,
- // we will do more retries in aggregate, but the user will be none the wiser.
- for (int i = 0; i < this.numTries; ++i) {
- serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
- }
- }
- this.serverTrackerTimeout = serverTrackerTimeout;
this.registry = setupRegistry();
retrieveClusterId();
@@ -560,6 +525,24 @@ public class HConnectionManager {
}, conf, listenerClass);
}
}
+
+ /**
+ * For tests.
+ */
+ protected HConnectionImplementation(Configuration conf) {
+ this.conf = conf;
+ this.closed = false;
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.rpcTimeout = conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.prefetchRegionLimit = conf.getInt(
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+ }
/**
* @return The cluster registry implementation to use.
@@ -1982,25 +1965,8 @@ public class HConnectionManager {
return callable.withoutRetries();
}
- @Deprecated
- private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
- final MultiAction<R> multi, final byte[] tableName) {
- // TODO: This does not belong in here!!! St.Ack HConnections should
- // not be dealing in Callables; Callables have HConnections, not other
- // way around.
- final HConnection connection = this;
- return new Callable<MultiResponse>() {
- @Override
- public MultiResponse call() throws Exception {
- ServerCallable<MultiResponse> callable =
- new MultiServerCallable<R>(connection, tableName, loc, multi);
- return callable.withoutRetries();
- }
- };
- }
-
- void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
- ServerName serverName, long seqNum) {
+ void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
+ ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
synchronized (this.cachedRegionLocations) {
cacheLocation(hri.getTableName(), source, newHrl);
@@ -2058,16 +2024,17 @@ public class HConnectionManager {
* or wrapped or both RegionMovedException
* @param source server that is the source of the location update.
*/
- private void updateCachedLocations(final byte[] tableName, Row row,
+ @Override
+ public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
- if (row == null || tableName == null) {
- LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
+ if (rowkey == null || tableName == null) {
+ LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
return;
}
// Is it something we have already updated?
- final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
+ final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
if (oldLocation == null) {
// There is no such location in the cache => it's been removed already => nothing to do
return;
@@ -2125,365 +2092,62 @@ public class HConnectionManager {
Batch.Callback<R> callback)
throws IOException, InterruptedException {
- Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
- p.processBatchCallback();
+ // To fulfill the original contract, we have a special callback. This callback
+ // will set the results in the Object array.
+ ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
+ AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
+
+ // We're doing a submit all. This way, the originalIndex will match the initial list.
+ asyncProcess.submitAll(list);
+ asyncProcess.waitUntilDone();
+
+ if (asyncProcess.hasError()) {
+ throw asyncProcess.getErrors();
+ }
+ }
+
+ // For tests.
+ protected <R> AsyncProcess createAsyncProcess(byte[] tableName, ExecutorService pool,
+ AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
+ return new AsyncProcess<R>(this, tableName, pool, callback, conf);
}
/**
- * Methods and attributes to manage a batch process are grouped into this single class.
- * This allows, by creating a Process<R> per batch process to ensure multithread safety.
- *
- * This code should be move to HTable once processBatchCallback is not supported anymore in
- * the HConnection interface.
+ * Fill the result array for the interfaces using it.
*/
- private static class Process<R> {
- // Info on the queries and their context
- private final HConnectionImplementation hci;
- private final List<? extends Row> rows;
- private final byte[] tableName;
- private final ExecutorService pool;
+ private static class ObjectResultFiller<Res>
+ implements AsyncProcess.AsyncProcessCallback<Res> {
+
private final Object[] results;
- private final Batch.Callback<R> callback;
+ private Batch.Callback<Res> callback;
- // Used during the batch process
- private final List<Action<R>> toReplay;
- private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
- inProgress;
-
- private ServerErrorTracker errorsByServer = null;
- private int curNumRetries;
-
- // Notified when a tasks is done
- private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
-
- private Process(HConnectionImplementation hci, List<? extends Row> list,
- byte[] tableName, ExecutorService pool, Object[] results,
- Batch.Callback<R> callback){
- this.hci = hci;
- this.rows = list;
- this.tableName = tableName;
- this.pool = pool;
+ ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
this.results = results;
this.callback = callback;
- this.toReplay = new ArrayList<Action<R>>();
- this.inProgress =
- new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
- this.curNumRetries = 0;
- }
-
-
- /**
- * Group a list of actions per region servers, and send them. The created MultiActions are
- * added to the inProgress list.
- * @param actionsList
- * @param isRetry Whether we are retrying these actions. If yes, backoff
- * time may be applied before new requests.
- * @throws IOException - if we can't locate a region after multiple retries.
- */
- private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
- // group per location => regions server
- final Map<HRegionLocation, MultiAction<R>> actionsByServer =
- new HashMap<HRegionLocation, MultiAction<R>>();
- for (Action<R> aAction : actionsList) {
- final Row row = aAction.getAction();
-
- if (row != null) {
- final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow());
- if (loc == null) {
- throw new IOException("No location found, aborting submit.");
- }
-
- final byte[] regionName = loc.getRegionInfo().getRegionName();
- MultiAction<R> actions = actionsByServer.get(loc);
- if (actions == null) {
- actions = new MultiAction<R>();
- actionsByServer.put(loc, actions);
- }
- actions.add(regionName, aAction);
- }
- }
-
- // Send the queries and add them to the inProgress list
- for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
- long backoffTime = 0;
- if (isRetry) {
- if (hci.isUsingServerTrackerForRetries()) {
- assert this.errorsByServer != null;
- backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
- } else {
- // curNumRetries starts with one, subtract to start from 0.
- backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
- }
- }
- Callable<MultiResponse> callable =
- createDelayedCallable(backoffTime, e.getKey(), e.getValue());
- if (LOG.isTraceEnabled() && isRetry) {
- StringBuilder sb = new StringBuilder();
- for (Action<R> action : e.getValue().allActions()) {
- if (sb.length() > 0) sb.append(' ');
- sb.append(Bytes.toStringBinary(action.getAction().getRow()));
- }
- LOG.trace("Attempt #" + this.curNumRetries + " against " + e.getKey().getHostnamePort()
- + " after=" + backoffTime + "ms, row(s)=" + sb.toString());
- }
- Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
- new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
- e.getValue(), e.getKey(), this.pool.submit(callable));
- this.inProgress.addLast(p);
- }
- }
-
- /**
- * Resubmit the actions which have failed, after a sleep time.
- * @throws IOException
- */
- private void doRetry() throws IOException{
- submit(this.toReplay, true);
- this.toReplay.clear();
- }
-
- /**
- * Parameterized batch processing, allowing varying return types for
- * different {@link Row} implementations.
- * Throws an exception on error. If there are no exceptions, it means that the 'results'
- * array is clean.
- */
- private void processBatchCallback() throws IOException, InterruptedException {
- if (this.results.length != this.rows.size()) {
- throw new IllegalArgumentException(
- "argument results (size="+results.length+") must be the same size as " +
- "argument list (size="+this.rows.size()+")");
- }
- if (this.rows.isEmpty()) {
- return;
- }
-
- boolean isTraceEnabled = LOG.isTraceEnabled();
- BatchErrors errors = new BatchErrors();
- BatchErrors retriedErrors = null;
- if (isTraceEnabled) {
- retriedErrors = new BatchErrors();
- }
-
- // We keep the number of retry per action.
- int[] nbRetries = new int[this.results.length];
-
- // Build the action list. This list won't change after being created, hence the
- // indexes will remain constant, allowing a direct lookup.
- final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
- for (int i = 0; i < this.rows.size(); i++) {
- Action<R> action = new Action<R>(this.rows.get(i), i);
- listActions.add(action);
- }
-
- // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
- submit(listActions, false);
-
- // LastRetry is true if, either:
- // we had an exception 'DoNotRetry'
- // we had more than numRetries for any action
- // In this case, we will finish the current retries but we won't start new ones.
- boolean lastRetry = false;
- // If hci.numTries is 1 or 0, we do not retry.
- boolean noRetry = (hci.numTries < 2);
-
- // Analyze and resubmit until all actions are done successfully or failed after numTries
- while (!this.inProgress.isEmpty()) {
- // We need the original multi action to find out what actions to replay if
- // we have a 'total' failure of the Future<MultiResponse>
- // We need the HRegionLocation as we give it back if we go out of retries
- Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
- removeFirstDone();
-
- // Get the answer, keep the exception if any as we will use it for the analysis
- MultiResponse responses = null;
- ExecutionException exception = null;
- try {
- responses = currentTask.getThird().get();
- } catch (ExecutionException e) {
- exception = e;
- }
- HRegionLocation location = currentTask.getSecond();
- // Error case: no result at all for this multi action. We need to redo all actions
- if (responses == null) {
- for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
- for (Action<R> action : actions) {
- Row row = action.getAction();
- // Do not use the exception for updating cache because it might be coming from
- // any of the regions in the MultiAction.
- hci.updateCachedLocations(tableName, row, null, location);
- if (noRetry) {
- errors.add(exception, row, location);
- } else {
- if (isTraceEnabled) {
- retriedErrors.add(exception, row, location);
- }
- lastRetry = addToReplay(nbRetries, action, location);
- }
- }
- }
- } else { // Success or partial success
- // Analyze detailed results. We can still have individual failures to be redo.
- // two specific exceptions are managed:
- // - DoNotRetryIOException: we continue to retry for other actions
- // - RegionMovedException: we update the cache with the new region location
- for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
- responses.getResults().entrySet()) {
- for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
- Action<R> correspondingAction = listActions.get(regionResult.getFirst());
- Object result = regionResult.getSecond();
- this.results[correspondingAction.getOriginalIndex()] = result;
-
- // Failure: retry if it's make sense else update the errors lists
- if (result == null || result instanceof Throwable) {
- Row row = correspondingAction.getAction();
- hci.updateCachedLocations(this.tableName, row, result, location);
- if (result instanceof DoNotRetryIOException || noRetry) {
- errors.add((Exception)result, row, location);
- } else {
- if (isTraceEnabled) {
- retriedErrors.add((Exception)result, row, location);
- }
- lastRetry = addToReplay(nbRetries, correspondingAction, location);
- }
- } else // success
- if (callback != null) {
- this.callback.update(resultsForRS.getKey(),
- this.rows.get(regionResult.getFirst()).getRow(), (R) result);
- }
- }
- }
- }
-
- // Retry all actions in toReplay then clear it.
- if (!noRetry && !toReplay.isEmpty()) {
- if (isTraceEnabled) {
- LOG.trace("Retrying #" + this.curNumRetries +
- (lastRetry ? " (one last time)": "") + " because " +
- retriedErrors.getDescriptionAndClear());
- }
- doRetry();
- if (lastRetry) {
- noRetry = true;
- }
- }
- }
-
- errors.rethrowIfAny();
}
-
- private class BatchErrors {
- private List<Throwable> exceptions = new ArrayList<Throwable>();
- private List<Row> actions = new ArrayList<Row>();
- private List<String> addresses = new ArrayList<String>();
-
- public void add(Exception ex, Row row, HRegionLocation location) {
- exceptions.add(ex);
- actions.add(row);
- addresses.add(location.getHostnamePort());
- }
-
- public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
- if (!exceptions.isEmpty()) {
- throw makeException();
- }
- }
-
- public String getDescriptionAndClear(){
- if (exceptions.isEmpty()) {
- return "";
- }
- String result = makeException().getExhaustiveDescription();
- exceptions.clear();
- actions.clear();
- addresses.clear();
- return result;
- }
-
- private RetriesExhaustedWithDetailsException makeException() {
- return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);
- }
- }
-
- /**
- * Put the action that has to be retried in the Replay list.
- * @return true if we're out of numRetries and it's the last retry.
- */
- private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
- this.toReplay.add(action);
- nbRetries[action.getOriginalIndex()]++;
- if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
- this.curNumRetries = nbRetries[action.getOriginalIndex()];
- }
- if (hci.isUsingServerTrackerForRetries()) {
- if (this.errorsByServer == null) {
- this.errorsByServer = hci.createServerErrorTracker();
- }
- this.errorsByServer.reportServerError(source);
- return !this.errorsByServer.canRetryMore();
- } else {
- // We need to add 1 to make tries and retries comparable. And as we look for
- // the last try we compare with '>=' and not '>'. And we need curNumRetries
- // to means what it says as we don't want to initialize it to 1.
- return ((this.curNumRetries + 1) >= hci.numTries);
+ @Override
+ public void success(int pos, byte[] region, Row row, Res result) {
+ assert pos < results.length;
+ results[pos] = result;
+ if (callback != null) {
+ callback.update(region, row.getRow(), result);
}
}
- /**
- * Wait for one of tasks to be done, and remove it from the list.
- * @return the tasks done.
- */
- private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
- removeFirstDone() throws InterruptedException {
- while (true) {
- synchronized (finishedTasks) {
- if (!finishedTasks.isEmpty()) {
- MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
-
- // We now need to remove it from the inProgress part.
- Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
- inProgress.iterator();
- while (it.hasNext()) {
- Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
- if (task.getFirst() == done) { // We have the exact object. No java equals here.
- it.remove();
- return task;
- }
- }
- LOG.error("Development error: We didn't see a task in the list. " +
- done.getRegions());
- }
- finishedTasks.wait(10);
- }
- }
+ @Override
+ public boolean failure(int pos, byte[] region, Row row, Throwable t) {
+ assert pos < results.length;
+ results[pos] = t;
+ // Batch.Callback<Res> was not called on failure in 0.94; we keep that behavior.
+ return true; // we want to have this failure in the failures list.
}
- private Callable<MultiResponse> createDelayedCallable(
- final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
-
- final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
-
- return new Callable<MultiResponse>() {
- private final long creationTime = System.currentTimeMillis();
-
- @Override
- public MultiResponse call() throws Exception {
- try {
- final long waitingTime = delay + creationTime - System.currentTimeMillis();
- if (waitingTime > 0) {
- Thread.sleep(waitingTime);
- }
- return delegate.call();
- } finally {
- synchronized (finishedTasks) {
- finishedTasks.add(multi);
- finishedTasks.notifyAll();
- }
- }
- }
- };
+ @Override
+ public boolean retriableFailure(int originalIndex, Row row, byte[] region,
+ Throwable exception) {
+ return true; // we retry
}
}
@@ -2701,102 +2365,89 @@ public class HConnectionManager {
}
throw new TableNotFoundException(Bytes.toString(tableName));
}
+ }
- /**
- * The record of errors for servers. Visible for testing.
- */
- @VisibleForTesting
- static class ServerErrorTracker {
- private final Map<HRegionLocation, ServerErrors> errorsByServer =
- new HashMap<HRegionLocation, ServerErrors>();
- private long canRetryUntil = 0;
+ /**
+ * The record of errors for servers.
+ */
+ static class ServerErrorTracker {
+ // We need a concurrent map here, as we could have multiple threads updating it in parallel.
+ private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
+ new ConcurrentHashMap<HRegionLocation, ServerErrors>();
+ private long canRetryUntil = 0;
- public ServerErrorTracker(long timeout) {
- LOG.trace("Server tracker timeout is " + timeout + "ms");
- this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
- }
+ public ServerErrorTracker(long timeout) {
+ LOG.trace("Server tracker timeout is " + timeout + "ms");
+ this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+ }
- boolean canRetryMore() {
- return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
- }
+ boolean canRetryMore() {
+ return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+ }
- /**
- * Calculates the back-off time for a retrying request to a particular server.
- * This is here, and package private, for testing (no good way to get at it).
- * @param server The server in question.
- * @param basePause The default hci pause.
- * @return The time to wait before sending next request.
- */
- long calculateBackoffTime(HRegionLocation server, long basePause) {
- long result = 0;
- ServerErrors errorStats = errorsByServer.get(server);
- if (errorStats != null) {
- result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
- // Adjust by the time we already waited since last talking to this server.
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long timeSinceLastError = now - errorStats.getLastErrorTime();
- if (timeSinceLastError > 0) {
- result = Math.max(0, result - timeSinceLastError);
- }
- // Finally, see if the backoff time overshoots the timeout.
- if (result > 0 && (now + result > this.canRetryUntil)) {
- result = Math.max(0, this.canRetryUntil - now);
- }
+ /**
+ * Calculates the back-off time for a retrying request to a particular server.
+ *
+ * @param server The server in question.
+ * @param basePause The default hci pause.
+ * @return The time to wait before sending next request.
+ */
+ long calculateBackoffTime(HRegionLocation server, long basePause) {
+ long result = 0;
+ ServerErrors errorStats = errorsByServer.get(server);
+ if (errorStats != null) {
+ result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+ // Adjust by the time we already waited since last talking to this server.
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long timeSinceLastError = now - errorStats.getLastErrorTime();
+ if (timeSinceLastError > 0) {
+ result = Math.max(0, result - timeSinceLastError);
+ }
+ // Finally, see if the backoff time overshoots the timeout.
+ if (result > 0 && (now + result > this.canRetryUntil)) {
+ result = Math.max(0, this.canRetryUntil - now);
}
- return result;
}
+ return result;
+ }
- /**
- * Reports that there was an error on the server to do whatever bean-counting necessary.
- * This is here, and package private, for testing (no good way to get at it).
- * @param server The server in question.
- */
- void reportServerError(HRegionLocation server) {
- ServerErrors errors = errorsByServer.get(server);
- if (errors != null) {
- errors.addError();
- } else {
- errorsByServer.put(server, new ServerErrors());
- }
+ /**
+ * Reports that there was an error on the server to do whatever bean-counting necessary.
+ *
+ * @param server The server in question.
+ */
+ void reportServerError(HRegionLocation server) {
+ ServerErrors errors = errorsByServer.get(server);
+ if (errors != null) {
+ errors.addError();
+ } else {
+ errorsByServer.put(server, new ServerErrors());
}
+ }
- /**
- * The record of errors for a server.
- */
- private static class ServerErrors {
- public long lastErrorTime;
- public int retries;
-
- public ServerErrors() {
- this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
- this.retries = 0;
- }
+ /**
+ * The record of errors for a server.
+ */
+ private static class ServerErrors {
+ public long lastErrorTime;
+ public int retries;
- public void addError() {
- this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
- ++this.retries;
- }
+ public ServerErrors() {
+ this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+ this.retries = 0;
+ }
- public long getLastErrorTime() {
- return this.lastErrorTime;
- }
+ public void addError() {
+ this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+ ++this.retries;
}
- }
- public boolean isUsingServerTrackerForRetries() {
- return this.useServerTrackerForRetries;
- }
- /**
- * Creates the server error tracker to use inside process.
- * Currently, to preserve the main assumption about current retries, and to work well with
- * the retry-limit-based calculation, the calculation is local per Process object.
- * We may benefit from connection-wide tracking of server errors.
- * @return ServerErrorTracker to use.
- */
- ServerErrorTracker createServerErrorTracker() {
- return new ServerErrorTracker(this.serverTrackerTimeout);
+ public long getLastErrorTime() {
+ return this.lastErrorTime;
+ }
}
}
+
/**
* Set the number of retries to use serverside when trying to communicate
* with another server over {@link HConnection}. Used updating catalog
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Jun 24 18:57:29 2013
@@ -59,6 +59,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -116,14 +117,14 @@ import java.util.concurrent.TimeUnit;
@InterfaceStability.Stable
public class HTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(HTable.class);
- private HConnection connection;
+ protected HConnection connection;
private final byte [] tableName;
private volatile Configuration configuration;
- private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
+ protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
private long writeBufferSize;
private boolean clearBufferOnFail;
private boolean autoFlush;
- private long currentWriteBufferSize;
+ protected long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
private ExecutorService pool; // For Multi
@@ -132,6 +133,9 @@ public class HTable implements HTableInt
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
+ /** The Async process for puts with autoflush set to false or multiputs */
+ protected AsyncProcess<Object> ap;
+
/**
* Creates an object to access a HBase table.
* Shares zookeeper connection and other resources with other HTable instances
@@ -239,6 +243,15 @@ public class HTable implements HTableInt
}
/**
+ * For internal testing.
+ */
+ protected HTable(){
+ tableName = new byte[]{};
+ cleanupPoolOnClose = false;
+ cleanupConnectionOnClose = false;
+ }
+
+ /**
* setup this HTable's parameter based on the passed configuration
*/
private void finishSetup() throws IOException {
@@ -257,11 +270,15 @@ public class HTable implements HTableInt
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration);
+
this.maxKeyValueSize = this.configuration.getInt(
"hbase.client.keyvalue.maxsize", -1);
this.closed = false;
}
+
+
/**
* {@inheritDoc}
*/
@@ -397,6 +414,15 @@ public class HTable implements HTableInt
}
/**
+ * Kept in 0.96 for backward compatibility.
+ * @deprecated since 0.96. This is an internal buffer that should be neither read nor written.
+ */
+ @Deprecated
+ public List<Row> getWriteBuffer() {
+ return writeAsyncBuffer;
+ }
+
+ /**
* Sets the number of rows that a scanner will fetch at once.
* <p>
* This will override the value specified by
@@ -647,21 +673,19 @@ public class HTable implements HTableInt
@Override
public void batch(final List<?extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
- connection.processBatchCallback(actions, tableName, pool, results, null);
+ batchCallback(actions, results, null);
}
@Override
public Object[] batch(final List<? extends Row> actions)
throws InterruptedException, IOException {
- Object[] results = new Object[actions.size()];
- connection.processBatchCallback(actions, tableName, pool, results, null);
- return results;
+ return batchCallback(actions, null);
}
@Override
public <R> void batchCallback(
- final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
- throws IOException, InterruptedException {
+ final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
+ throws IOException, InterruptedException {
connection.processBatchCallback(actions, tableName, pool, results, callback);
}
@@ -670,7 +694,7 @@ public class HTable implements HTableInt
final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
InterruptedException {
Object[] results = new Object[actions.size()];
- connection.processBatchCallback(actions, tableName, pool, results, callback);
+ batchCallback(actions, results, callback);
return results;
}
@@ -702,7 +726,7 @@ public class HTable implements HTableInt
throws IOException {
Object[] results = new Object[deletes.size()];
try {
- connection.processBatch((List) deletes, tableName, pool, results);
+ batch(deletes, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
@@ -722,7 +746,8 @@ public class HTable implements HTableInt
* {@inheritDoc}
*/
@Override
- public void put(final Put put) throws IOException {
+ public void put(final Put put)
+ throws InterruptedIOException, RetriesExhaustedWithDetailsException {
doPut(put);
if (autoFlush) {
flushCommits();
@@ -733,7 +758,8 @@ public class HTable implements HTableInt
* {@inheritDoc}
*/
@Override
- public void put(final List<Put> puts) throws IOException {
+ public void put(final List<Put> puts)
+ throws InterruptedIOException, RetriesExhaustedWithDetailsException {
for (Put put : puts) {
doPut(put);
}
@@ -742,12 +768,64 @@ public class HTable implements HTableInt
}
}
- private void doPut(Put put) throws IOException{
+
+ /**
+ * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
+ * cluster.
+ * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
+ * @throws InterruptedIOException if we were interrupted.
+ */
+ private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ if (ap.hasError()){
+ backgroundFlushCommits(true);
+ }
+
validatePut(put);
- writeBuffer.add(put);
+
currentWriteBufferSize += put.heapSize();
- if (currentWriteBufferSize > writeBufferSize) {
- flushCommits();
+ writeAsyncBuffer.add(put);
+
+ while (currentWriteBufferSize > writeBufferSize) {
+ backgroundFlushCommits(false);
+ }
+ }
+
+
+ /**
+ * Send the operations in the buffer to the servers. Does not wait for the servers' answers
+ * unless synchronous is true. If there is an error (retries exhausted on a previous flush, or a
+ * bad operation), no new operations are submitted; it waits for those in progress and throws.
+ */
+ private void backgroundFlushCommits(boolean synchronous) throws
+ InterruptedIOException, RetriesExhaustedWithDetailsException {
+
+ try {
+ // If there is an error on the operations in progress, we don't add new operations.
+ if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
+ ap.submit(writeAsyncBuffer, true);
+ }
+
+ if (synchronous || ap.hasError()) {
+ ap.waitUntilDone();
+ }
+
+ if (ap.hasError()) {
+ if (!clearBufferOnFail) {
+ // if clearBufferOnFail is not set, we're supposed to keep the failed operations in the
+ // write buffer. This is a questionable feature kept here for backward compatibility.
+ writeAsyncBuffer.addAll(ap.getFailedOperations());
+ }
+ RetriesExhaustedWithDetailsException e = ap.getErrors();
+ ap.clearErrors();
+ throw e;
+ }
+ } finally {
+ currentWriteBufferSize = 0;
+ for (Row mut : writeAsyncBuffer) {
+ if (mut instanceof Mutation) {
+ currentWriteBufferSize += ((Mutation) mut).heapSize();
+ }
+ }
}
}
@@ -1080,37 +1158,13 @@ public class HTable implements HTableInt
* {@inheritDoc}
*/
@Override
- public void flushCommits() throws IOException {
- if (writeBuffer.isEmpty()){
- // Early exit: we can be called on empty buffers.
- return;
- }
-
- Object[] results = new Object[writeBuffer.size()];
- boolean success = false;
- try {
- this.connection.processBatch(writeBuffer, tableName, pool, results);
- success = true;
- } catch (InterruptedException e) {
- throw new InterruptedIOException(e.getMessage());
- } finally {
- // mutate list so that it is empty for complete success, or contains
- // only failed records. Results are returned in the same order as the
- // requests in list. Walk the list backwards, so we can remove from list
- // without impacting the indexes of earlier members
- currentWriteBufferSize = 0;
- if (success || clearBufferOnFail) {
- writeBuffer.clear();
- } else {
- for (int i = results.length - 1; i >= 0; i--) {
- if (results[i] instanceof Result) {
- writeBuffer.remove(i);
- } else {
- currentWriteBufferSize += writeBuffer.get(i).heapSize();
- }
- }
- }
- }
+ public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ // We loop because, if a region is overloaded, its operations are kept in the buffer.
+ // As an operation can still be in progress even when the buffer is empty, we call
+ // backgroundFlushCommits at least once.
+ do {
+ backgroundFlushCommits(true);
+ } while (!writeAsyncBuffer.isEmpty());
}
/**
@@ -1127,7 +1181,7 @@ public class HTable implements HTableInt
public <R> void processBatchCallback(
final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
- connection.processBatchCallback(list, tableName, pool, results, callback);
+ this.batchCallback(list, results, callback);
}
@@ -1254,14 +1308,6 @@ public class HTable implements HTableInt
}
/**
- * Returns the write buffer.
- * @return The current write buffer.
- */
- public ArrayList<Put> getWriteBuffer() {
- return writeBuffer;
- }
-
- /**
* The pool is used for multi requests for this HTable
* @return the pool used for multi
*/
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Mon Jun 24 18:57:29 2013
@@ -48,11 +48,11 @@ import java.util.Set;
public class RetriesExhaustedWithDetailsException
extends RetriesExhaustedException {
List<Throwable> exceptions;
- List<Row> actions;
+ List<? extends Row> actions;
List<String> hostnameAndPort;
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
- List<Row> actions,
+ List<? extends Row> actions,
List<String> hostnameAndPort) {
super("Failed " + exceptions.size() + " action" +
pluralize(exceptions) + ": " +
@@ -105,7 +105,7 @@ extends RetriesExhaustedException {
}
public static String getDesc(List<Throwable> exceptions,
- List<Row> actions,
+ List<? extends Row> actions,
List<String> hostnamePort) {
String s = getDesc(classifyExs(exceptions));
StringBuilder addrs = new StringBuilder(s);
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Mon Jun 24 18:57:29 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
@@ -187,6 +188,8 @@ public abstract class ServerCallable<T>
// map to that slow/dead server; otherwise, let cache miss and ask
// .META. again to find the new location
getConnection().clearCaches(location.getServerName());
+ } else if (t instanceof RegionMovedException) {
+ getConnection().updateCachedLocations(tableName, row, t, location);
} else if (t instanceof NotServingRegionException && numRetries == 1) {
// Purge cache entries for this specific region from META cache
// since we don't call connect(true) when number of retries is 1.