Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/24 04:10:47 UTC

[3/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable

HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cb55c40
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cb55c40
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cb55c40

Branch: refs/heads/master
Commit: 8cb55c4080206a651023f6d042fac295192f1c2b
Parents: 992e571
Author: ChiaPing Tsai <ch...@gmail.com>
Authored: Sat Dec 24 12:02:05 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Dec 24 12:02:05 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 704 +++--------------
 .../hadoop/hbase/client/AsyncProcessTask.java   | 229 ++++++
 .../hbase/client/AsyncRequestFutureImpl.java    |  46 +-
 .../hbase/client/BufferedMutatorImpl.java       | 165 ++--
 .../hbase/client/BufferedMutatorParams.java     |  21 +-
 .../hbase/client/ConnectionConfiguration.java   |  19 +-
 .../hbase/client/ConnectionImplementation.java  |  13 +-
 .../org/apache/hadoop/hbase/client/HTable.java  | 295 ++++---
 .../hadoop/hbase/client/HTableMultiplexer.java  |  15 +-
 .../hadoop/hbase/client/RequestController.java  | 125 +++
 .../hbase/client/RequestControllerFactory.java  |  44 ++
 .../apache/hadoop/hbase/client/RowAccess.java   |   3 +-
 .../hbase/client/SimpleRequestController.java   | 519 +++++++++++++
 .../hadoop/hbase/client/TestAsyncProcess.java   | 769 ++++++++-----------
 .../client/TestSimpleRequestController.java     | 336 ++++++++
 .../hbase/client/HConnectionTestingUtility.java |   7 +-
 .../hadoop/hbase/client/TestClientPushback.java |  19 +-
 .../hadoop/hbase/client/TestReplicasClient.java |  14 +-
 .../regionserver/TestPerColumnFamilyFlush.java  |   1 -
 .../security/access/TestTablePermissions.java   |  72 +-
 20 files changed, 2128 insertions(+), 1288 deletions(-)
----------------------------------------------------------------------
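
The heart of the change shows in the diffstat above: flow control moves out of
AsyncProcess into the new RequestController/SimpleRequestController pair, and the
many submit/submitAll overloads collapse into a single submit(AsyncProcessTask)
entry point. As a rough before/after sketch of a call site (modeled on the
BufferedMutatorImpl changes further down; "pool", "rows", and "callback" are
illustrative names):

    // Before: positional overloads on AsyncProcess.
    // ap.submit(pool, tableName, rows, true, callback, needResults);

    // After: the same request described declaratively, handed to one entry point.
    AsyncProcessTask task = AsyncProcessTask.newBuilder()
        .setPool(pool)
        .setTableName(tableName)
        .setRowAccess(rows)
        .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
        .build();
    ap.submit(task);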


http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 50a2a11..d1583f5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -19,45 +19,35 @@
 
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
+import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdge;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This class allows a continuous flow of requests. It's written to be compatible with a
@@ -95,9 +85,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * </p>
  */
 @InterfaceAudience.Private
+@InterfaceStability.Evolving
 class AsyncProcess {
   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
-  protected static final AtomicLong COUNTER = new AtomicLong();
+  private static final AtomicLong COUNTER = new AtomicLong();
 
   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
 
@@ -116,31 +107,6 @@ class AsyncProcess {
    */
   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
 
-  protected final int thresholdToLogUndoneTaskDetails;
-  private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
-      "hbase.client.threshold.log.details";
-  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
-  private static final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
-
-  /**
-   * The maximum size of single RegionServer.
-   */
-  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
-
-  /**
-   * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
-   */
-  public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
-
-  /**
-   * The maximum size of submit.
-   */
-  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
-  /**
-   * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
-   */
-  public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
-
   /**
    * Return value from a submit that didn't contain any requests.
    */
@@ -173,64 +139,42 @@ class AsyncProcess {
   };
 
   // TODO: many of the fields should be made private
-  protected final long id;
-
-  protected final ClusterConnection connection;
-  protected final RpcRetryingCallerFactory rpcCallerFactory;
-  protected final RpcControllerFactory rpcFactory;
-  protected final BatchErrors globalErrors;
-  protected final ExecutorService pool;
-
-  protected final AtomicLong tasksInProgress = new AtomicLong(0);
-  protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
-      new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
-  protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
-      new ConcurrentHashMap<ServerName, AtomicInteger>();
-  // Start configuration settings.
-  protected final int startLogErrorsCnt;
+  final long id;
 
-  /**
-   * The number of tasks simultaneously executed on the cluster.
-   */
-  protected final int maxTotalConcurrentTasks;
+  final ClusterConnection connection;
+  private final RpcRetryingCallerFactory rpcCallerFactory;
+  final RpcControllerFactory rpcFactory;
+  final BatchErrors globalErrors;
 
-  /**
-   * The max heap size of all tasks simultaneously executed on a server.
-   */
-  protected final long maxHeapSizePerRequest;
-  protected final long maxHeapSizeSubmit;
-  /**
-   * The number of tasks we run in parallel on a single region.
-   * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
-   * a set of operations on a region before the previous one is done. As well, this limits
-   * the pressure we put on the region server.
-   */
-  protected final int maxConcurrentTasksPerRegion;
+  // Start configuration settings.
+  final int startLogErrorsCnt;
 
-  /**
-   * The number of task simultaneously executed on a single region server.
-   */
-  protected final int maxConcurrentTasksPerServer;
-  protected final long pause;
-  protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
-  protected int numTries;
-  protected int serverTrackerTimeout;
-  protected int rpcTimeout;
-  protected int operationTimeout;
-  protected long primaryCallTimeoutMicroseconds;
+  final long pause;
+  final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
+  final int numTries;
+  @VisibleForTesting
+  int serverTrackerTimeout;
+  final long primaryCallTimeoutMicroseconds;
   /** Whether to log details for batch errors */
-  protected final boolean logBatchErrorDetails;
+  final boolean logBatchErrorDetails;
   // End configuration settings.
 
-  public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
+  /**
+   * The traffic control for requests.
+   */
+  @VisibleForTesting
+  final RequestController requestController;
+  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
+  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
+  private final int periodToLog;
+  AsyncProcess(ClusterConnection hc, Configuration conf,
       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
-      RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
+      RpcControllerFactory rpcFactory) {
     if (hc == null) {
       throw new IllegalArgumentException("ClusterConnection cannot be null.");
     }
 
     this.connection = hc;
-    this.pool = pool;
     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
 
     this.id = COUNTER.incrementAndGet();
@@ -249,42 +193,10 @@ class AsyncProcess {
     // how many times we could try in total, one more than retry number
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
-    this.rpcTimeout = rpcTimeout;
-    this.operationTimeout = operationTimeout;
     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
-
-    this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-    this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
-    this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
-    this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
-          DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
-    this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
     this.startLogErrorsCnt =
         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
-
-    if (this.maxTotalConcurrentTasks <= 0) {
-      throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
-    }
-    if (this.maxConcurrentTasksPerServer <= 0) {
-      throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
-          maxConcurrentTasksPerServer);
-    }
-    if (this.maxConcurrentTasksPerRegion <= 0) {
-      throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
-          maxConcurrentTasksPerRegion);
-    }
-    if (this.maxHeapSizePerRequest <= 0) {
-      throw new IllegalArgumentException("maxHeapSizePerServer=" +
-          maxHeapSizePerRequest);
-    }
-
-    if (this.maxHeapSizeSubmit <= 0) {
-      throw new IllegalArgumentException("maxHeapSizeSubmit=" +
-          maxHeapSizeSubmit);
-    }
+    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
     // However, if we are too useful, we might fail very quickly due to retry count limit.
     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
@@ -301,43 +213,30 @@ class AsyncProcess {
     this.rpcFactory = rpcFactory;
     this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
 
-    this.thresholdToLogUndoneTaskDetails =
-        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
-          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+    this.requestController = RequestControllerFactory.create(conf);
   }
 
   /**
-   * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
-   *         RuntimeException
+   * The submitted task may not be accomplished at all if there are too many running tasks or
+   * other limits are hit.
+   * @param <CResult> The class to cast the result
+   * @param task The settings and data
+   * @return AsyncRequestFuture
    */
-  protected ExecutorService getPool(ExecutorService pool) {
-    if (pool != null) {
-      return pool;
-    }
-    if (this.pool != null) {
-      return this.pool;
+  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException {
+    AsyncRequestFuture reqFuture = checkTask(task);
+    if (reqFuture != null) {
+      return reqFuture;
+    }
+    SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
+    switch (submittedRows) {
+      case ALL:
+        return submitAll(task);
+      case AT_LEAST_ONE:
+        return submit(task, true);
+      default:
+        return submit(task, false);
     }
-    throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
-  }
-
-  /**
-   * See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean).
-   * Uses default ExecutorService for this AP (must have been created with one).
-   */
-  public <CResult> AsyncRequestFuture submit(TableName tableName,
-      final RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
-      boolean needResults) throws InterruptedIOException {
-    return submit(null, tableName, rows, atLeastOne, callback, needResults);
-  }
-  /**
-   * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
-   * Uses the {@link ListRowAccess} to wrap the {@link List}.
-   */
-  public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
-      List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
-      boolean needResults) throws InterruptedIOException {
-    return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
-      callback, needResults);
   }
 
   /**
@@ -345,20 +244,13 @@ class AsyncProcess {
    * list. Does not send requests to replicas (not currently used for anything other
    * than streaming puts anyway).
    *
-   * @param pool ExecutorService to use.
-   * @param tableName The table for which this request is needed.
-   * @param callback Batch callback. Only called on success (94 behavior).
-   * @param needResults Whether results are needed, or can be discarded.
-   * @param rows - the submitted row. Modified by the method: we remove the rows we took.
+   * @param task The settings and data
    * @param atLeastOne true if we should submit at least a subset.
    */
-  public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
-      RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
-      boolean needResults) throws InterruptedIOException {
-    if (rows.isEmpty()) {
-      return NO_REQS_RESULT;
-    }
-
+  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
+    boolean atLeastOne) throws InterruptedIOException {
+    TableName tableName = task.getTableName();
+    RowAccess<? extends Row> rows = task.getRowAccess();
     Map<ServerName, MultiAction> actionsByServer =
         new HashMap<ServerName, MultiAction>();
     List<Action> retainedActions = new ArrayList<Action>(rows.size());
@@ -369,11 +261,11 @@ class AsyncProcess {
     // Location errors that happen before we decide what requests to take.
     List<Exception> locationErrors = null;
     List<Integer> locationErrorRows = null;
-    RowCheckerHost checker = createRowCheckerHost();
+    RequestController.Checker checker = requestController.newChecker();
     boolean firstIter = true;
     do {
       // Wait until there is at least one slot for a new task.
-      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
+      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
       int posInList = -1;
       if (!firstIter) {
         checker.reset();
@@ -406,8 +298,7 @@ class AsyncProcess {
           it.remove();
           break; // Backward compat: we stop considering actions on location error.
         }
-        long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0;
-        ReturnCode code = checker.canTakeOperation(loc, rowSize);
+        ReturnCode code = checker.canTakeRow(loc, r);
         if (code == ReturnCode.END) {
           break;
         }
@@ -426,29 +317,14 @@ class AsyncProcess {
 
     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
 
-    return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
-        locationErrors, locationErrorRows, actionsByServer, pool);
+    return submitMultiActions(task, retainedActions, nonceGroup,
+        locationErrors, locationErrorRows, actionsByServer);
   }
 
-  private RowCheckerHost createRowCheckerHost() {
-    return new RowCheckerHost(Arrays.asList(
-        new TaskCountChecker(maxTotalConcurrentTasks,
-          maxConcurrentTasksPerServer,
-          maxConcurrentTasksPerRegion,
-          tasksInProgress,
-          taskCounterPerServer,
-          taskCounterPerRegion)
-        , new RequestSizeChecker(maxHeapSizePerRequest)
-        , new SubmittedSizeChecker(maxHeapSizeSubmit)
-    ));
-  }
-  <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
-      List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
-      Object[] results, boolean needResults, List<Exception> locationErrors,
-      List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer,
-      ExecutorService pool) {
-    AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-      tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
+  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
+      List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
+      List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
+    AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, retainedActions, nonceGroup);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
@@ -462,14 +338,6 @@ class AsyncProcess {
     return ars;
   }
 
-  public void setRpcTimeout(int rpcTimeout) {
-    this.rpcTimeout = rpcTimeout;
-  }
-
-  public void setOperationTimeout(int operationTimeout) {
-    this.operationTimeout = operationTimeout;
-  }
-
   /**
    * Helper that is used when grouping the actions per region server.
    *
@@ -493,24 +361,13 @@ class AsyncProcess {
     multiAction.add(regionName, action);
   }
 
-  public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
-      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
-    return submitAll(pool, tableName, rows, callback, results, null, -1);
-  }
   /**
    * Submit immediately the list of rows, whatever the server status. Kept for backward
   * compatibility: it can be used with the batch interface that returns an array of objects.
-   *
-   * @param pool ExecutorService to use.
-   * @param tableName name of the table for which the submission is made.
-   * @param rows the list of rows.
-   * @param callback the callback.
-   * @param results Optional array to return the results thru; backward compat.
-   * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
+   * @param task The settings and data
    */
-  public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
-      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
-      CancellableRegionServerCallable callable, int rpcTimeout) {
+  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
+    RowAccess<? extends Row> rows = task.getRowAccess();
     List<Action> actions = new ArrayList<Action>(rows.size());
 
     // The position will be used by the processBatch to match the object array returned.
@@ -528,93 +385,78 @@ class AsyncProcess {
       setNonce(ng, r, action);
       actions.add(action);
     }
-    AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-        tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
-        callable, rpcTimeout);
+    AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup());
     ars.groupAndSendMultiAction(actions, 1);
     return ars;
   }
 
+  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
+    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
+      return NO_REQS_RESULT;
+    }
+    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
+    checkOperationTimeout(task.getOperationTimeout());
+    checkRpcTimeout(task.getRpcTimeout());
+    return null;
+  }
+
   private void setNonce(NonceGenerator ng, Row r, Action action) {
     if (!(r instanceof Append) && !(r instanceof Increment)) return;
     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
   }
 
-  protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
-      TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool,
-      Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      CancellableRegionServerCallable callable, int rpcTimeout) {
-    return new AsyncRequestFutureImpl<CResult>(
-        tableName, actions, nonceGroup, getPool(pool), needResults,
-        results, callback, callable, operationTimeout,
-        rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
+  private int checkTimeout(String name, int timeout) {
+    if (timeout < 0) {
+      throw new RuntimeException("The " + name + " must not be negative,"
+        + " current value is " + timeout);
+    }
+    return timeout;
+  }
+  private int checkOperationTimeout(int operationTimeout) {
+    return checkTimeout("operation timeout", operationTimeout);
+  }
+
+  private int checkRpcTimeout(int rpcTimeout) {
+    return checkTimeout("rpc timeout", rpcTimeout);
+  }
+
+  @VisibleForTesting
+  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
+    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
   }
 
   /** Wait until the async does not have more than max tasks in progress. */
-  protected void waitForMaximumCurrentTasks(int max, String tableName)
+  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
       throws InterruptedIOException {
-    waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
+    requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
+      getLogger(tableName, max));
   }
 
-  // Break out this method so testable
-  @VisibleForTesting
-  void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
-      String tableName) throws InterruptedIOException {
-    long lastLog = EnvironmentEdgeManager.currentTime();
-    long currentInProgress, oldInProgress = Long.MAX_VALUE;
-    while ((currentInProgress = tasksInProgress.get()) > max) {
-      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
-        long now = EnvironmentEdgeManager.currentTime();
-        if (now > lastLog + 10000) {
-          lastLog = now;
-          LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
-              + max + ", tasksInProgress=" + currentInProgress +
-              " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
-          if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
-            logDetailsOfUndoneTasks(currentInProgress);
-          }
-        }
-      }
-      oldInProgress = currentInProgress;
-      try {
-        synchronized (tasksInProgress) {
-          if (tasksInProgress.get() == oldInProgress) {
-            tasksInProgress.wait(10);
-          }
-        }
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("#" + id + ", interrupted." +
-            " currentNumberOfTask=" + currentInProgress);
-      }
-    }
+  private Consumer<Long> getLogger(TableName tableName, long max) {
+    return (currentInProgress) -> {
+      LOG.info("#" + id + (max < 0 ? ", waiting for any free slot"
+      : ", waiting for some tasks to finish. Expected max="
+      + max) + ", tasksInProgress=" + currentInProgress +
+      " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
+    };
   }
 
-  void logDetailsOfUndoneTasks(long taskInProgress) {
-    ArrayList<ServerName> servers = new ArrayList<ServerName>();
-    for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
-      if (entry.getValue().get() > 0) {
-        servers.add(entry.getKey());
-      }
-    }
-    LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
-    if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
-      ArrayList<String> regions = new ArrayList<String>();
-      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
-        if (entry.getValue().get() > 0) {
-          regions.add(Bytes.toString(entry.getKey()));
-        }
-      }
-      LOG.info("Regions against which left over task(s) are processed: " + regions);
-    }
+  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
+    requestController.incTaskCounters(regions, sn);
   }
 
+
+  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
+    requestController.decTaskCounters(regions, sn);
+  }
   /**
    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
    * @return Whether there were any errors in any request since the last time
   *          {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created.
    */
   public boolean hasError() {
-    return globalErrors.hasErrors();
+    return globalErrors != null && globalErrors.hasErrors();
   }
 
   /**
@@ -628,9 +470,9 @@ class AsyncProcess {
    *          was called, or AP was created.
    */
   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
-      List<Row> failedRows, String tableName) throws InterruptedIOException {
+      List<Row> failedRows, TableName tableName) throws InterruptedIOException {
     waitForMaximumCurrentTasks(0, tableName);
-    if (!globalErrors.hasErrors()) {
+    if (globalErrors == null || !globalErrors.hasErrors()) {
       return null;
     }
     if (failedRows != null) {
@@ -642,41 +484,12 @@ class AsyncProcess {
   }
 
   /**
-   * increment the tasks counters for a given set of regions. MT safe.
-   */
-  protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
-    tasksInProgress.incrementAndGet();
-
-    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
-
-    for (byte[] regBytes : regions) {
-      computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet();
-    }
-  }
-
-  /**
-   * Decrements the counters for a given region and the region server. MT Safe.
-   */
-  protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
-    for (byte[] regBytes : regions) {
-      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
-      regionCnt.decrementAndGet();
-    }
-
-    taskCounterPerServer.get(sn).decrementAndGet();
-    tasksInProgress.decrementAndGet();
-    synchronized (tasksInProgress) {
-      tasksInProgress.notifyAll();
-    }
-  }
-
-  /**
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
   protected RpcRetryingCaller<AbstractResponse> createCaller(
       CancellableRegionServerCallable callable, int rpcTimeout) {
-    return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
+    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
   }
 
 
@@ -687,7 +500,7 @@ class AsyncProcess {
    * We may benefit from connection-wide tracking of server errors.
    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
    */
-  protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
+  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
     return new ConnectionImplementation.ServerErrorTracker(
         this.serverTrackerTimeout, this.numTries);
   }
@@ -696,283 +509,4 @@ class AsyncProcess {
     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
   }
 
-  /**
-   * Collect all advices from checkers and make the final decision.
-   */
-  @VisibleForTesting
-  static class RowCheckerHost {
-    private final List<RowChecker> checkers;
-    private boolean isEnd = false;
-    RowCheckerHost(final List<RowChecker> checkers) {
-      this.checkers = checkers;
-    }
-    void reset() throws InterruptedIOException {
-      isEnd = false;
-      InterruptedIOException e = null;
-      for (RowChecker checker : checkers) {
-        try {
-          checker.reset();
-        } catch (InterruptedIOException ex) {
-          e = ex;
-        }
-      }
-      if (e != null) {
-        throw e;
-      }
-    }
-    ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
-      if (isEnd) {
-        return ReturnCode.END;
-      }
-      ReturnCode code = ReturnCode.INCLUDE;
-      for (RowChecker checker : checkers) {
-        switch (checker.canTakeOperation(loc, rowSize)) {
-          case END:
-            isEnd = true;
-            code = ReturnCode.END;
-            break;
-          case SKIP:
-            code = ReturnCode.SKIP;
-            break;
-          case INCLUDE:
-          default:
-            break;
-        }
-        if (code == ReturnCode.END) {
-          break;
-        }
-      }
-      for (RowChecker checker : checkers) {
-        checker.notifyFinal(code, loc, rowSize);
-      }
-      return code;
-    }
-  }
-
-  /**
-   * Provide a way to control the flow of rows iteration.
-   */
-  // Visible for Testing. Adding @VisibleForTesting here doesn't work for some reason.
-  interface RowChecker {
-    enum ReturnCode {
-      /**
-       * Accept current row.
-       */
-      INCLUDE,
-      /**
-       * Skip current row.
-       */
-      SKIP,
-      /**
-       * No more row can be included.
-       */
-      END
-    };
-    ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
-    /**
-     * Add the final ReturnCode to the checker.
-     * The ReturnCode may be reversed, so the checker need the final decision to update
-     * the inner state.
-     */
-    void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
-    /**
-     * Reset the inner state.
-     */
-    void reset() throws InterruptedIOException ;
-  }
-
-  /**
-   * limit the heapsize of total submitted data.
-   * Reduce the limit of heapsize for submitting quickly
-   * if there is no running task.
-   */
-  @VisibleForTesting
-  static class SubmittedSizeChecker implements RowChecker {
-    private final long maxHeapSizeSubmit;
-    private long heapSize = 0;
-    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
-      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
-    }
-    @Override
-    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
-      if (heapSize >= maxHeapSizeSubmit) {
-        return ReturnCode.END;
-      }
-      return ReturnCode.INCLUDE;
-    }
-
-    @Override
-    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
-      if (code == ReturnCode.INCLUDE) {
-        heapSize += rowSize;
-      }
-    }
-
-    @Override
-    public void reset() {
-      heapSize = 0;
-    }
-  }
-  /**
-   * limit the max number of tasks in an AsyncProcess.
-   */
-  @VisibleForTesting
-  static class TaskCountChecker implements RowChecker {
-    private static final long MAX_WAITING_TIME = 1000; //ms
-    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
-    private final Set<ServerName> serversIncluded = new HashSet<>();
-    private final int maxConcurrentTasksPerRegion;
-    private final int maxTotalConcurrentTasks;
-    private final int maxConcurrentTasksPerServer;
-    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
-    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
-    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-    private final AtomicLong tasksInProgress;
-    TaskCountChecker(final int maxTotalConcurrentTasks,
-      final int maxConcurrentTasksPerServer,
-      final int maxConcurrentTasksPerRegion,
-      final AtomicLong tasksInProgress,
-      final Map<ServerName, AtomicInteger> taskCounterPerServer,
-      final Map<byte[], AtomicInteger> taskCounterPerRegion) {
-      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
-      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
-      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
-      this.taskCounterPerRegion = taskCounterPerRegion;
-      this.taskCounterPerServer = taskCounterPerServer;
-      this.tasksInProgress = tasksInProgress;
-    }
-    @Override
-    public void reset() throws InterruptedIOException {
-      // prevent the busy-waiting
-      waitForRegion();
-      regionsIncluded.clear();
-      serversIncluded.clear();
-      busyRegions.clear();
-    }
-    private void waitForRegion() throws InterruptedIOException {
-      if (busyRegions.isEmpty()) {
-        return;
-      }
-      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
-      final long start = ee.currentTime();
-      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
-        for (byte[] region : busyRegions) {
-          AtomicInteger count = taskCounterPerRegion.get(region);
-          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
-            return;
-          }
-        }
-        try {
-          synchronized (tasksInProgress) {
-            tasksInProgress.wait(10);
-          }
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException("Interrupted." +
-              " tasksInProgress=" + tasksInProgress);
-        }
-      }
-    }
-    /**
-     * 1) check the regions is allowed.
-     * 2) check the concurrent tasks for regions.
-     * 3) check the total concurrent tasks.
-     * 4) check the concurrent tasks for server.
-     * @param loc
-     * @param rowSize
-     * @return
-     */
-    @Override
-    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
-
-      HRegionInfo regionInfo = loc.getRegionInfo();
-      if (regionsIncluded.contains(regionInfo)) {
-        // We already know what to do with this region.
-        return ReturnCode.INCLUDE;
-      }
-      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
-      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
-        // Too many tasks on this region already.
-        return ReturnCode.SKIP;
-      }
-      int newServers = serversIncluded.size()
-        + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
-      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
-        // Too many tasks.
-        return ReturnCode.SKIP;
-      }
-      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
-      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
-        // Too many tasks for this individual server
-        return ReturnCode.SKIP;
-      }
-      return ReturnCode.INCLUDE;
-    }
-
-    @Override
-    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
-      if (code == ReturnCode.INCLUDE) {
-        regionsIncluded.add(loc.getRegionInfo());
-        serversIncluded.add(loc.getServerName());
-      }
-      busyRegions.add(loc.getRegionInfo().getRegionName());
-    }
-  }
-
-  /**
-   * limit the request size for each regionserver.
-   */
-  @VisibleForTesting
-  static class RequestSizeChecker implements RowChecker {
-    private final long maxHeapSizePerRequest;
-    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
-    RequestSizeChecker(final long maxHeapSizePerRequest) {
-      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
-    }
-    @Override
-    public void reset() {
-      serverRequestSizes.clear();
-    }
-    @Override
-    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
-      // Is it ok for limit of request size?
-      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
-        serverRequestSizes.get(loc.getServerName()) : 0L;
-      // accept at least one request
-      if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
-        return ReturnCode.INCLUDE;
-      }
-      return ReturnCode.SKIP;
-    }
-
-    @Override
-    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
-      if (code == ReturnCode.INCLUDE) {
-        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
-          serverRequestSizes.get(loc.getServerName()) : 0L;
-        serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
-      }
-    }
-  }
-
-  public static class ListRowAccess<T> implements RowAccess<T> {
-    private final List<T> data;
-    ListRowAccess(final List<T> data) {
-      this.data = data;
-    }
-
-    @Override
-    public int size() {
-      return data.size();
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return data.isEmpty();
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      return data.iterator();
-    }
-  }
 }
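
For orientation: the row-admission loop in submit(task, atLeastOne) above now
delegates every per-row decision to the pluggable RequestController. Below is a
condensed, self-contained paraphrase of that loop, assuming the hbase-client
types plus java.util.Iterator and java.util.function.Function; locateRow stands
in for the region lookup the real code performs and is purely illustrative:

    // Sketch only: mirrors the checker.canTakeRow(...) flow in the diff above.
    static void takeRows(RequestController.Checker checker, Iterator<? extends Row> it,
        Function<Row, HRegionLocation> locateRow) {
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc = locateRow.apply(r);
        RequestController.ReturnCode code = checker.canTakeRow(loc, r);
        if (code == RequestController.ReturnCode.END) {
          break;              // the controller is full for this round
        }
        if (code == RequestController.ReturnCode.INCLUDE) {
          it.remove();        // taken: grouped into a per-server MultiAction
        }
        // SKIP: the row stays in the buffer for a later round
      }
    }

After a round, checker.reset() clears the per-round state before the next pass,
exactly as the do/while loop in submit(task, atLeastOne) does.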

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
new file mode 100644
index 0000000..eda1db2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+
+/**
+ * Contains the attributes of a task which will be executed
+ * by {@link org.apache.hadoop.hbase.client.AsyncProcess}.
+ * The attributes will be validated by AsyncProcess.
+ * It's intended for advanced client applications.
+ * @param <T> The type of response from server-side
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncProcessTask<T> {
+  /**
+   * The number of rows to be submitted.
+   * The AsyncProcess has traffic control which may reject some rows.
+   */
+  public enum SubmittedRows {
+    ALL,
+    AT_LEAST_ONE,
+    NORMAL
+  }
+  public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) {
+    return new Builder<>(callback);
+  }
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder<T> {
+
+    private ExecutorService pool;
+    private TableName tableName;
+    private RowAccess<? extends Row> rows;
+    private SubmittedRows submittedRows = SubmittedRows.ALL;
+    private Batch.Callback<T> callback;
+    private boolean needResults;
+    private int rpcTimeout;
+    private int operationTimeout;
+    private CancellableRegionServerCallable callable;
+    private Object[] results;
+
+    private Builder() {
+    }
+
+    private Builder(Batch.Callback<T> callback) {
+      this.callback = callback;
+    }
+
+    Builder<T> setResults(Object[] results) {
+      this.results = results;
+      if (results != null && results.length != 0) {
+        setNeedResults(true);
+      }
+      return this;
+    }
+
+    public Builder<T> setPool(ExecutorService pool) {
+      this.pool = pool;
+      return this;
+    }
+
+    public Builder<T> setRpcTimeout(int rpcTimeout) {
+      this.rpcTimeout = rpcTimeout;
+      return this;
+    }
+
+    public Builder<T> setOperationTimeout(int operationTimeout) {
+      this.operationTimeout = operationTimeout;
+      return this;
+    }
+
+    public Builder<T> setTableName(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public Builder<T> setRowAccess(List<? extends Row> rows) {
+      this.rows = new ListRowAccess<>(rows);
+      return this;
+    }
+
+    public Builder<T> setRowAccess(RowAccess<? extends Row> rows) {
+      this.rows = rows;
+      return this;
+    }
+
+    public Builder<T> setSubmittedRows(SubmittedRows submittedRows) {
+      this.submittedRows = submittedRows;
+      return this;
+    }
+
+    public Builder<T> setNeedResults(boolean needResults) {
+      this.needResults = needResults;
+      return this;
+    }
+
+    Builder<T> setCallable(CancellableRegionServerCallable callable) {
+      this.callable = callable;
+      return this;
+    }
+
+    public AsyncProcessTask<T> build() {
+      return new AsyncProcessTask<>(pool, tableName, rows, submittedRows,
+              callback, callable, needResults, rpcTimeout, operationTimeout, results);
+    }
+  }
+  private final ExecutorService pool;
+  private final TableName tableName;
+  private final RowAccess<? extends Row> rows;
+  private final SubmittedRows submittedRows;
+  private final Batch.Callback<T> callback;
+  private final CancellableRegionServerCallable callable;
+  private final boolean needResults;
+  private final int rpcTimeout;
+  private final int operationTimeout;
+  private final Object[] results;
+  AsyncProcessTask(AsyncProcessTask<T> task) {
+    this(task.getPool(), task.getTableName(), task.getRowAccess(),
+        task.getSubmittedRows(), task.getCallback(), task.getCallable(),
+        task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(),
+        task.getResults());
+  }
+  AsyncProcessTask(ExecutorService pool, TableName tableName,
+          RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback<T> callback,
+          CancellableRegionServerCallable callable, boolean needResults,
+          int rpcTimeout, int operationTimeout, Object[] results) {
+    this.pool = pool;
+    this.tableName = tableName;
+    this.rows = rows;
+    this.submittedRows = size;
+    this.callback = callback;
+    this.callable = callable;
+    this.needResults = needResults;
+    this.rpcTimeout = rpcTimeout;
+    this.operationTimeout = operationTimeout;
+    this.results = results;
+  }
+
+  public int getOperationTimeout() {
+    return operationTimeout;
+  }
+
+  public ExecutorService getPool() {
+    return pool;
+  }
+
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  public RowAccess<? extends Row> getRowAccess() {
+    return rows;
+  }
+
+  public SubmittedRows getSubmittedRows() {
+    return submittedRows;
+  }
+
+  public Batch.Callback<T> getCallback() {
+    return callback;
+  }
+
+  CancellableRegionServerCallable getCallable() {
+    return callable;
+  }
+
+  Object[] getResults() {
+    return results;
+  }
+
+  public boolean getNeedResults() {
+    return needResults;
+  }
+
+  public int getRpcTimeout() {
+    return rpcTimeout;
+  }
+
+  static class ListRowAccess<T> implements RowAccess<T> {
+
+    private final List<T> data;
+
+    ListRowAccess(final List<T> data) {
+      this.data = data;
+    }
+
+    @Override
+    public int size() {
+      return data.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return data.isEmpty();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return data.iterator();
+    }
+  }
+}
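
Taken together with AsyncProcess#submit above, an in-package caller builds the
task once and hands it over. A minimal sketch under a standard hbase-client
classpath; "ap" (an AsyncProcess instance) and the waitUntilDone() call on the
pre-existing AsyncRequestFuture interface are assumptions, not part of this patch:

    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Row> rows = new ArrayList<>();
    rows.add(new Put(Bytes.toBytes("row1"))
        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));

    AsyncProcessTask task = AsyncProcessTask.newBuilder()
        .setPool(pool)
        .setTableName(TableName.valueOf("testTable"))
        .setRowAccess(rows)                  // the List overload wraps a ListRowAccess
        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
        .setNeedResults(true)
        .setRpcTimeout(60000)                // ms; negative values are rejected
        .setOperationTimeout(120000)         // ms
        .build();
    AsyncRequestFuture future = ap.submit(task);
    future.waitUntilDone();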

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index d176ce1..036196e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -300,11 +300,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   private final int[] replicaGetIndices;
   private final boolean hasAnyReplicaGets;
   private final long nonceGroup;
-  private CancellableRegionServerCallable currentCallable;
-  private int operationTimeout;
-  private int rpcTimeout;
+  private final CancellableRegionServerCallable currentCallable;
+  private final int operationTimeout;
+  private final int rpcTimeout;
   private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
-  protected AsyncProcess asyncProcess;
+  private final AsyncProcess asyncProcess;
 
   /**
    * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
@@ -339,32 +339,27 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     }
   }
 
-
-
-  public AsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
-      ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
-      CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
-      AsyncProcess asyncProcess) {
-    this.pool = pool;
-    this.callback = callback;
+  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
+      long nonceGroup, AsyncProcess asyncProcess) {
+    this.pool = task.getPool();
+    this.callback = task.getCallback();
     this.nonceGroup = nonceGroup;
-    this.tableName = tableName;
+    this.tableName = task.getTableName();
     this.actionsInProgress.set(actions.size());
-    if (results != null) {
-      assert needResults;
-      if (results.length != actions.size()) {
+    if (task.getResults() == null) {
+      results = task.getNeedResults() ? new Object[actions.size()] : null;
+    } else {
+      if (task.getResults().length != actions.size()) {
         throw new AssertionError("results.length");
       }
-      this.results = results;
+      this.results = task.getResults();
       for (int i = 0; i != this.results.length; ++i) {
         results[i] = null;
       }
-    } else {
-      this.results = needResults ? new Object[actions.size()] : null;
     }
     List<Integer> replicaGetIndices = null;
     boolean hasAnyReplicaGets = false;
-    if (needResults) {
+    if (results != null) {
       // Check to see if any requests might require replica calls.
       // We expect that many requests will consist of all or no multi-replica gets; in such
       // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
@@ -414,10 +409,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     this.errorsByServer = createServerErrorTracker();
     this.errors = (asyncProcess.globalErrors != null)
         ? asyncProcess.globalErrors : new BatchErrors();
-    this.operationTimeout = operationTimeout;
-    this.rpcTimeout = rpcTimeout;
-    this.currentCallable = callable;
-    if (callable == null) {
+    this.operationTimeout = task.getOperationTimeout();
+    this.rpcTimeout = task.getRpcTimeout();
+    this.currentCallable = task.getCallable();
+    if (task.getCallable() == null) {
       tracker = new RetryingTimeTracker().start();
     }
   }
@@ -1246,9 +1241,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
           lastLog = now;
           LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
               + "  actions to finish on table: " + tableName);
-          if (currentInProgress <= asyncProcess.thresholdToLogUndoneTaskDetails) {
-            asyncProcess.logDetailsOfUndoneTasks(currentInProgress);
-          }
         }
       }
       synchronized (actionsInProgress) {

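For callers that consume results as they arrive rather than through a results
array, the callback-flavored builder in AsyncProcessTask still applies. A hedged
sketch; the lambda body and the "pool", "tableName", and "gets" names are
illustrative, while Batch.Callback is the existing coprocessor callback interface:

    Batch.Callback<Result> callback = (byte[] region, byte[] row, Result result) -> {
      // invoked on the task's pool as each per-row result comes back
      System.out.println("row " + Bytes.toStringBinary(row) + " -> " + result);
    };
    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
        .setPool(pool)
        .setTableName(tableName)
        .setRowAccess(gets)
        .setNeedResults(true)
        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
        .build();
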
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 0085767..2a55de9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -19,12 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Collections;
@@ -36,6 +33,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
  * <p>
@@ -67,61 +66,70 @@ public class BufferedMutatorImpl implements BufferedMutator {
       "hbase.client.bufferedmutator.classname";
 
   private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
-  
+
   private final ExceptionListener listener;
 
-  protected ClusterConnection connection; // non-final so can be overridden in test
   private final TableName tableName;
-  private volatile Configuration conf;
-
-  @VisibleForTesting
-  final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
-  @VisibleForTesting
-  AtomicLong currentWriteBufferSize = new AtomicLong(0);
 
+  private final Configuration conf;
+  private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
+  private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
   /**
    * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
    * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
    */
-  @VisibleForTesting
-  AtomicInteger undealtMutationCount = new AtomicInteger(0);
-  private long writeBufferSize;
+  private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
+  private volatile long writeBufferSize;
   private final int maxKeyValueSize;
-  private boolean closed = false;
   private final ExecutorService pool;
-  private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
-  private int operationTimeout;
+  private final AtomicInteger rpcTimeout;
+  private final AtomicInteger operationTimeout;
+  private final boolean cleanupPoolOnClose;
+  private volatile boolean closed = false;
+  private final AsyncProcess ap;
 
   @VisibleForTesting
-  protected AsyncProcess ap; // non-final so can be overridden in test
-
-  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
-      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+  BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
     if (conn == null || conn.isClosed()) {
       throw new IllegalArgumentException("Connection is null or closed.");
     }
-
     this.tableName = params.getTableName();
-    this.connection = conn;
-    this.conf = connection.getConfiguration();
-    this.pool = params.getPool();
+    this.conf = conn.getConfiguration();
     this.listener = params.getListener();
-
+    if (params.getPool() == null) {
+      this.pool = HTable.getDefaultExecutor(conf);
+      cleanupPoolOnClose = true;
+    } else {
+      this.pool = params.getPool();
+      cleanupPoolOnClose = false;
+    }
     ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
     this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
         params.getWriteBufferSize() : tableConf.getWriteBufferSize();
     this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
         params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
 
-    this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
-        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.operationTimeout = conn.getConfiguration().getInt(
-        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-    // puts need to track errors globally due to how the APIs currently work.
-    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
-        writeRpcTimeout, operationTimeout);
+    this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
+        params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
+    this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != BufferedMutatorParams.UNSET ?
+        params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
+    this.ap = ap;
+  }
+  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
+      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+    this(conn, params,
+      // puts need to track errors globally due to how the APIs currently work.
+      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
+  }
+
+  @VisibleForTesting
+  ExecutorService getPool() {
+    return pool;
+  }
+
+  @VisibleForTesting
+  AsyncProcess getAsyncProcess() {
+    return ap;
   }
 
   @Override
@@ -193,22 +201,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
       // As we can have an operation in progress even if the buffer is empty, we call
       // backgroundFlushCommits at least one time.
       backgroundFlushCommits(true);
-      this.pool.shutdown();
-      boolean terminated;
-      int loopCnt = 0;
-      do {
-        // wait until the pool has terminated
-        terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
-        loopCnt += 1;
-        if (loopCnt >= 10) {
-          LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
-          break;
-        }
-      } while (!terminated);
-
+      if (cleanupPoolOnClose) {
+        this.pool.shutdown();
+        boolean terminated;
+        int loopCnt = 0;
+        do {
+          // wait until the pool has terminated
+          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+          loopCnt += 1;
+          if (loopCnt >= 10) {
+            LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
+            break;
+          }
+        } while (!terminated);
+      }
     } catch (InterruptedException e) {
       LOG.warn("waitForTermination interrupted");
-
     } finally {
       this.closed = true;
     }
@@ -239,8 +247,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
 
     if (!synchronous) {
       QueueRowAccess taker = new QueueRowAccess();
+      AsyncProcessTask task = wrapAsyncProcessTask(taker);
       try {
-        ap.submit(tableName, taker, true, null, false);
+        ap.submit(task);
         if (ap.hasError()) {
           LOG.debug(tableName + ": One or more of the operations have failed -"
               + " waiting for all operation in progress to finish (successfully or not)");
@@ -251,17 +260,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
     if (synchronous || ap.hasError()) {
       QueueRowAccess taker = new QueueRowAccess();
+      AsyncProcessTask task = wrapAsyncProcessTask(taker);
       try {
         while (!taker.isEmpty()) {
-          ap.submit(tableName, taker, true, null, false);
+          ap.submit(task);
           taker.reset();
         }
       } finally {
         taker.restoreRemainder();
       }
-
       RetriesExhaustedWithDetailsException error =
-          ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
+          ap.waitForAllPreviousOpsAndReset(null, tableName);
       if (error != null) {
         if (listener == null) {
           throw error;
@@ -273,8 +282,38 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   /**
+   * Wraps the QueueRowAccess in an AsyncProcessTask for {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
+   * @param taker access to the inner buffer.
+   * @return an AsyncProcessTask which always reads the latest rpc and operation timeouts.
+   */
+  private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+        .setPool(pool)
+        .setTableName(tableName)
+        .setRowAccess(taker)
+        .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+        .build();
+    return new AsyncProcessTask(task) {
+      @Override
+      public int getRpcTimeout() {
+        return rpcTimeout.get();
+      }
+
+      @Override
+      public int getOperationTimeout() {
+        return operationTimeout.get();
+      }
+    };
+  }
+  /**
    * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
    * not be called for production uses.
+   * If the new buffer size is smaller than the amount of data currently buffered,
+   * {@link BufferedMutatorImpl#flush()} is called first.
+   * @param writeBufferSize The max size of internal buffer where data is stored.
+   * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
+   * if an I/O error occurs and there are too many retries.
+   * @throws java.io.InterruptedIOException if the I/O task is interrupted.
    * @deprecated Going away when we drop public support for {@link HTable}.
    */
   @Deprecated
@@ -295,15 +334,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
-  public void setRpcTimeout(int timeout) {
-    this.writeRpcTimeout = timeout;
-    ap.setRpcTimeout(timeout);
+  public void setRpcTimeout(int rpcTimeout) {
+    this.rpcTimeout.set(rpcTimeout);
   }
 
   @Override
-  public void setOperationTimeout(int timeout) {
-    this.operationTimeout = timeout;
-    ap.setOperationTimeout(operationTimeout);
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout.set(operationTimeout);
+  }
+
+  @VisibleForTesting
+  long getCurrentWriteBufferSize() {
+    return currentWriteBufferSize.get();
+  }
+
+  @VisibleForTesting
+  int size() {
+    return undealtMutationCount.get();
   }
 
   private class QueueRowAccess implements RowAccess<Row> {

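A note on the wrapAsyncProcessTask() change above: the returned task overrides getRpcTimeout()/getOperationTimeout() to re-read the AtomicInteger fields on every call, so timeouts changed through setRpcTimeout()/setOperationTimeout() after the mutator is created still take effect on later flushes. A minimal standalone sketch of that pattern (the class names below are illustrative, not part of this patch):

    import java.util.concurrent.atomic.AtomicInteger;

    public class DynamicTimeoutSketch {

      // Illustrative stand-in for the anonymous AsyncProcessTask subclass:
      // it keeps a reference to the shared counter and re-reads it on every
      // call instead of caching the value at construction time.
      static class TimeoutAwareTask {
        private final AtomicInteger rpcTimeout;

        TimeoutAwareTask(AtomicInteger rpcTimeout) {
          this.rpcTimeout = rpcTimeout;
        }

        int getRpcTimeout() {
          return rpcTimeout.get();
        }
      }

      public static void main(String[] args) {
        AtomicInteger rpcTimeout = new AtomicInteger(60000);
        TimeoutAwareTask task = new TimeoutAwareTask(rpcTimeout);
        System.out.println(task.getRpcTimeout()); // 60000
        rpcTimeout.set(30000); // like mutator.setRpcTimeout(30000)
        System.out.println(task.getRpcTimeout()); // 30000 on the next flush
      }
    }
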
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 17c69ec..9c901e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -39,7 +39,8 @@ public class BufferedMutatorParams implements Cloneable {
   private int maxKeyValueSize = UNSET;
   private ExecutorService pool = null;
   private String implementationClassName = null;
-
+  private int rpcTimeout = UNSET;
+  private int operationTimeout = UNSET;
   private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
     @Override
     public void onException(RetriesExhaustedWithDetailsException exception,
@@ -61,6 +62,24 @@ public class BufferedMutatorParams implements Cloneable {
     return writeBufferSize;
   }
 
+  public BufferedMutatorParams rpcTimeout(final int rpcTimeout) {
+    this.rpcTimeout = rpcTimeout;
+    return this;
+  }
+
+  public int getRpcTimeout() {
+    return rpcTimeout;
+  }
+
+  public BufferedMutatorParams operationTimeout(final int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+    return this;
+  }
+
+  public int getOperationTimeout() {
+    return operationTimeout;
+  }
+
   /**
    * Override the write buffer size specified by the provided {@link Connection}'s
    * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key

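With the two new fields, per-mutator timeouts can be configured up front rather than by mutating the BufferedMutator afterwards. A usage sketch against the builder methods added above ('connection' is assumed to be an already open Connection owned by the caller):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorParams;
    import org.apache.hadoop.hbase.client.Connection;

    public class ParamsSketch {
      static BufferedMutator create(Connection connection) throws IOException {
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"))
            .rpcTimeout(30000)           // per-RPC write timeout, in ms
            .operationTimeout(120000);   // total budget per operation, in ms
        return connection.getBufferedMutator(params);
      }
    }
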
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 35bebae..41f5baf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -42,7 +42,8 @@ public class ConnectionConfiguration {
   private final int replicaCallTimeoutMicroSecondScan;
   private final int retries;
   private final int maxKeyValueSize;
-
+  private final int readRpcTimeout;
+  private final int writeRpcTimeout;
     // toggle for async/sync prefetch
   private final boolean clientScannerAsyncPrefetch;
 
@@ -80,6 +81,12 @@ public class ConnectionConfiguration {
        Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
 
     this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+
+    this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+    this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
   }
 
   /**
@@ -99,6 +106,16 @@ public class ConnectionConfiguration {
     this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
     this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
     this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+    this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+  }
+
+  public int getReadRpcTimeout() {
+    return readRpcTimeout;
+  }
+
+  public int getWriteRpcTimeout() {
+    return writeRpcTimeout;
   }
 
   public long getWriteBufferSize() {

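Both new values resolve in the same order: the read- or write-specific key first, then the generic hbase.rpc.timeout, then the compiled-in default of 60000 ms. A self-contained sketch of that fallback (the numbers are arbitrary):

    import org.apache.hadoop.conf.Configuration;

    public class RpcTimeoutFallbackSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("hbase.rpc.timeout", 60000);        // generic fallback
        conf.setInt("hbase.rpc.write.timeout", 20000);  // write-specific override

        // Mirrors the two lookups added to ConnectionConfiguration above.
        int readRpcTimeout = conf.getInt("hbase.rpc.read.timeout",
            conf.getInt("hbase.rpc.timeout", 60000));
        int writeRpcTimeout = conf.getInt("hbase.rpc.write.timeout",
            conf.getInt("hbase.rpc.timeout", 60000));

        System.out.println(readRpcTimeout);  // 60000 - no read key set, generic wins
        System.out.println(writeRpcTimeout); // 20000 - write-specific key wins
      }
    }
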
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index a597be3..ceac3fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -249,7 +249,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
-    this.asyncProcess = createAsyncProcess(this.conf);
+    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
     if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
       this.metrics = new MetricsConnection(this);
     } else {
@@ -1833,17 +1833,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     metaCache.clearCache(regionInfo);
   }
 
-  // For tests to override.
-  protected AsyncProcess createAsyncProcess(Configuration conf) {
-    // No default pool available.
-    int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-    return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
-        rpcTimeout, operationTimeout);
-  }
-
   @Override
   public AsyncProcess getAsyncProcess() {
     return asyncProcess;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index dd11abf..fd5eda3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads;
 @InterfaceStability.Stable
 public class HTable implements Table {
   private static final Log LOG = LogFactory.getLog(HTable.class);
-  protected ClusterConnection connection;
+  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
+  private final ClusterConnection connection;
   private final TableName tableName;
-  private volatile Configuration configuration;
-  private ConnectionConfiguration connConfiguration;
-  protected BufferedMutatorImpl mutator;
+  private final Configuration configuration;
+  private final ConnectionConfiguration connConfiguration;
+  @VisibleForTesting
+  BufferedMutatorImpl mutator;
   private boolean closed = false;
-  protected int scannerCaching;
-  protected long scannerMaxResultSize;
-  private ExecutorService pool;  // For Multi & Scan
+  private final int scannerCaching;
+  private final long scannerMaxResultSize;
+  private final ExecutorService pool;  // For Multi & Scan
   private int operationTimeout; // global timeout for each blocking method with retrying rpc
   private int readRpcTimeout; // timeout for each read rpc request
   private int writeRpcTimeout; // timeout for each write rpc request
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
-  private final boolean cleanupConnectionOnClose; // close the connection in close()
-  private Consistency defaultConsistency = Consistency.STRONG;
-  private HRegionLocator locator;
+  private final HRegionLocator locator;
 
   /** The Async process for batch */
-  protected AsyncProcess multiAp;
-  private RpcRetryingCallerFactory rpcCallerFactory;
-  private RpcControllerFactory rpcControllerFactory;
+  @VisibleForTesting
+  AsyncProcess multiAp;
+  private final RpcRetryingCallerFactory rpcCallerFactory;
+  private final RpcControllerFactory rpcControllerFactory;
 
   // Marked Private @since 1.0
   @InterfaceAudience.Private
@@ -167,22 +168,42 @@ public class HTable implements Table {
       throw new IllegalArgumentException("Given table name is null");
     }
     this.tableName = tableName;
-    this.cleanupConnectionOnClose = false;
     this.connection = connection;
     this.configuration = connection.getConfiguration();
-    this.connConfiguration = tableConfig;
-    this.pool = pool;
+    if (tableConfig == null) {
+      connConfiguration = new ConnectionConfiguration(configuration);
+    } else {
+      connConfiguration = tableConfig;
+    }
     if (pool == null) {
       this.pool = getDefaultExecutor(this.configuration);
       this.cleanupPoolOnClose = true;
     } else {
+      this.pool = pool;
       this.cleanupPoolOnClose = false;
     }
+    if (rpcCallerFactory == null) {
+      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
+    } else {
+      this.rpcCallerFactory = rpcCallerFactory;
+    }
 
-    this.rpcCallerFactory = rpcCallerFactory;
-    this.rpcControllerFactory = rpcControllerFactory;
+    if (rpcControllerFactory == null) {
+      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
+    } else {
+      this.rpcControllerFactory = rpcControllerFactory;
+    }
+
+    this.operationTimeout = tableName.isSystemTable() ?
+        connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+    this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+    this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+    this.scannerCaching = connConfiguration.getScannerCaching();
+    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
 
-    this.finishSetup();
+    // puts need to track errors globally due to how the APIs currently work.
+    multiAp = this.connection.getAsyncProcess();
+    this.locator = new HRegionLocator(tableName, connection);
   }
 
   /**
@@ -190,20 +211,23 @@ public class HTable implements Table {
    * @throws IOException
    */
   @VisibleForTesting
-  protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
+  protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
     connection = conn;
-    tableName = params.getTableName();
-    connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
+    this.tableName = mutator.getName();
+    this.configuration = connection.getConfiguration();
+    connConfiguration = new ConnectionConfiguration(configuration);
     cleanupPoolOnClose = false;
-    cleanupConnectionOnClose = false;
-    // used from tests, don't trust the connection is real
-    this.mutator = new BufferedMutatorImpl(conn, null, null, params);
-    this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
-        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
-        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    this.mutator = mutator;
+    this.operationTimeout = tableName.isSystemTable() ?
+        connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+    this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+    this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+    this.scannerCaching = connConfiguration.getScannerCaching();
+    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
+    this.rpcControllerFactory = null;
+    this.rpcCallerFactory = null;
+    this.pool = mutator.getPool();
+    this.locator = null;
   }
 
   /**
@@ -214,36 +238,6 @@ public class HTable implements Table {
   }
 
   /**
-   * setup this HTable's parameter based on the passed configuration
-   */
-  private void finishSetup() throws IOException {
-    if (connConfiguration == null) {
-      connConfiguration = new ConnectionConfiguration(configuration);
-    }
-
-    this.operationTimeout = tableName.isSystemTable() ?
-        connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
-    this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
-        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
-        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.scannerCaching = connConfiguration.getScannerCaching();
-    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
-    if (this.rpcCallerFactory == null) {
-      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
-    }
-    if (this.rpcControllerFactory == null) {
-      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
-    }
-
-    // puts need to track errors globally due to how the APIs currently work.
-    multiAp = this.connection.getAsyncProcess();
-    this.locator = new HRegionLocator(getName(), connection);
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -423,7 +417,7 @@ public class HTable implements Table {
       get = ReflectionUtils.newInstance(get.getClass(), get);
       get.setCheckExistenceOnly(checkExistenceOnly);
       if (get.getConsistency() == null){
-        get.setConsistency(defaultConsistency);
+        get.setConsistency(DEFAULT_CONSISTENCY);
       }
     }
 
@@ -483,13 +477,37 @@ public class HTable implements Table {
   @Override
   public void batch(final List<? extends Row> actions, final Object[] results)
       throws InterruptedException, IOException {
-    batch(actions, results, -1);
+    int rpcTimeout = writeRpcTimeout;
+    boolean hasRead = false;
+    boolean hasWrite = false;
+    for (Row action : actions) {
+      if (action instanceof Mutation) {
+        hasWrite = true;
+      } else {
+        hasRead = true;
+      }
+      if (hasRead && hasWrite) {
+        break;
+      }
+    }
+    if (hasRead && !hasWrite) {
+      rpcTimeout = readRpcTimeout;
+    }
+    batch(actions, results, rpcTimeout);
   }
 
   public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
       throws InterruptedException, IOException {
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
-        rpcTimeout);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(actions)
+            .setResults(results)
+            .setRpcTimeout(rpcTimeout)
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -509,8 +527,20 @@ public class HTable implements Table {
   public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
     Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
     throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
-      pool, tableName, actions, callback, results);
+    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
+    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(actions)
+            .setResults(results)
+            .setOperationTimeout(operationTimeout)
+            .setRpcTimeout(writeTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -536,8 +566,16 @@ public class HTable implements Table {
       }
     };
     List<Delete> rows = Collections.singletonList(delete);
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
-        null, null, callable, writeRpcTimeout);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(rows)
+            .setCallable(callable)
+            .setRpcTimeout(writeRpcTimeout)
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -615,8 +653,16 @@ public class HTable implements Table {
         return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
       }
     };
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
-        null, null, callable, writeRpcTimeout);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(rm.getMutations())
+            .setCallable(callable)
+            .setRpcTimeout(writeRpcTimeout)
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -795,8 +841,18 @@ public class HTable implements Table {
     };
     List<Delete> rows = Collections.singletonList(delete);
     Object[] results = new Object[1];
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
-        null, results, callable, -1);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(rows)
+            .setCallable(callable)
+            // TODO any better timeout?
+            .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .setResults(results)
+            .build();
+    AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -839,8 +895,18 @@ public class HTable implements Table {
      *  It is excessive to send such a large array, but that is required by the framework right now
      * */
     Object[] results = new Object[rm.getMutations().size()];
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
-      null, results, callable, -1);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(rm.getMutations())
+            .setResults(results)
+            .setCallable(callable)
+            // TODO any better timeout?
+            .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = multiAp.submit(task);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -926,6 +992,10 @@ public class HTable implements Table {
       return;
     }
     flushCommits();
+    if (mutator != null) {
+      mutator.close();
+      mutator = null;
+    }
     if (cleanupPoolOnClose) {
       this.pool.shutdown();
       try {
@@ -939,11 +1009,6 @@ public class HTable implements Table {
         LOG.warn("waitForTermination interrupted");
       }
     }
-    if (cleanupConnectionOnClose) {
-      if (this.connection != null) {
-        this.connection.close();
-      }
-    }
     this.closed = true;
   }
 
@@ -1102,7 +1167,6 @@ public class HTable implements Table {
     if (mutator != null) {
       mutator.setOperationTimeout(operationTimeout);
     }
-    multiAp.setOperationTimeout(operationTimeout);
   }
 
   @Override
@@ -1134,7 +1198,6 @@ public class HTable implements Table {
     if (mutator != null) {
       mutator.setRpcTimeout(writeRpcTimeout);
     }
-    multiAp.setRpcTimeout(writeRpcTimeout);
   }
 
   @Override
@@ -1217,37 +1280,41 @@ public class HTable implements Table {
     Object[] results = new Object[execs.size()];
 
     AsyncProcess asyncProcess =
-        new AsyncProcess(connection, configuration, pool,
+        new AsyncProcess(connection, configuration,
             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
-            true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
-            operationTimeout);
-
-    AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
-        new Callback<ClientProtos.CoprocessorServiceResult>() {
-          @Override
-          public void update(byte[] region, byte[] row,
-                              ClientProtos.CoprocessorServiceResult serviceResult) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
-                  ": region=" + Bytes.toStringBinary(region) +
-                  ", row=" + Bytes.toStringBinary(row) +
-                  ", value=" + serviceResult.getValue().getValue());
-            }
-            try {
-              Message.Builder builder = responsePrototype.newBuilderForType();
-              org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
-                  serviceResult.getValue().getValue().toByteArray());
-              callback.update(region, row, (R) builder.build());
-            } catch (IOException e) {
-              LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
-                  e);
-              callbackErrorExceptions.add(e);
-              callbackErrorActions.add(execsByRow.get(row));
-              callbackErrorServers.add("null");
-            }
-          }
-        }, results);
-
+            true, RpcControllerFactory.instantiate(configuration));
+
+    Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
+        (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
+            ": region=" + Bytes.toStringBinary(region) +
+            ", row=" + Bytes.toStringBinary(row) +
+            ", value=" + serviceResult.getValue().getValue());
+      }
+      try {
+        Message.Builder builder = responsePrototype.newBuilderForType();
+        org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
+            serviceResult.getValue().getValue().toByteArray());
+        callback.update(region, row, (R) builder.build());
+      } catch (IOException e) {
+        LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
+            e);
+        callbackErrorExceptions.add(e);
+        callbackErrorActions.add(execsByRow.get(row));
+        callbackErrorServers.add("null");
+      }
+    };
+    AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = AsyncProcessTask.newBuilder(resultsCallback)
+            .setPool(pool)
+            .setTableName(tableName)
+            .setRowAccess(execs)
+            .setResults(results)
+            .setRpcTimeout(readRpcTimeout)
+            .setOperationTimeout(operationTimeout)
+            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture future = asyncProcess.submit(task);
     future.waitUntilDone();
 
     if (future.hasError()) {
@@ -1270,10 +1337,10 @@ public class HTable implements Table {
               .pool(pool)
               .writeBufferSize(connConfiguration.getWriteBufferSize())
               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
+              .operationTimeout(operationTimeout)
+              .rpcTimeout(writeRpcTimeout)
       );
     }
-    mutator.setRpcTimeout(writeRpcTimeout);
-    mutator.setOperationTimeout(operationTimeout);
     return mutator;
   }
 }
\ No newline at end of file

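One behavioral change worth calling out in the HTable diff above: batch(List, Object[]) no longer hard-codes -1 but inspects the actions, using the read rpc timeout only when the batch contains no Mutation. A standalone sketch of that selection rule (the helper class is illustrative):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchTimeoutSketch {

      // Mirrors the loop in HTable#batch: the read timeout applies only to a
      // pure-read batch; anything containing a Mutation (or an empty batch)
      // uses the write timeout.
      static int chooseRpcTimeout(List<? extends Row> actions,
          int readRpcTimeout, int writeRpcTimeout) {
        boolean hasRead = false;
        boolean hasWrite = false;
        for (Row action : actions) {
          if (action instanceof Mutation) {
            hasWrite = true;
          } else {
            hasRead = true;
          }
          if (hasRead && hasWrite) {
            break;
          }
        }
        return (hasRead && !hasWrite) ? readRpcTimeout : writeRpcTimeout;
      }

      public static void main(String[] args) {
        List<Row> reads = Arrays.<Row>asList(new Get(Bytes.toBytes("r1")));
        List<Row> mixed = Arrays.<Row>asList(new Get(Bytes.toBytes("r1")),
            new Put(Bytes.toBytes("r2")));
        System.out.println(chooseRpcTimeout(reads, 10000, 20000)); // 10000
        System.out.println(chooseRpcTimeout(mixed, 10000, 20000)); // 20000
      }
    }
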
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 8ff64bf..c03b969 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -443,7 +443,7 @@ public class HTableMultiplexer {
     private final AtomicInteger retryInQueue = new AtomicInteger(0);
     private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
     private final int operationTimeout;
-
+    private final ExecutorService pool;
     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
         ExecutorService pool, ScheduledExecutorService executor) {
@@ -457,10 +457,10 @@ public class HTableMultiplexer {
               HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
       this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
           HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
-          writeRpcTimeout, operationTimeout);
+      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+      this.pool = pool;
     }
 
     protected LinkedBlockingQueue<PutStatus> getQueue() {
@@ -594,9 +594,14 @@ public class HTableMultiplexer {
         Map<ServerName, MultiAction> actionsByServer =
             Collections.singletonMap(server, actions);
         try {
+          AsyncProcessTask task = AsyncProcessTask.newBuilder()
+                  .setResults(results)
+                  .setPool(pool)
+                  .setRpcTimeout(writeRpcTimeout)
+                  .setOperationTimeout(operationTimeout)
+                  .build();
           AsyncRequestFuture arf =
-              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
-                null, actionsByServer, null);
+              ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
           arf.waitUntilDone();
           if (arf.hasError()) {
             // We just log and ignore the exception here since failed Puts will be resubmit again.
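
The FlushWorker change follows the same shape as the call sites above: the pool and timeouts for a request now travel on an immutable task handed to a stateless AsyncProcess, instead of living as mutable fields on the process itself. A minimal illustration of that builder-carries-context pattern (Task and Builder below are stand-ins, not the HBase classes):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative mini version of the AsyncProcessTask idea: the submitter
    // stays stateless and every submit(...) carries its own pool and timeouts.
    final class Task {
      final ExecutorService pool;
      final int rpcTimeout;
      final int operationTimeout;

      private Task(Builder b) {
        this.pool = b.pool;
        this.rpcTimeout = b.rpcTimeout;
        this.operationTimeout = b.operationTimeout;
      }

      static final class Builder {
        private ExecutorService pool;
        private int rpcTimeout;
        private int operationTimeout;

        Builder setPool(ExecutorService pool) { this.pool = pool; return this; }
        Builder setRpcTimeout(int t) { this.rpcTimeout = t; return this; }
        Builder setOperationTimeout(int t) { this.operationTimeout = t; return this; }
        Task build() { return new Task(this); }
      }
    }

    public class TaskBuilderSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Task task = new Task.Builder()
            .setPool(pool)
            .setRpcTimeout(30000)
            .setOperationTimeout(120000)
            .build();
        System.out.println(task.rpcTimeout + " " + task.operationTimeout);
        pool.shutdown();
      }
    }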