Posted to commits@hbase.apache.org by nk...@apache.org on 2013/06/24 20:57:29 UTC

svn commit: r1496159 [1/2] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/sr...

Author: nkeywal
Date: Mon Jun 24 18:57:29 2013
New Revision: 1496159

URL: http://svn.apache.org/r1496159
Log:
HBASE-6295  Possible performance improvement in client batch operations: presplit and send in background
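For context, a hedged sketch of the kind of client batch call this change affects (the table, family and qualifier names below are made up, and the caller-side API itself is unchanged by this patch). With this commit, such a batch is grouped per region and dispatched in the background through the new AsyncProcess class added below, rather than through the old synchronous Process<R> helper removed from HConnectionManager:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "my_table");   // hypothetical table name
        try {
          List<Row> actions = new ArrayList<Row>();
          for (int i = 0; i < 1000; i++) {
            Put p = new Put(Bytes.toBytes("row-" + i));
            p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));  // made-up family/qualifier
            actions.add(p);
          }
          Object[] results = new Object[actions.size()];
          // The call below goes through HConnectionManager.processBatchCallback, which this
          // commit rewires to AsyncProcess.submitAll() + waitUntilDone().
          table.batch(actions, results);
        } finally {
          table.close();
        }
      }
    }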

Added:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java Mon Jun 24 18:57:29 2013
@@ -32,7 +32,6 @@ public class Action<R> implements Compar
   // TODO: This class should not be visible outside of the client package.
   private Row action;
   private int originalIndex;
-  private R result;
 
   public Action(Row action, int originalIndex) {
     super();
@@ -40,13 +39,6 @@ public class Action<R> implements Compar
     this.originalIndex = originalIndex;    
   }
 
-  public R getResult() {
-    return result;
-  }
-
-  public void setResult(R result) {
-    this.result = result;
-  }
 
   public Row getAction() {
     return action;

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1496159&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Mon Jun 24 18:57:29 2013
@@ -0,0 +1,728 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class allows a continuous flow of requests. It's written to be compatible with a
+ * synchronous caller such as HTable.
+ * <p>
+ * The caller sends a buffer of operations by calling submit. This class extracts from this
+ * list the operations it can send, i.e. the operations that are on regions that are not
+ * considered as busy. The process is asynchronous, i.e. it returns immediately once it has
+ * finished iterating on the list. If, and only if, the maximum number of concurrent tasks is
+ * reached, the call to submit will block.
+ * </p>
+ * <p>
+ * The class manages retries internally.
+ * </p>
+ * <p>
+ * The class includes an error marker: it allows the caller to know whether an operation has
+ * failed, and to get the exception details, i.e. the full list of throwables for each attempt.
+ * This marker is here to help the backward compatibility in HTable. In most (new) cases, it
+ * should be managed by the callbacks.
+ * </p>
+ * <p>
+ * A callback is available, in order to: <list>
+ * <li>Get the result of the operation (failure or success)</li>
+ * <li>When an operation fails but could be retried, decide whether to retry it or not</li>
+ * <li>When an operation fails for good (can't be retried or has already been retried the
+ * maximum number of times), decide whether to register the error or not.
+ * </list>
+ * <p>
+ * This class is not thread safe externally; only one thread should submit operations at a
+ * time. Internally, the class is thread safe enough to manage simultaneously new submissions
+ * and results arising from older operations.
+ * </p>
+ * <p>
+ * Internally, this class works with {@link Row}, which means it could theoretically be used
+ * for gets as well.
+ * </p>
+ */
+class AsyncProcess<CResult> {
+  private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+
+  protected final HConnection hConnection;
+  protected final byte[] tableName;
+  protected final ExecutorService pool;
+  protected final AsyncProcessCallback<CResult> callback;
+  protected final BatchErrors errors = new BatchErrors();
+  protected final BatchErrors retriedErrors = new BatchErrors();
+  protected final AtomicBoolean hasError = new AtomicBoolean(false);
+  protected final AtomicLong tasksSent = new AtomicLong(0);
+  protected final AtomicLong tasksDone = new AtomicLong(0);
+  protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
+      new ConcurrentHashMap<String, AtomicInteger>();
+  protected final int maxTotalConcurrentTasks;
+  protected final int maxConcurrentTasksPerRegion;
+  protected final long pause;
+  protected int numTries;
+  protected final boolean useServerTrackerForRetries;
+  protected int serverTrackerTimeout;
+
+
+  /**
+   * This interface allows keeping the contract of the previous synchronous interface, which
+   * uses an array of objects to return the results.
+   * <p/>
+   * This interface allows the caller to specify the behavior on errors: <list>
+   * <li>If we have not yet reached the maximum number of retries, the user can nevertheless
+   * specify whether this specific operation should be retried or not.
+   * </li>
+   * <li>If an operation fails (i.e. is not retried or fails after all retries), the user can
+   * specify whether we should mark this AsyncProcess as in error or not.
+   * </li>
+   * </list>
+   */
+  static interface AsyncProcessCallback<CResult> {
+
+    /**
+     * Called on success. originalIndex holds the index in the action list.
+     */
+    void success(int originalIndex, byte[] region, Row row, CResult result);
+
+    /**
+     * Called on failure, if we don't retry (i.e. called once per failed operation).
+     *
+     * @return true if we should store the error and tag this async process as being in error.
+     *         false if the failure of this operation can be safely ignored and does not
+     *         require the current process to be stopped; the other operations in the queue
+     *         are still processed.
+     */
+    boolean failure(int originalIndex, byte[] region, Row row, Throwable t);
+
+    /**
+     * Called on a failure we plan to retry. This allows the user to stop retrying. Will be
+     * called multiple times for a single action if it fails multiple times.
+     *
+     * @return true if we should retry, false otherwise.
+     */
+    boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
+  }
+
+  private static class BatchErrors {
+    private List<Throwable> throwables = new ArrayList<Throwable>();
+    private List<Row> actions = new ArrayList<Row>();
+    private List<String> addresses = new ArrayList<String>();
+
+    public void add(Throwable ex, Row row, HRegionLocation location) {
+      throwables.add(ex);
+      actions.add(row);
+      addresses.add(location != null ? location.getHostnamePort() : "null location");
+    }
+
+    private RetriesExhaustedWithDetailsException makeException() {
+      return new RetriesExhaustedWithDetailsException(
+          new ArrayList<Throwable>(throwables),
+          new ArrayList<Row>(actions), new ArrayList<String>(addresses));
+    }
+
+    public void clear() {
+      throwables.clear();
+      actions.clear();
+      addresses.clear();
+    }
+  }
+
+  public AsyncProcess(HConnection hc, byte[] tableName, ExecutorService pool,
+                      AsyncProcessCallback<CResult> callback, Configuration conf) {
+    this.hConnection = hc;
+    this.tableName = tableName;
+    this.pool = pool;
+    this.callback = callback;
+
+    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+
+    this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 200);
+
+    // With one, we ensure that the ordering of the queries is respected: we don't start
+    //  a set of operations on a region before the previous one is done. As well, this limits
+    //  the pressure we put on the region server.
+    this.maxConcurrentTasksPerRegion = conf.getInt("hbase.client.max.perregion.tasks", 1);
+
+    this.useServerTrackerForRetries =
+        conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
+
+    if (this.useServerTrackerForRetries) {
+      // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+      // However, if we are too useful, we might fail very quickly due to retry count limit.
+      // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+      // retry time if normal retries were used. Then we will retry until this time runs out.
+      // If we keep hitting one server, the net effect will be the incremental backoff, and
+      // essentially the same number of retries as planned. If we have to do faster retries,
+      // we will do more retries in aggregate, but the user will be none the wiser.
+      this.serverTrackerTimeout = 0;
+      for (int i = 0; i < this.numTries; ++i) {
+        serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+      }
+    }
+  }
+
+  /**
+   * Extract from the rows list what we can submit. The rows we can not submit are kept in the
+   * list.
+   *
+   * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
+   * @param atLeastOne true if we should submit at least a subset.
+   */
+  public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
+    if (rows.isEmpty()){
+      return;
+    }
+
+    Map<HRegionLocation, MultiAction<Row>> actionsByServer =
+        new HashMap<HRegionLocation, MultiAction<Row>>();
+    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
+
+    do {
+      Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+      long currentTaskNumber = waitForMaximumCurrentTasks(maxTotalConcurrentTasks);
+      int posInList = -1;
+      Iterator<? extends Row> it = rows.iterator();
+      while (it.hasNext()) {
+        Row r = it.next();
+        HRegionLocation loc = findDestLocation(r, 1, posInList, false, regionIncluded);
+
+        if (loc != null) {   // loc is null if the dest is too busy or there is an error
+          Action<Row> action = new Action<Row>(r, ++posInList);
+          retainedActions.add(action);
+          addAction(loc, action, actionsByServer);
+          it.remove();
+        }
+      }
+
+      if (retainedActions.isEmpty() && atLeastOne && !hasError()) {
+        waitForNextTaskDone(currentTaskNumber);
+      }
+
+    } while (retainedActions.isEmpty() && atLeastOne && !hasError());
+
+    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
+    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
+  }
+
+  /**
+   * Group the actions per region server.
+   *
+   * @param loc - the destination. Must not be null.
+   * @param action - the action to add to the multiaction
+   * @param actionsByServer the multiaction per server
+   */
+  private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
+      MultiAction<Row>> actionsByServer) {
+    final byte[] regionName = loc.getRegionInfo().getRegionName();
+    MultiAction<Row> multiAction = actionsByServer.get(loc);
+    if (multiAction == null) {
+      multiAction = new MultiAction<Row>();
+      actionsByServer.put(loc, multiAction);
+    }
+
+    multiAction.add(regionName, action);
+  }
+
+  /**
+   * Find the destination, if this destination is not considered as busy.
+   *
+   * @param row          the row
+   * @param numAttempt   the num attempt
+   * @param posInList    the position in the list
+   * @param force        if we must submit regardless of the server load
+   * @param regionStatus the cached busy/not-busy status per region, shared across the rows of
+   *                     a single submit call
+   * @return null if we should not submit, the destination otherwise.
+   */
+  private HRegionLocation findDestLocation(Row row, int numAttempt,
+                                           int posInList, boolean force,
+                                           Map<String, Boolean> regionStatus) {
+    HRegionLocation loc = null;
+    IOException locationException = null;
+    try {
+      loc = hConnection.locateRegion(this.tableName, row.getRow());
+      if (loc == null) {
+        locationException = new IOException("No location found, aborting submit for" +
+            " tableName=" + Bytes.toString(tableName) +
+            " rowkey=" + Arrays.toString(row.getRow()));
+      }
+    } catch (IOException e) {
+      locationException = e;
+    }
+    if (locationException != null) {
+      // There are multiple retries in locateRegion already. No need to add new.
+      // We can't continue with this row, hence it's the last retry.
+      manageError(numAttempt, posInList, row, false, locationException, null);
+      return null;
+    }
+
+    if (force) {
+      return loc;
+    }
+
+    String regionName = loc.getRegionInfo().getEncodedName();
+    Boolean addIt = regionStatus.get(regionName);
+    if (addIt == null) {
+      addIt = canTakeNewOperations(regionName);
+      regionStatus.put(regionName, addIt);
+    }
+
+    return addIt ? loc : null;
+  }
+
+
+  /**
+   * Check if we should send new operations to this region.
+   *
+   * @param encodedRegionName region name
+   * @return true if we can send new operations to this region, i.e. it is not considered as busy.
+   */
+  protected boolean canTakeNewOperations(String encodedRegionName) {
+    AtomicInteger ct = taskCounterPerRegion.get(encodedRegionName);
+    return ct == null || ct.get() < maxConcurrentTasksPerRegion;
+  }
+
+  /**
+   * Submit immediately the list of rows, whatever the server status. Kept for backward
+   * compatibility: it allows this class to be used with the batch interface that returns an
+   * array of objects.
+   *
+   * @param rows the list of rows.
+   */
+  public void submitAll(List<? extends Row> rows) {
+    List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
+
+    // The position will be used by the processBatch to match the object array returned.
+    int posInList = -1;
+    for (Row r : rows) {
+      posInList++;
+      Action<Row> action = new Action<Row>(r, posInList);
+      actions.add(action);
+    }
+    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
+    submit(actions, actions, 1, true, errorsByServer);
+  }
+
+
+  /**
+   * Group a list of actions per region server, and send them asynchronously.
+   *
+   * @param initialActions - the full list of the actions in progress
+   * @param currentActions - the list of rows to submit
+   * @param numAttempt - the current numAttempt (first attempt is 1)
+   * @param force - true if we submit the rowList without taking into account the server load
+   */
+  private void submit(List<Action<Row>> initialActions,
+                      List<Action<Row>> currentActions, int numAttempt, boolean force,
+                      final HConnectionManager.ServerErrorTracker errorsByServer) {
+    // group per location => region server
+    final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
+        new HashMap<HRegionLocation, MultiAction<Row>>();
+
+    // We have the same policy for a single region per call to submit: we don't want
+    //  to send half of the actions because the status changed in the middle. So we keep the
+    //  status per region in this map.
+    Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
+
+    for (Action<Row> action : currentActions) {
+      HRegionLocation loc = findDestLocation(
+          action.getAction(), 1, action.getOriginalIndex(), force, regionIncluded);
+
+      if (loc != null) {
+        addAction(loc, action, actionsByServer);
+      }
+    }
+
+    if (!actionsByServer.isEmpty()) {
+      sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer);
+    }
+  }
+
+  /**
+   * Send a multi action structure to the servers. The calls are run in the provided pool,
+   * so this method is asynchronous: it returns as soon as the tasks are submitted.
+   *
+   * @param initialActions  the list of the actions, flat.
+   * @param actionsByServer the actions structured by regions
+   * @param numAttempt      the attempt number.
+   */
+  public void sendMultiAction(final List<Action<Row>> initialActions,
+                              Map<HRegionLocation, MultiAction<Row>> actionsByServer,
+                              final int numAttempt,
+                              final HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    // Send the queries; each one increments the per-region task counter until it completes
+    for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
+      final HRegionLocation loc = e.getKey();
+      final MultiAction<Row> multi = e.getValue();
+      final String regionName = loc.getRegionInfo().getEncodedName();
+
+      incTaskCounters(regionName);
+
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          MultiResponse res;
+          try {
+            ServerCallable<MultiResponse> callable = createCallable(loc, multi);
+            try {
+              res = callable.withoutRetries();
+            } catch (IOException e) {
+              LOG.warn("The call to the RS failed, we don't know where we stand. regionName="
+                  + regionName, e);
+              resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
+              return;
+            }
+
+            receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
+          } finally {
+            decTaskCounters(regionName);
+          }
+        }
+      };
+
+      try {
+        this.pool.submit(runnable);
+      } catch (RejectedExecutionException ree) {
+        // This should never happen. But as the pool is provided by the end user, let's secure
+        //  this a little.
+        decTaskCounters(regionName);
+        LOG.warn("The task was rejected by the pool. This is unexpected. " +
+            "regionName=" + regionName, ree);
+        // We're likely to fail again, but this will increment the attempt counter, so it will
+        //  finish.
+        resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
+      }
+    }
+  }
+
+  /**
+   * Create a callable. Isolated to be easily overridden in the tests.
+   */
+  protected ServerCallable<MultiResponse> createCallable(
+      final HRegionLocation loc, final MultiAction<Row> multi) {
+
+    return new MultiServerCallable<Row>(hConnection, tableName, loc, multi);
+  }
+
+  /**
+   * Check whether we can retry and act accordingly: log, set the error status, call the callbacks.
+   *
+   * @param numAttempt    the number of this attempt
+   * @param originalIndex the position in the list sent
+   * @param row           the row
+   * @param canRetry      if false, we won't retry regardless of the settings.
+   * @param throwable     the throwable, if any (can be null)
+   * @param location      the location, if any (can be null)
+   * @return true if the action can be retried, false otherwise.
+   */
+  private boolean manageError(int numAttempt, int originalIndex, Row row, boolean canRetry,
+                              Throwable throwable, HRegionLocation location) {
+    if (canRetry) {
+      if (numAttempt >= numTries ||
+          (throwable != null && throwable instanceof DoNotRetryIOException)) {
+        canRetry = false;
+      }
+    }
+    byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
+
+    if (canRetry && callback != null) {
+      canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
+    }
+
+    if (canRetry) {
+      if (LOG.isTraceEnabled()) {
+        retriedErrors.add(throwable, row, location);
+      }
+    } else {
+      if (callback != null) {
+        callback.failure(originalIndex, region, row, throwable);
+      }
+      this.hasError.set(true);
+      errors.add(throwable, row, location);
+    }
+
+    return canRetry;
+  }
+
+  /**
+   * Resubmit all the actions from this multiaction after a failure.
+   *
+   * @param initialActions the full initial action list
+   * @param rsActions  the actions still to do from the initial list
+   * @param location   the destination
+   * @param numAttempt the number of attempts so far
+   * @param t the throwable (if any) that caused the resubmit
+   */
+  private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
+                           HRegionLocation location, int numAttempt, Throwable t,
+                           HConnectionManager.ServerErrorTracker errorsByServer) {
+    // Do not use the exception for updating cache because it might be coming from
+    // any of the regions in the MultiAction.
+    hConnection.updateCachedLocations(tableName,
+        rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
+    errorsByServer.reportServerError(location);
+
+    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
+    for (List<Action<Row>> actions : rsActions.actions.values()) {
+      for (Action<Row> action : actions) {
+        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
+            true, t, location)) {
+          toReplay.add(action);
+        }
+      }
+    }
+
+    submit(initialActions, toReplay, numAttempt, true, errorsByServer);
+  }
+
+  /**
+   * Called when we receive the result of a server query.
+   *
+   * @param initialActions - the whole action list
+   * @param rsActions      - the actions for this location
+   * @param location       - the location
+   * @param responses      - the response, if any
+   * @param numAttempt     - the attempt
+   */
+  private void receiveMultiAction(List<Action<Row>> initialActions,
+                                  MultiAction<Row> rsActions, HRegionLocation location,
+                                  MultiResponse responses, int numAttempt,
+                                  HConnectionManager.ServerErrorTracker errorsByServer) {
+
+    if (responses == null) {
+      LOG.info("Attempt #" + numAttempt + " failed for all operations on server " +
+          location.getServerName() + " , trying to resubmit.");
+      resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
+      return;
+    }
+
+    // Success or partial success
+    // Analyze detailed results. We can still have individual failures to be redone.
+    // two specific throwables are managed:
+    //  - DoNotRetryIOException: we continue to retry for other actions
+    //  - RegionMovedException: we update the cache with the new region location
+
+    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
+    Throwable throwable = null;
+
+    int failureCount = 0;
+    boolean canRetry = true;
+    for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
+        responses.getResults().entrySet()) {
+
+      for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
+        Object result = regionResult.getSecond();
+
+        // Failure: retry if it makes sense, else update the error lists
+        if (result == null || result instanceof Throwable) {
+          throwable = (Throwable) result;
+          Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+          Row row = correspondingAction.getAction();
+
+          if (failureCount++ == 0) { // We're doing this once per location.
+            hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
+            if (errorsByServer != null) {
+              errorsByServer.reportServerError(location);
+              canRetry = errorsByServer.canRetryMore();
+            }
+          }
+
+          if (manageError(numAttempt, correspondingAction.getOriginalIndex(), row, canRetry,
+              throwable, location)) {
+            toReplay.add(correspondingAction);
+          }
+        } else { // success
+          if (callback != null) {
+            Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+            Row row = correspondingAction.getAction();
+            //noinspection unchecked
+            this.callback.success(correspondingAction.getOriginalIndex(),
+                resultsForRS.getKey(), row, (CResult) result);
+          }
+        }
+      }
+    }
+
+    if (!toReplay.isEmpty()) {
+      if (numAttempt > 2) {
+        // We use this value to have some logs when we have multiple failures, but not too many
+        //  logs, as errors are to be expected when regions move, split and so on
+        LOG.debug("Attempt #" + numAttempt + " failed for " + failureCount +
+            " operations on server " + location.getServerName() + ", resubmitting " +
+            toReplay.size() + ", tableName=" + Bytes.toString(tableName) +
+            ", last exception was: " + throwable);
+      }
+      long backOffTime = (errorsByServer != null ?
+          errorsByServer.calculateBackoffTime(location, pause) :
+          ConnectionUtils.getPauseTime(pause, numAttempt));
+      try {
+        Thread.sleep(backOffTime);
+      } catch (InterruptedException e) {
+        LOG.warn("Not sent: " + toReplay.size() +
+            " operations,  tableName=" + Bytes.toString(tableName), e);
+        Thread.interrupted();
+        return;
+      }
+
+      submit(initialActions, toReplay, numAttempt + 1, true, errorsByServer);
+    } else if (failureCount != 0) {
+      LOG.warn("Attempt #" + numAttempt + " failed for " + failureCount +
+          " operations on server " + location.getServerName() + " NOT resubmitting." +
+          ", tableName=" + Bytes.toString(tableName));
+    }
+  }
+
+  /**
+   * Waits for another task to finish.
+   * @param currentNumberOfTask - the number of tasks finished when calling the method.
+   */
+  protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
+    while (currentNumberOfTask == tasksDone.get()) {
+      try {
+        synchronized (this.tasksDone) {
+          this.tasksDone.wait(100);
+        }
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("Interrupted." +
+            " currentNumberOfTask=" + currentNumberOfTask +
+            ",  tableName=" + Bytes.toString(tableName) + ", tasksDone=" + tasksDone.get());
+      }
+    }
+  }
+
+  /**
+   * Wait until the async process does not have more than max tasks in progress.
+   */
+  private long waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
+    long lastLog = 0;
+    long currentTasksDone = this.tasksDone.get();
+
+    while ((tasksSent.get() - currentTasksDone) > max) {
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      if (now > lastLog + 5000) {
+        lastLog = now;
+        LOG.info(Bytes.toString(tableName) +
+            ": Waiting for the global number of tasks to be equals or less than " + max +
+            ", currently it's " + this.tasksDone.get());
+      }
+      waitForNextTaskDone(currentTasksDone);
+      currentTasksDone = this.tasksDone.get();
+    }
+
+    return currentTasksDone;
+  }
+
+  /**
+   * Wait until all tasks are executed, successfully or not.
+   */
+  public void waitUntilDone() throws InterruptedIOException {
+    waitForMaximumCurrentTasks(0);
+  }
+
+
+  public boolean hasError() {
+    return hasError.get();
+  }
+
+  public List<? extends Row> getFailedOperations() {
+    return errors.actions;
+  }
+
+  /**
+   * Clean the error stacks. Should be called only when there are no actions in progress.
+   */
+  public void clearErrors() {
+    errors.clear();
+    retriedErrors.clear();
+    hasError.set(false);
+  }
+
+  public RetriesExhaustedWithDetailsException getErrors() {
+    return errors.makeException();
+  }
+
+  /**
+   * Increment the task counters for a given region. MT safe.
+   */
+  protected void incTaskCounters(String encodedRegionName) {
+    tasksSent.incrementAndGet();
+
+    AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+    if (counterPerServer == null) {
+      taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
+      counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+    }
+    counterPerServer.incrementAndGet();
+  }
+
+  /**
+   * Decrements the counters for a given region
+   */
+  protected void decTaskCounters(String encodedRegionName) {
+    AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+    counterPerServer.decrementAndGet();
+
+    tasksDone.incrementAndGet();
+    synchronized (tasksDone) {
+      tasksDone.notifyAll();
+    }
+  }
+
+  /**
+   * Creates the server error tracker to use inside the process.
+   * Currently, to preserve the main assumption about current retries, and to work well with
+   * the retry-limit-based calculation, the calculation is local per AsyncProcess object.
+   * We may benefit from connection-wide tracking of server errors.
+   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
+   */
+  protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
+    if (useServerTrackerForRetries) {
+      return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout);
+    } else {
+      return null;
+    }
+  }
+}
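The class above is package-private and is driven by HTable and HConnectionManager. As a hedged sketch of that driving sequence (it mirrors the processBatchCallback rewrite further down in this commit; the wrapper class, method name and pool size are illustrative assumptions, not part of the patch):

    package org.apache.hadoop.hbase.client;

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.conf.Configuration;

    // Must live in this package because AsyncProcess and its callback are package-private.
    class AsyncProcessDriverSketch {

      static void drive(HConnection connection, byte[] tableName, List<? extends Row> rows,
                        Configuration conf) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(10);  // assumed pool size
        try {
          AsyncProcess.AsyncProcessCallback<Object> cb =
              new AsyncProcess.AsyncProcessCallback<Object>() {
                @Override
                public void success(int originalIndex, byte[] region, Row row, Object result) {
                  // e.g. store the result at originalIndex in a results array
                }

                @Override
                public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
                  return true;  // record the error and mark the whole process as failed
                }

                @Override
                public boolean retriableFailure(int originalIndex, Row row, byte[] region,
                                                Throwable exception) {
                  return true;  // keep retrying, up to hbase.client.retries.number attempts
                }
              };

          AsyncProcess<Object> ap = new AsyncProcess<Object>(connection, tableName, pool, cb, conf);
          ap.submitAll(rows);    // send everything immediately, regardless of per-region load
          ap.waitUntilDone();    // block until every task has completed
          if (ap.hasError()) {
            throw ap.getErrors();  // RetriesExhaustedWithDetailsException with the full details
          }
        } finally {
          pool.shutdown();
        }
      }
    }

The parallelism is bounded by the two new settings read in the constructor above: hbase.client.max.total.tasks (200 by default) across the whole process, and hbase.client.max.perregion.tasks (1 by default) per region; the per-region limit preserves ordering and limits the pressure put on any single region server.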

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Mon Jun 24 18:57:29 2013
@@ -172,6 +172,17 @@ public interface HConnection extends Abo
   throws IOException;
 
   /**
+   * Update the location cache. This is used internally by HBase; in most cases it should not
+   * be used by the client application.
+   * @param tableName the table name
+   * @param rowkey the row
+   * @param exception the exception if any. Can be null.
+   * @param source the previous location
+   */
+  public void updateCachedLocations(byte[] tableName, byte[] rowkey,
+                                    Object exception, HRegionLocation source);
+
+  /**
    * Gets the location of the region of <i>regionName</i>.
    * @param regionName name of the region to locate
    * @return HRegionLocation that describes where to find the region in
@@ -354,7 +365,7 @@ public interface HConnection extends Abo
 
 
   /**
-   * Clear any caches that pertain to server name <code>sn</code>
+   * Clear any caches that pertain to server name <code>sn</code>.
    * @param sn A server name
    */
   public void clearCaches(final ServerName sn);
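The updateCachedLocations method added to the interface above is how the client layer refreshes stale region locations on failure. A hedged fragment (the class and method names are illustrative; the real callers are client-internal, see AsyncProcess.receiveMultiAction above) of how it is expected to be called:

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.Row;

    final class LocationCacheRefreshSketch {

      static void onRowFailure(HConnection connection, byte[] tableName, Row row,
                               Throwable error, HRegionLocation source) {
        // 'error' may be null; 'source' is the location the failing call was sent to.
        // Passing the exception lets a wrapped RegionMovedException refresh the cache with
        // the new location instead of merely dropping the stale entry.
        connection.updateCachedLocations(tableName, row.getRow(), error, source);
      }
    }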

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Jun 24 18:57:29 2013
@@ -26,20 +26,16 @@ import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -141,16 +137,13 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
-import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -440,8 +433,6 @@ public class HConnectionManager {
     private final int numTries;
     final int rpcTimeout;
     private final int prefetchRegionLimit;
-    private final boolean useServerTrackerForRetries;
-    private final long serverTrackerTimeout;
 
     private volatile boolean closed;
     private volatile boolean aborted;
@@ -490,12 +481,12 @@ public class HConnectionManager {
     private int refCount;
 
     // indicates whether this connection's life cycle is managed (by us)
-    private final boolean managed;
+    private boolean managed;
 
     /**
      * Cluster registry of basic info such as clusterid and meta region location.
      */
-    final Registry registry;
+     Registry registry;
 
     /**
      * constructor
@@ -509,34 +500,8 @@ public class HConnectionManager {
      * users of an HConnectionImplementation instance.
      */
     HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
-      this.conf = conf;
+      this(conf);
       this.managed = managed;
-      this.closed = false;
-      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-      this.rpcTimeout = conf.getInt(
-        HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-      this.prefetchRegionLimit = conf.getInt(
-        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
-        HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true);
-      long serverTrackerTimeout = 0;
-      if (this.useServerTrackerForRetries) {
-        // Server tracker allows us to do faster, and yet useful (hopefully), retries.
-        // However, if we are too useful, we might fail very quickly due to retry count limit.
-        // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
-        // retry time if normal retries were used. Then we will retry until this time runs out.
-        // If we keep hitting one server, the net effect will be the incremental backoff, and
-        // essentially the same number of retries as planned. If we have to do faster retries,
-        // we will do more retries in aggregate, but the user will be none the wiser.
-        for (int i = 0; i < this.numTries; ++i) {
-          serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
-        }
-      }
-      this.serverTrackerTimeout = serverTrackerTimeout;
       this.registry = setupRegistry();
       retrieveClusterId();
 
@@ -560,6 +525,24 @@ public class HConnectionManager {
             }, conf, listenerClass);
       }
     }
+
+    /**
+     * For tests.
+     */
+    protected HConnectionImplementation(Configuration conf) {
+      this.conf = conf;
+      this.closed = false;
+      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+      this.rpcTimeout = conf.getInt(
+          HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+      this.prefetchRegionLimit = conf.getInt(
+          HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+          HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+    }
  
     /**
      * @return The cluster registry implementation to use.
@@ -1982,25 +1965,8 @@ public class HConnectionManager {
       return callable.withoutRetries();
     }
 
-    @Deprecated
-    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
-        final MultiAction<R> multi, final byte[] tableName) {
-      // TODO: This does not belong in here!!! St.Ack HConnections should
-      // not be dealing in Callables; Callables have HConnections, not other
-      // way around.
-      final HConnection connection = this;
-      return new Callable<MultiResponse>() {
-        @Override
-        public MultiResponse call() throws Exception {
-          ServerCallable<MultiResponse> callable =
-            new MultiServerCallable<R>(connection, tableName, loc, multi);
-          return callable.withoutRetries();
-        }
-      };
-   }
-
-   void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
-       ServerName serverName, long seqNum) {
+    void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
+                              ServerName serverName, long seqNum) {
       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
       synchronized (this.cachedRegionLocations) {
         cacheLocation(hri.getTableName(), source, newHrl);
@@ -2058,16 +2024,17 @@ public class HConnectionManager {
      *                  or wrapped or both RegionMovedException
      * @param source server that is the source of the location update.
      */
-    private void updateCachedLocations(final byte[] tableName, Row row,
+    @Override
+    public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
       final Object exception, final HRegionLocation source) {
-      if (row == null || tableName == null) {
-        LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
+      if (rowkey == null || tableName == null) {
+        LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
             ", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
         return;
       }
 
       // Is it something we have already updated?
-      final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
+      final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
       if (oldLocation == null) {
         // There is no such location in the cache => it's been removed already => nothing to do
         return;
@@ -2125,365 +2092,62 @@ public class HConnectionManager {
       Batch.Callback<R> callback)
       throws IOException, InterruptedException {
 
-      Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
-      p.processBatchCallback();
+      // To fulfill the original contract, we have a special callback. This callback
+      //  will set the results in the Object array.
+      ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
+      AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
+
+      // We're doing a submit all. This way, the originalIndex will match the initial list.
+      asyncProcess.submitAll(list);
+      asyncProcess.waitUntilDone();
+
+      if (asyncProcess.hasError()) {
+        throw asyncProcess.getErrors();
+      }
+    }
+
+    // For tests.
+    protected <R> AsyncProcess createAsyncProcess(byte[] tableName, ExecutorService pool,
+           AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
+      return new AsyncProcess<R>(this, tableName, pool, callback, conf);
     }
 
 
     /**
-     * Methods and attributes to manage a batch process are grouped into this single class.
-     * This allows, by creating a Process<R> per batch process to ensure multithread safety.
-     *
-     * This code should be move to HTable once processBatchCallback is not supported anymore in
-     * the HConnection interface.
+     * Fill the result array for the interfaces using it.
      */
-    private static class Process<R> {
-      // Info on the queries and their context
-      private final HConnectionImplementation hci;
-      private final List<? extends Row> rows;
-      private final byte[] tableName;
-      private final ExecutorService pool;
+    private static class ObjectResultFiller<Res>
+        implements AsyncProcess.AsyncProcessCallback<Res> {
+
       private final Object[] results;
-      private final Batch.Callback<R> callback;
+      private Batch.Callback<Res> callback;
 
-      // Used during the batch process
-      private final List<Action<R>> toReplay;
-      private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
-        inProgress;
-
-      private ServerErrorTracker errorsByServer = null;
-      private int curNumRetries;
-
-      // Notified when a tasks is done
-      private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
-
-      private Process(HConnectionImplementation hci, List<? extends Row> list,
-                       byte[] tableName, ExecutorService pool, Object[] results,
-                       Batch.Callback<R> callback){
-        this.hci = hci;
-        this.rows = list;
-        this.tableName = tableName;
-        this.pool = pool;
+      ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
         this.results = results;
         this.callback = callback;
-        this.toReplay = new ArrayList<Action<R>>();
-        this.inProgress =
-          new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
-        this.curNumRetries = 0;
-      }
-
-
-      /**
-       * Group a list of actions per region servers, and send them. The created MultiActions are
-       *  added to the inProgress list.
-       * @param actionsList
-       * @param isRetry Whether we are retrying these actions. If yes, backoff
-       *                time may be applied before new requests.
-       * @throws IOException - if we can't locate a region after multiple retries.
-       */
-      private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
-        // group per location => regions server
-        final Map<HRegionLocation, MultiAction<R>> actionsByServer =
-          new HashMap<HRegionLocation, MultiAction<R>>();
-        for (Action<R> aAction : actionsList) {
-          final Row row = aAction.getAction();
-
-          if (row != null) {
-            final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow());
-            if (loc == null) {
-              throw new IOException("No location found, aborting submit.");
-            }
-
-            final byte[] regionName = loc.getRegionInfo().getRegionName();
-            MultiAction<R> actions = actionsByServer.get(loc);
-            if (actions == null) {
-              actions = new MultiAction<R>();
-              actionsByServer.put(loc, actions);
-            }
-            actions.add(regionName, aAction);
-          }
-        }
-
-        // Send the queries and add them to the inProgress list
-        for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
-          long backoffTime = 0;
-          if (isRetry) {
-            if (hci.isUsingServerTrackerForRetries()) {
-              assert this.errorsByServer != null;
-              backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
-            } else {
-              // curNumRetries starts with one, subtract to start from 0.
-              backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
-            }
-          }
-          Callable<MultiResponse> callable =
-            createDelayedCallable(backoffTime, e.getKey(), e.getValue());
-          if (LOG.isTraceEnabled() && isRetry) {
-            StringBuilder sb = new StringBuilder();
-            for (Action<R> action : e.getValue().allActions()) {
-              if (sb.length() > 0) sb.append(' ');
-              sb.append(Bytes.toStringBinary(action.getAction().getRow()));
-            }
-            LOG.trace("Attempt #" + this.curNumRetries + " against " + e.getKey().getHostnamePort()
-              + " after=" + backoffTime + "ms, row(s)=" + sb.toString());
-          }
-          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
-            new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
-              e.getValue(), e.getKey(), this.pool.submit(callable));
-          this.inProgress.addLast(p);
-        }
-      }
-
-     /**
-      * Resubmit the actions which have failed, after a sleep time.
-      * @throws IOException
-      */
-      private void doRetry() throws IOException{
-        submit(this.toReplay, true);
-        this.toReplay.clear();
-      }
-
-      /**
-       * Parameterized batch processing, allowing varying return types for
-       * different {@link Row} implementations.
-       * Throws an exception on error. If there are no exceptions, it means that the 'results'
-       *  array is clean.
-       */
-      private void processBatchCallback() throws IOException, InterruptedException {
-        if (this.results.length != this.rows.size()) {
-          throw new IllegalArgumentException(
-            "argument results (size="+results.length+") must be the same size as " +
-              "argument list (size="+this.rows.size()+")");
-        }
-        if (this.rows.isEmpty()) {
-          return;
-        }
-
-        boolean isTraceEnabled = LOG.isTraceEnabled();
-        BatchErrors errors = new BatchErrors();
-        BatchErrors retriedErrors = null;
-        if (isTraceEnabled) {
-          retriedErrors = new BatchErrors();
-        }
-
-        // We keep the number of retry per action.
-        int[] nbRetries = new int[this.results.length];
-
-        // Build the action list. This list won't change after being created, hence the
-        //  indexes will remain constant, allowing a direct lookup.
-        final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
-        for (int i = 0; i < this.rows.size(); i++) {
-          Action<R> action = new Action<R>(this.rows.get(i), i);
-          listActions.add(action);
-        }
-
-        // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
-        submit(listActions, false);
-
-        // LastRetry is true if, either:
-        //  we had an exception 'DoNotRetry'
-        //  we had more than numRetries for any action
-        //  In this case, we will finish the current retries but we won't start new ones.
-        boolean lastRetry = false;
-        // If hci.numTries is 1 or 0, we do not retry.
-        boolean noRetry = (hci.numTries < 2);
-
-        // Analyze and resubmit until all actions are done successfully or failed after numTries
-        while (!this.inProgress.isEmpty()) {
-          // We need the original multi action to find out what actions to replay if
-          //  we have a 'total' failure of the Future<MultiResponse>
-          // We need the HRegionLocation as we give it back if we go out of retries
-          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
-            removeFirstDone();
-
-          // Get the answer, keep the exception if any as we will use it for the analysis
-          MultiResponse responses = null;
-          ExecutionException exception = null;
-          try {
-            responses = currentTask.getThird().get();
-          } catch (ExecutionException e) {
-            exception = e;
-          }
-          HRegionLocation location = currentTask.getSecond();
-          // Error case: no result at all for this multi action. We need to redo all actions
-          if (responses == null) {
-            for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
-              for (Action<R> action : actions) {
-                Row row = action.getAction();
-                // Do not use the exception for updating cache because it might be coming from
-                // any of the regions in the MultiAction.
-                hci.updateCachedLocations(tableName, row, null, location);
-                if (noRetry) {
-                  errors.add(exception, row, location);
-                } else {
-                  if (isTraceEnabled) {
-                    retriedErrors.add(exception, row, location);
-                  }
-                  lastRetry = addToReplay(nbRetries, action, location);
-                }
-              }
-            }
-          } else { // Success or partial success
-            // Analyze detailed results. We can still have individual failures to be redo.
-            // two specific exceptions are managed:
-            //  - DoNotRetryIOException: we continue to retry for other actions
-            //  - RegionMovedException: we update the cache with the new region location
-            for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
-                responses.getResults().entrySet()) {
-              for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
-                Action<R> correspondingAction = listActions.get(regionResult.getFirst());
-                Object result = regionResult.getSecond();
-                this.results[correspondingAction.getOriginalIndex()] = result;
-
-                // Failure: retry if it's make sense else update the errors lists
-                if (result == null || result instanceof Throwable) {
-                  Row row = correspondingAction.getAction();
-                  hci.updateCachedLocations(this.tableName, row, result, location);
-                  if (result instanceof DoNotRetryIOException || noRetry) {
-                    errors.add((Exception)result, row, location);
-                  } else {
-                    if (isTraceEnabled) {
-                      retriedErrors.add((Exception)result, row, location);
-                    }
-                    lastRetry = addToReplay(nbRetries, correspondingAction, location);
-                  }
-                } else // success
-                  if (callback != null) {
-                    this.callback.update(resultsForRS.getKey(),
-                      this.rows.get(regionResult.getFirst()).getRow(), (R) result);
-                }
-              }
-            }
-          }
-
-          // Retry all actions in toReplay then clear it.
-          if (!noRetry && !toReplay.isEmpty()) {
-            if (isTraceEnabled) {
-              LOG.trace("Retrying #" + this.curNumRetries +
-                (lastRetry ? " (one last time)": "") + " because " +
-                retriedErrors.getDescriptionAndClear());
-            }
-            doRetry();
-            if (lastRetry) {
-              noRetry = true;
-            }
-          }
-        }
-
-        errors.rethrowIfAny();
       }
 
-
-      private class BatchErrors {
-        private List<Throwable> exceptions = new ArrayList<Throwable>();
-        private List<Row> actions = new ArrayList<Row>();
-        private List<String> addresses = new ArrayList<String>();
-
-        public void add(Exception ex, Row row, HRegionLocation location) {
-          exceptions.add(ex);
-          actions.add(row);
-          addresses.add(location.getHostnamePort());
-        }
-
-        public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
-          if (!exceptions.isEmpty()) {
-            throw makeException();
-          }
-        }
-
-        public String getDescriptionAndClear(){
-          if (exceptions.isEmpty()) {
-            return "";
-          }
-          String result = makeException().getExhaustiveDescription();
-          exceptions.clear();
-          actions.clear();
-          addresses.clear();
-          return result;
-        }
-
-        private RetriesExhaustedWithDetailsException makeException() {
-          return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);
-        }
-      }
-
-      /**
-       * Put the action that has to be retried in the Replay list.
-       * @return true if we're out of numRetries and it's the last retry.
-       */
-      private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
-        this.toReplay.add(action);
-        nbRetries[action.getOriginalIndex()]++;
-        if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
-          this.curNumRetries = nbRetries[action.getOriginalIndex()];
-        }
-        if (hci.isUsingServerTrackerForRetries()) {
-          if (this.errorsByServer == null) {
-            this.errorsByServer = hci.createServerErrorTracker();
-          }
-          this.errorsByServer.reportServerError(source);
-          return !this.errorsByServer.canRetryMore();
-        } else {
-          // We need to add 1 to make tries and retries comparable. And as we look for
-          // the last try we compare with '>=' and not '>'. And we need curNumRetries
-          // to means what it says as we don't want to initialize it to 1.
-          return ((this.curNumRetries + 1) >= hci.numTries);
+      @Override
+      public void success(int pos, byte[] region, Row row, Res result) {
+        assert pos < results.length;
+        results[pos] = result;
+        if (callback != null) {
+          callback.update(region, row.getRow(), result);
         }
       }
 
-      /**
-       * Wait for one of tasks to be done, and remove it from the list.
-       * @return the tasks done.
-       */
-      private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
-      removeFirstDone() throws InterruptedException {
-        while (true) {
-          synchronized (finishedTasks) {
-            if (!finishedTasks.isEmpty()) {
-              MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
-
-              // We now need to remove it from the inProgress part.
-              Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
-                inProgress.iterator();
-              while (it.hasNext()) {
-                Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
-                if (task.getFirst() == done) { // We have the exact object. No java equals here.
-                  it.remove();
-                  return task;
-                }
-              }
-              LOG.error("Development error: We didn't see a task in the list. " +
-                done.getRegions());
-            }
-            finishedTasks.wait(10);
-          }
-        }
+      @Override
+      public boolean failure(int pos, byte[] region, Row row, Throwable t) {
+        assert pos < results.length;
+        results[pos] = t;
+        // Batch.Callback<Res> was not called on failure in 0.94. We keep the same behavior here.
+        return true; // we want to have this failure in the failures list.
       }
 
-      private Callable<MultiResponse> createDelayedCallable(
-        final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
-
-        final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
-
-        return new Callable<MultiResponse>() {
-          private final long creationTime = System.currentTimeMillis();
-
-          @Override
-          public MultiResponse call() throws Exception {
-            try {
-              final long waitingTime = delay + creationTime - System.currentTimeMillis();
-              if (waitingTime > 0) {
-                Thread.sleep(waitingTime);
-              }
-              return delegate.call();
-            } finally {
-              synchronized (finishedTasks) {
-                finishedTasks.add(multi);
-                finishedTasks.notifyAll();
-              }
-            }
-          }
-        };
+      @Override
+      public boolean retriableFailure(int originalIndex, Row row, byte[] region,
+                                      Throwable exception) {
+        return true; // we retry
       }
     }
 
@@ -2701,102 +2365,89 @@ public class HConnectionManager {
       }
       throw new TableNotFoundException(Bytes.toString(tableName));
     }
+  }
 
-    /**
-     * The record of errors for servers. Visible for testing.
-     */
-    @VisibleForTesting
-    static class ServerErrorTracker {
-      private final Map<HRegionLocation, ServerErrors> errorsByServer =
-          new HashMap<HRegionLocation, ServerErrors>();
-      private long canRetryUntil = 0;
+  /**
+   * The record of errors for servers.
+   */
+  static class ServerErrorTracker {
+    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
+    private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
+        new ConcurrentHashMap<HRegionLocation, ServerErrors>();
+    private long canRetryUntil = 0;
 
-      public ServerErrorTracker(long timeout) {
-        LOG.trace("Server tracker timeout is " + timeout + "ms");
-        this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
-      }
+    public ServerErrorTracker(long timeout) {
+      LOG.trace("Server tracker timeout is " + timeout + "ms");
+      this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+    }
 
-      boolean canRetryMore() {
-        return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
-      }
+    boolean canRetryMore() {
+      return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+    }
 
-      /**
-       * Calculates the back-off time for a retrying request to a particular server.
-       * This is here, and package private, for testing (no good way to get at it).
-       * @param server The server in question.
-       * @param basePause The default hci pause.
-       * @return The time to wait before sending next request.
-       */
-      long calculateBackoffTime(HRegionLocation server, long basePause) {
-        long result = 0;
-        ServerErrors errorStats = errorsByServer.get(server);
-        if (errorStats != null) {
-          result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
-          // Adjust by the time we already waited since last talking to this server.
-          long now = EnvironmentEdgeManager.currentTimeMillis();
-          long timeSinceLastError = now - errorStats.getLastErrorTime();
-          if (timeSinceLastError > 0) {
-            result = Math.max(0, result - timeSinceLastError);
-          }
-          // Finally, see if the backoff time overshoots the timeout.
-          if (result > 0 && (now + result > this.canRetryUntil)) {
-            result = Math.max(0, this.canRetryUntil - now);
-          }
+    /**
+     * Calculates the back-off time for a retrying request to a particular server.
+     *
+     * @param server    The server in question.
+     * @param basePause The default hci pause.
+     * @return The time to wait before sending next request.
+     */
+    long calculateBackoffTime(HRegionLocation server, long basePause) {
+      long result = 0;
+      ServerErrors errorStats = errorsByServer.get(server);
+      if (errorStats != null) {
+        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+        // Adjust by the time we already waited since last talking to this server.
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        long timeSinceLastError = now - errorStats.getLastErrorTime();
+        if (timeSinceLastError > 0) {
+          result = Math.max(0, result - timeSinceLastError);
+        }
+        // Finally, see if the backoff time overshoots the timeout.
+        if (result > 0 && (now + result > this.canRetryUntil)) {
+          result = Math.max(0, this.canRetryUntil - now);
         }
-        return result;
       }
+      return result;
+    }
 
-      /**
-       * Reports that there was an error on the server to do whatever bean-counting necessary.
-       * This is here, and package private, for testing (no good way to get at it).
-       * @param server The server in question.
-       */
-      void reportServerError(HRegionLocation server) {
-        ServerErrors errors = errorsByServer.get(server);
-        if (errors != null) {
-          errors.addError();
-        } else {
-          errorsByServer.put(server, new ServerErrors());
-        }
+    /**
+     * Reports that there was an error on the server to do whatever bean-counting necessary.
+     *
+     * @param server The server in question.
+     */
+    void reportServerError(HRegionLocation server) {
+      ServerErrors errors = errorsByServer.get(server);
+      if (errors != null) {
+        errors.addError();
+      } else {
+        errorsByServer.put(server, new ServerErrors());
       }
+    }
 
-      /**
-       * The record of errors for a server.
-       */
-      private static class ServerErrors {
-        public long lastErrorTime;
-        public int retries;
-
-        public ServerErrors() {
-          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-          this.retries = 0;
-        }
+    /**
+     * The record of errors for a server.
+     */
+    private static class ServerErrors {
+      public long lastErrorTime;
+      public int retries;
 
-        public void addError() {
-          this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
-          ++this.retries;
-        }
+      public ServerErrors() {
+        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.retries = 0;
+      }
 
-        public long getLastErrorTime() {
-          return this.lastErrorTime;
-        }
+      public void addError() {
+        this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+        ++this.retries;
       }
-    }
 
-    public boolean isUsingServerTrackerForRetries() {
-      return this.useServerTrackerForRetries;
-    }
-    /**
-     * Creates the server error tracker to use inside process.
-     * Currently, to preserve the main assumption about current retries, and to work well with
-     * the retry-limit-based calculation, the calculation is local per Process object.
-     * We may benefit from connection-wide tracking of server errors.
-     * @return ServerErrorTracker to use.
-     */
-    ServerErrorTracker createServerErrorTracker() {
-      return new ServerErrorTracker(this.serverTrackerTimeout);
+      public long getLastErrorTime() {
+        return this.lastErrorTime;
+      }
     }
   }
+
   /**
    * Set the number of retries to use serverside when trying to communicate
    * with another server over {@link HConnection}.  Used updating catalog

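The back-off calculation in ServerErrorTracker above pairs an exponential pause (derived from the per-server retry count) with two corrections: it subtracts the time already spent since the last error on that server, and it caps the wait so it never overshoots the tracker's overall deadline. A minimal, standalone sketch of that arithmetic follows; a simple doubling pause stands in for ConnectionUtils.getPauseTime, so the numbers are illustrative rather than the library's actual schedule.

public class BackoffSketch {

  /** Stand-in for ConnectionUtils.getPauseTime: basePause doubled per retry. */
  static long pauseTime(long basePause, int retries) {
    return basePause << Math.min(retries, 10); // clamp the shift to keep it bounded
  }

  /**
   * @param basePause          default pause between retries, in ms
   * @param retries            errors already seen on this server
   * @param timeSinceLastError ms elapsed since the last error on this server
   * @param timeUntilDeadline  ms left before the tracker's overall timeout
   * @return how long to wait before the next request to this server
   */
  static long backoff(long basePause, int retries,
                      long timeSinceLastError, long timeUntilDeadline) {
    long wait = pauseTime(basePause, retries);
    if (timeSinceLastError > 0) {
      wait = Math.max(0, wait - timeSinceLastError); // credit the time already waited
    }
    if (wait > timeUntilDeadline) {
      wait = Math.max(0, timeUntilDeadline);         // never overshoot the deadline
    }
    return wait;
  }

  public static void main(String[] args) {
    // 100ms base pause, 3 errors so far, 200ms since the last one, 500ms left overall:
    // 100 << 3 = 800ms, minus 200ms already waited = 600ms, capped to the 500ms left.
    System.out.println(backoff(100, 3, 200, 500)); // prints 500
  }
}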
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Jun 24 18:57:29 2013
@@ -59,6 +59,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -116,14 +117,14 @@ import java.util.concurrent.TimeUnit;
 @InterfaceStability.Stable
 public class HTable implements HTableInterface {
   private static final Log LOG = LogFactory.getLog(HTable.class);
-  private HConnection connection;
+  protected HConnection connection;
   private final byte [] tableName;
   private volatile Configuration configuration;
-  private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
+  protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
   private long writeBufferSize;
   private boolean clearBufferOnFail;
   private boolean autoFlush;
-  private long currentWriteBufferSize;
+  protected long currentWriteBufferSize;
   protected int scannerCaching;
   private int maxKeyValueSize;
   private ExecutorService pool;  // For Multi
@@ -132,6 +133,9 @@ public class HTable implements HTableInt
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
   private final boolean cleanupConnectionOnClose; // close the connection in close()
 
+  /** The async process used for puts when autoflush is set to false, and for multi-puts */
+  protected AsyncProcess<Object> ap;
+
   /**
    * Creates an object to access a HBase table.
    * Shares zookeeper connection and other resources with other HTable instances
@@ -239,6 +243,15 @@ public class HTable implements HTableInt
   }
 
   /**
+   * For internal testing.
+   */
+  protected HTable(){
+    tableName = new byte[]{};
+    cleanupPoolOnClose = false;
+    cleanupConnectionOnClose = false;
+  }
+
+  /**
    * setup this HTable's parameter based on the passed configuration
    */
   private void finishSetup() throws IOException {
@@ -257,11 +270,15 @@ public class HTable implements HTableInt
         HConstants.HBASE_CLIENT_SCANNER_CACHING,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
 
+    ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration);
+
     this.maxKeyValueSize = this.configuration.getInt(
         "hbase.client.keyvalue.maxsize", -1);
     this.closed = false;
   }
 
+
+
   /**
    * {@inheritDoc}
    */
@@ -397,6 +414,15 @@ public class HTable implements HTableInt
   }
 
   /**
+   * Kept in 0.96 for backward compatibility.
+   * @deprecated since 0.96. This is an internal buffer that should neither be read nor written to.
+   */
+  @Deprecated
+  public List<Row> getWriteBuffer() {
+    return writeAsyncBuffer;
+  }
+
+  /**
    * Sets the number of rows that a scanner will fetch at once.
    * <p>
    * This will override the value specified by
@@ -647,21 +673,19 @@ public class HTable implements HTableInt
   @Override
   public void batch(final List<?extends Row> actions, final Object[] results)
       throws InterruptedException, IOException {
-    connection.processBatchCallback(actions, tableName, pool, results, null);
+    batchCallback(actions, results, null);
   }
 
   @Override
   public Object[] batch(final List<? extends Row> actions)
      throws InterruptedException, IOException {
-    Object[] results = new Object[actions.size()];
-    connection.processBatchCallback(actions, tableName, pool, results, null);
-    return results;
+    return batchCallback(actions, null);
   }
 
   @Override
   public <R> void batchCallback(
-    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
-    throws IOException, InterruptedException {
+      final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
     connection.processBatchCallback(actions, tableName, pool, results, callback);
   }
 
@@ -670,7 +694,7 @@ public class HTable implements HTableInt
     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
       InterruptedException {
     Object[] results = new Object[actions.size()];
-    connection.processBatchCallback(actions, tableName, pool, results, callback);
+    batchCallback(actions, results, callback);
     return results;
   }
 
@@ -702,7 +726,7 @@ public class HTable implements HTableInt
   throws IOException {
     Object[] results = new Object[deletes.size()];
     try {
-      connection.processBatch((List) deletes, tableName, pool, results);
+      batch(deletes, results);
     } catch (InterruptedException e) {
       throw new IOException(e);
     } finally {
@@ -722,7 +746,8 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
-  public void put(final Put put) throws IOException {
+  public void put(final Put put)
+      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
     doPut(put);
     if (autoFlush) {
       flushCommits();
@@ -733,7 +758,8 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
-  public void put(final List<Put> puts) throws IOException {
+  public void put(final List<Put> puts)
+      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
     for (Put put : puts) {
       doPut(put);
     }
@@ -742,12 +768,64 @@ public class HTable implements HTableInt
     }
   }
 
-  private void doPut(Put put) throws IOException{
+
+  /**
+   * Adds the put to the buffer. If the buffer is already too large, flushes the buffer to the
+   *  cluster.
+   * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
+   * @throws InterruptedIOException if we were interrupted.
+   */
+  private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+    if (ap.hasError()){
+      backgroundFlushCommits(true);
+    }
+
     validatePut(put);
-    writeBuffer.add(put);
+
     currentWriteBufferSize += put.heapSize();
-    if (currentWriteBufferSize > writeBufferSize) {
-      flushCommits();
+    writeAsyncBuffer.add(put);
+
+    while (currentWriteBufferSize > writeBufferSize) {
+      backgroundFlushCommits(false);
+    }
+  }
+
+
+  /**
+   * Sends the operations in the buffer to the servers. Does not wait for the servers' answers.
+   * If there is an error (max retries reached from a previous flush, or a bad operation), it tries
+   * to send all operations in the buffer and throws an exception.
+   */
+  private void backgroundFlushCommits(boolean synchronous) throws
+      InterruptedIOException, RetriesExhaustedWithDetailsException {
+
+    try {
+      // If there is an error on the operations in progress, we don't add new operations.
+      if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
+        ap.submit(writeAsyncBuffer, true);
+      }
+
+      if (synchronous || ap.hasError()) {
+        ap.waitUntilDone();
+      }
+
+      if (ap.hasError()) {
+        if (!clearBufferOnFail) {
+          // if clearBufferOnFail is not set, we're supposed to keep the failed operations in the
+          //  write buffer. This is a questionable feature kept here for backward compatibility.
+          writeAsyncBuffer.addAll(ap.getFailedOperations());
+        }
+        RetriesExhaustedWithDetailsException e = ap.getErrors();
+        ap.clearErrors();
+        throw e;
+      }
+    } finally {
+      currentWriteBufferSize = 0;
+      for (Row mut : writeAsyncBuffer) {
+        if (mut instanceof Mutation) {
+          currentWriteBufferSize += ((Mutation) mut).heapSize();
+        }
+      }
     }
   }
 
@@ -1080,37 +1158,13 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
-  public void flushCommits() throws IOException {
-    if (writeBuffer.isEmpty()){
-      // Early exit: we can be called on empty buffers.
-      return;
-    }
-
-    Object[] results = new Object[writeBuffer.size()];
-    boolean success = false;
-    try {
-      this.connection.processBatch(writeBuffer, tableName, pool, results);
-      success = true;
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException(e.getMessage());
-    } finally {
-      // mutate list so that it is empty for complete success, or contains
-      // only failed records. Results are returned in the same order as the
-      // requests in list. Walk the list backwards, so we can remove from list
-      // without impacting the indexes of earlier members
-      currentWriteBufferSize = 0;
-      if (success || clearBufferOnFail) {
-        writeBuffer.clear();
-      } else {
-        for (int i = results.length - 1; i >= 0; i--) {
-          if (results[i] instanceof Result) {
-            writeBuffer.remove(i);
-          } else {
-            currentWriteBufferSize += writeBuffer.get(i).heapSize();
-          }
-        }
-      }
-    }
+  public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+    // We're looping because, if one region is overloaded, we keep its operations in the buffer.
+    // As we can have an operation in progress even if the buffer is empty, we call
+    //  backgroundFlushCommits at least once.
+    do {
+      backgroundFlushCommits(true);
+    } while (!writeAsyncBuffer.isEmpty());
   }
 
   /**
@@ -1127,7 +1181,7 @@ public class HTable implements HTableInt
   public <R> void processBatchCallback(
     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
     throws IOException, InterruptedException {
-    connection.processBatchCallback(list, tableName, pool, results, callback);
+    this.batchCallback(list, results, callback);
   }
 
 
@@ -1254,14 +1308,6 @@ public class HTable implements HTableInt
   }
 
   /**
-   * Returns the write buffer.
-   * @return The current write buffer.
-   */
-  public ArrayList<Put> getWriteBuffer() {
-    return writeBuffer;
-  }
-
-  /**
    * The pool is used for mutli requests for this HTable
    * @return the pool used for mutli
    */

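The new HTable put path above works as follows: doPut adds the mutation to writeAsyncBuffer and triggers a background flush once the buffered heap size passes writeBufferSize, while flushCommits loops on a synchronous flush until the buffer drains. The simplified, self-contained sketch below mirrors that control flow only; the submit/wait methods are placeholders for the real AsyncProcess calls, not the actual API.

import java.util.ArrayList;
import java.util.List;

public class BufferedWriterSketch {
  private final List<String> buffer = new ArrayList<String>();
  private long currentSize = 0;
  private final long maxSize;

  BufferedWriterSketch(long maxSize) {
    this.maxSize = maxSize;
  }

  /** Stand-in for AsyncProcess.submit: pretend everything in the buffer was sent. */
  private void submitInBackground() {
    System.out.println("submitting " + buffer.size() + " op(s) in the background");
    buffer.clear();
  }

  /** Stand-in for AsyncProcess.waitUntilDone. */
  private void waitUntilDone() {
    System.out.println("waiting for in-flight operations");
  }

  /** Mirrors doPut: buffer the operation, flush when the buffer grows too large. */
  void put(String op, long opSize) {
    currentSize += opSize;
    buffer.add(op);
    while (currentSize > maxSize) {
      backgroundFlush(false);
    }
  }

  /** Mirrors backgroundFlushCommits: submit, optionally wait, then recompute the size. */
  private void backgroundFlush(boolean synchronous) {
    if (!buffer.isEmpty()) {
      submitInBackground();
    }
    if (synchronous) {
      waitUntilDone();
    }
    currentSize = 0; // anything still buffered would be re-measured here
  }

  /** Mirrors flushCommits: loop until the buffer is fully drained. */
  void flush() {
    do {
      backgroundFlush(true);
    } while (!buffer.isEmpty());
  }

  public static void main(String[] args) {
    BufferedWriterSketch w = new BufferedWriterSketch(100);
    w.put("put-1", 60);
    w.put("put-2", 60);  // crosses the 100-byte threshold, triggers a background flush
    w.flush();
  }
}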
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Mon Jun 24 18:57:29 2013
@@ -48,11 +48,11 @@ import java.util.Set;
 public class RetriesExhaustedWithDetailsException
 extends RetriesExhaustedException {
   List<Throwable> exceptions;
-  List<Row> actions;
+  List<? extends Row> actions;
   List<String> hostnameAndPort;
 
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
-                                              List<Row> actions,
+                                              List<? extends Row> actions,
                                               List<String> hostnameAndPort) {
     super("Failed " + exceptions.size() + " action" +
         pluralize(exceptions) + ": " +
@@ -105,7 +105,7 @@ extends RetriesExhaustedException {
   }
 
   public static String getDesc(List<Throwable> exceptions,
-                               List<Row> actions,
+                               List<? extends Row> actions,
                                List<String> hostnamePort) {
     String s = getDesc(classifyExs(exceptions));
     StringBuilder addrs = new StringBuilder(s);

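With the widened List<? extends Row> signature above, any mix of Row subtypes (for example a List<Put>) can be reported back to the caller. A minimal sketch of how calling code might surface the per-row details is below; it builds the exception directly with the constructor shown above and prints getExhaustiveDescription(), the same accessor the retry code uses for logging. The row key and server name are made up for the example.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;

public class RetriesExceptionSketch {
  public static void main(String[] args) {
    List<Throwable> causes = Arrays.<Throwable>asList(new IOException("region busy"));
    // The widened signature accepts any List<? extends Row>, e.g. a List<Put>.
    List<Put> failedOps = Arrays.asList(new Put(Bytes.toBytes("row-1")));
    List<String> servers = Arrays.asList("rs1.example.com:60020");

    RetriesExhaustedWithDetailsException e =
        new RetriesExhaustedWithDetailsException(causes, failedOps, servers);

    // Per-row failure summary, grouped by exception type and server.
    System.out.println(e.getExhaustiveDescription());
  }
}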
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1496159&r1=1496158&r2=1496159&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Mon Jun 24 18:57:29 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -187,6 +188,8 @@ public abstract class ServerCallable<T> 
           // map to that slow/dead server; otherwise, let cache miss and ask
           // .META. again to find the new location
           getConnection().clearCaches(location.getServerName());
+        } else if (t instanceof RegionMovedException) {
+          getConnection().updateCachedLocations(tableName, row, t, location);
         } else if (t instanceof NotServingRegionException && numRetries == 1) {
           // Purge cache entries for this specific region from META cache
           // since we don't call connect(true) when number of retries is 1.
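The branch added just above treats a RegionMovedException as a hint: the cached region location is refreshed from the exception before the next retry, instead of purging cache entries wholesale. A small, self-contained sketch of that "refresh the cache, then retry" shape follows; the signal, cache, and call names are placeholders for this illustration, not HBase classes.

public class RetryWithCacheRefreshSketch {

  /** Placeholder for a "region has moved, here is the new server" signal. */
  static class RegionMovedSignal extends Exception {
    final String newLocation;
    RegionMovedSignal(String newLocation) { this.newLocation = newLocation; }
  }

  /** Placeholder for the location cache kept by the connection. */
  static class LocationCache {
    String location = "rs-old:60020";
    void update(String newLocation) {
      System.out.println("cache: " + location + " -> " + newLocation);
      location = newLocation;
    }
  }

  /** Fake call that fails once with a moved signal, then succeeds on the new location. */
  static String callRegion(String location) throws RegionMovedSignal {
    if (location.equals("rs-old:60020")) {
      throw new RegionMovedSignal("rs-new:60020");
    }
    return "ok from " + location;
  }

  public static void main(String[] args) throws Exception {
    LocationCache cache = new LocationCache();
    int maxTries = 3;
    for (int tries = 0; tries < maxTries; tries++) {
      try {
        System.out.println(callRegion(cache.location));
        return;
      } catch (RegionMovedSignal e) {
        // Mirror of the new branch: refresh the cached location from the exception,
        // then let the loop retry against the updated location.
        cache.update(e.newLocation);
      }
    }
  }
}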