You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/24 04:10:46 UTC

[2/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
new file mode 100644
index 0000000..7e9c968
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * An interface for client request scheduling algorithm.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RequestController {
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public enum ReturnCode {
+    /**
+     * Accept current row.
+     */
+    INCLUDE,
+    /**
+     * Skip current row.
+     */
+    SKIP,
+    /**
+     * No more row can be included.
+     */
+    END
+  }
+
+  /**
+   * Picks up the valid data.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public interface Checker {
+    /**
+     * Checks the data whether it is valid to submit.
+     * @param loc the destination of data
+     * @param row the data to check
+     * @return describe the decision for the row
+     */
+    ReturnCode canTakeRow(HRegionLocation loc, Row row);
+
+    /**
+     * Reset the state of the scheduler when completing the iteration of rows.
+     * @throws InterruptedIOException some controller may wait
+     * for some busy region or RS to complete the undealt request.
+     */
+    void reset() throws InterruptedIOException ;
+  }
+
+  /**
+   * @return A new checker for evaluating a batch rows.
+   */
+  Checker newChecker();
+
+  /**
+   * Increment the counter if we build a valid task.
+   * @param regions The destination of task
+   * @param sn The target server
+   */
+  void incTaskCounters(Collection<byte[]> regions, ServerName sn);
+
+  /**
+   * Decrement the counter if a task is accomplished.
+   * @param regions The destination of task
+   * @param sn The target server
+   */
+  void decTaskCounters(Collection<byte[]> regions, ServerName sn);
+
+  /**
+   * @return The number of running task.
+   */
+  long getNumberOfTsksInProgress();
+
+  /**
+   * Waits for the running tasks to complete.
+   * If there are specified threshold and trigger, the implementation should
+   * wake up once in a while for checking the threshold and calling trigger.
+   * @param max This method will return if the number of running tasks is
+   * less than or equal to max.
+   * @param id the caller's id
+   * @param periodToTrigger The period to invoke the trigger. This value is a
+   * hint. The real period depends on the implementation.
+   * @param trigger The object to call periodically.
+   * @throws java.io.InterruptedIOException If the waiting is interrupted
+   */
+  void waitForMaximumCurrentTasks(long max, long id,
+    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException;
+
+  /**
+   * Wait until there is at least one slot for a new task.
+   * @param id the caller's id
+   * @param periodToTrigger The period to invoke the trigger. This value is a
+   * hint. The real period depends on the implementation.
+   * @param trigger The object to call periodically.
+   * @throws java.io.InterruptedIOException If the waiting is interrupted
+   */
+  void waitForFreeSlot(long id, int periodToTrigger,
+          Consumer<Long> trigger) throws InterruptedIOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
new file mode 100644
index 0000000..7ed80f0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A factory class that constructs an {@link org.apache.hadoop.hbase.client.RequestController}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class RequestControllerFactory {
+  public static final String REQUEST_CONTROLLER_IMPL_CONF_KEY = "hbase.client.request.controller.impl";
+  /**
+   * Constructs a {@link org.apache.hadoop.hbase.client.RequestController}.
+   * @param conf The {@link Configuration} to use.
+   * @return A RequestController which is built according to the configuration.
+   */
+  public static RequestController create(Configuration conf) {
+    Class<? extends RequestController> clazz= conf.getClass(REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class, RequestController.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
index 788f1a4..85fd590 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-@VisibleForTesting
-interface RowAccess<T> extends Iterable<T> {
+public interface RowAccess<T> extends Iterable<T> {
   /**
    * @return true if there are no elements.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
new file mode 100644
index 0000000..473f264
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
@@ -0,0 +1,519 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Holds back the request if the submitted size or number has reached the
+ * threshold.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SimpleRequestController implements RequestController {
+  private static final Log LOG = LogFactory.getLog(SimpleRequestController.class);
+  /**
+   * The maximum size of single RegionServer.
+   */
+  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
+
+  /**
+   * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
+   */
+  @VisibleForTesting
+  static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
+
+  /**
+   * The maximum size of submit.
+   */
+  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
+  /**
+   * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
+   */
+  @VisibleForTesting
+  static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
+  @VisibleForTesting
+  final AtomicLong tasksInProgress = new AtomicLong(0);
+  @VisibleForTesting
+  final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
+          = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  @VisibleForTesting
+  final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
+  /**
+   * The number of tasks simultaneously executed on the cluster.
+   */
+  private final int maxTotalConcurrentTasks;
+
+  /**
+   * The max heap size of all tasks simultaneously executed on a server.
+   */
+  private final long maxHeapSizePerRequest;
+  private final long maxHeapSizeSubmit;
+  /**
+   * The number of tasks we run in parallel on a single region. With 1 (the
+   * default) , we ensure that the ordering of the queries is respected: we
+   * don't start a set of operations on a region before the previous one is
+   * done. As well, this limits the pressure we put on the region server.
+   */
+  @VisibleForTesting
+  final int maxConcurrentTasksPerRegion;
+
+  /**
+   * The number of task simultaneously executed on a single region server.
+   */
+  @VisibleForTesting
+  final int maxConcurrentTasksPerServer;
+  private final int thresholdToLogUndoneTaskDetails;
+  public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+      "hbase.client.threshold.log.details";
+  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+  public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
+      "hbase.client.threshold.log.region.details";
+  private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+  private final int thresholdToLogRegionDetails;
+  SimpleRequestController(final Configuration conf) {
+    this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+            HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+    this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
+            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
+    this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
+            HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
+    this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+            DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+    this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
+    this.thresholdToLogUndoneTaskDetails =
+        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+    this.thresholdToLogRegionDetails =
+        conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS,
+          DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
+    if (this.maxTotalConcurrentTasks <= 0) {
+      throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
+    }
+    if (this.maxConcurrentTasksPerServer <= 0) {
+      throw new IllegalArgumentException("maxConcurrentTasksPerServer="
+              + maxConcurrentTasksPerServer);
+    }
+    if (this.maxConcurrentTasksPerRegion <= 0) {
+      throw new IllegalArgumentException("maxConcurrentTasksPerRegion="
+              + maxConcurrentTasksPerRegion);
+    }
+    if (this.maxHeapSizePerRequest <= 0) {
+      throw new IllegalArgumentException("maxHeapSizePerServer="
+              + maxHeapSizePerRequest);
+    }
+
+    if (this.maxHeapSizeSubmit <= 0) {
+      throw new IllegalArgumentException("maxHeapSizeSubmit="
+              + maxHeapSizeSubmit);
+    }
+  }
+
+  @VisibleForTesting
+  static Checker newChecker(List<RowChecker> checkers) {
+    return new Checker() {
+      private boolean isEnd = false;
+
+      @Override
+      public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
+        if (isEnd) {
+          return ReturnCode.END;
+        }
+        long rowSize = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
+        ReturnCode code = ReturnCode.INCLUDE;
+        for (RowChecker checker : checkers) {
+          switch (checker.canTakeOperation(loc, rowSize)) {
+            case END:
+              isEnd = true;
+              code = ReturnCode.END;
+              break;
+            case SKIP:
+              code = ReturnCode.SKIP;
+              break;
+            case INCLUDE:
+            default:
+              break;
+          }
+          if (code == ReturnCode.END) {
+            break;
+          }
+        }
+        for (RowChecker checker : checkers) {
+          checker.notifyFinal(code, loc, rowSize);
+        }
+        return code;
+      }
+
+      @Override
+      public void reset() throws InterruptedIOException {
+        isEnd = false;
+        InterruptedIOException e = null;
+        for (RowChecker checker : checkers) {
+          try {
+            checker.reset();
+          } catch (InterruptedIOException ex) {
+            e = ex;
+          }
+        }
+        if (e != null) {
+          throw e;
+        }
+      }
+    };
+  }
+
+  @Override
+  public Checker newChecker() {
+    List<RowChecker> checkers = new ArrayList<>(3);
+    checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
+            maxConcurrentTasksPerServer,
+            maxConcurrentTasksPerRegion,
+            tasksInProgress,
+            taskCounterPerServer,
+            taskCounterPerRegion));
+    checkers.add(new RequestSizeChecker(maxHeapSizePerRequest));
+    checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
+    return newChecker(checkers);
+  }
+
+  @Override
+  public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
+    tasksInProgress.incrementAndGet();
+
+    computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
+
+    regions.forEach((regBytes)
+            -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
+    );
+  }
+
+  @Override
+  public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
+    regions.forEach(regBytes -> {
+      AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
+      regionCnt.decrementAndGet();
+    });
+
+    taskCounterPerServer.get(sn).decrementAndGet();
+    tasksInProgress.decrementAndGet();
+    synchronized (tasksInProgress) {
+      tasksInProgress.notifyAll();
+    }
+  }
+
+  @Override
+  public long getNumberOfTsksInProgress() {
+    return tasksInProgress.get();
+  }
+
+  @Override
+  public void waitForMaximumCurrentTasks(long max, long id,
+    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
+    assert max >= 0;
+    long lastLog = EnvironmentEdgeManager.currentTime();
+    long currentInProgress, oldInProgress = Long.MAX_VALUE;
+    while ((currentInProgress = tasksInProgress.get()) > max) {
+      if (oldInProgress != currentInProgress) { // Wait for in progress to change.
+        long now = EnvironmentEdgeManager.currentTime();
+        if (now > lastLog + periodToTrigger) {
+          lastLog = now;
+          if (trigger != null) {
+            trigger.accept(currentInProgress);
+          }
+          logDetailsOfUndoneTasks(currentInProgress);
+        }
+      }
+      oldInProgress = currentInProgress;
+      try {
+        synchronized (tasksInProgress) {
+          if (tasksInProgress.get() == oldInProgress) {
+            tasksInProgress.wait(10);
+          }
+        }
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("#" + id + ", interrupted." +
+            " currentNumberOfTask=" + currentInProgress);
+      }
+    }
+  }
+
+  private void logDetailsOfUndoneTasks(long taskInProgress) {
+    if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
+      ArrayList<ServerName> servers = new ArrayList<>();
+      for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
+        if (entry.getValue().get() > 0) {
+          servers.add(entry.getKey());
+        }
+      }
+      LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
+    }
+
+    if (taskInProgress <= thresholdToLogRegionDetails) {
+      ArrayList<String> regions = new ArrayList<>();
+      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
+        if (entry.getValue().get() > 0) {
+          regions.add(Bytes.toString(entry.getKey()));
+        }
+      }
+      LOG.info("Regions against which left over task(s) are processed: " + regions);
+    }
+  }
+
+  @Override
+  public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
+  }
+
+  /**
+   * limit the heapsize of total submitted data. Reduce the limit of heapsize
+   * for submitting quickly if there is no running task.
+   */
+  @VisibleForTesting
+  static class SubmittedSizeChecker implements RowChecker {
+
+    private final long maxHeapSizeSubmit;
+    private long heapSize = 0;
+
+    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
+      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
+    }
+
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+      if (heapSize >= maxHeapSizeSubmit) {
+        return ReturnCode.END;
+      }
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        heapSize += rowSize;
+      }
+    }
+
+    @Override
+    public void reset() {
+      heapSize = 0;
+    }
+  }
+
+  /**
+   * limit the max number of tasks in an AsyncProcess.
+   */
+  @VisibleForTesting
+  static class TaskCountChecker implements RowChecker {
+
+    private static final long MAX_WAITING_TIME = 1000; //ms
+    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
+    private final Set<ServerName> serversIncluded = new HashSet<>();
+    private final int maxConcurrentTasksPerRegion;
+    private final int maxTotalConcurrentTasks;
+    private final int maxConcurrentTasksPerServer;
+    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
+    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
+    private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    private final AtomicLong tasksInProgress;
+
+    TaskCountChecker(final int maxTotalConcurrentTasks,
+            final int maxConcurrentTasksPerServer,
+            final int maxConcurrentTasksPerRegion,
+            final AtomicLong tasksInProgress,
+            final Map<ServerName, AtomicInteger> taskCounterPerServer,
+            final Map<byte[], AtomicInteger> taskCounterPerRegion) {
+      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
+      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
+      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
+      this.taskCounterPerRegion = taskCounterPerRegion;
+      this.taskCounterPerServer = taskCounterPerServer;
+      this.tasksInProgress = tasksInProgress;
+    }
+
+    @Override
+    public void reset() throws InterruptedIOException {
+      // prevent the busy-waiting
+      waitForRegion();
+      regionsIncluded.clear();
+      serversIncluded.clear();
+      busyRegions.clear();
+    }
+
+    private void waitForRegion() throws InterruptedIOException {
+      if (busyRegions.isEmpty()) {
+        return;
+      }
+      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+      final long start = ee.currentTime();
+      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
+        for (byte[] region : busyRegions) {
+          AtomicInteger count = taskCounterPerRegion.get(region);
+          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
+            return;
+          }
+        }
+        try {
+          synchronized (tasksInProgress) {
+            tasksInProgress.wait(10);
+          }
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted."
+                  + " tasksInProgress=" + tasksInProgress);
+        }
+      }
+    }
+
+    /**
+     * 1) check the regions is allowed. 2) check the concurrent tasks for
+     * regions. 3) check the total concurrent tasks. 4) check the concurrent
+     * tasks for server.
+     *
+     * @param loc
+     * @param rowSize
+     * @return
+     */
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+
+      HRegionInfo regionInfo = loc.getRegionInfo();
+      if (regionsIncluded.contains(regionInfo)) {
+        // We already know what to do with this region.
+        return ReturnCode.INCLUDE;
+      }
+      AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
+      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
+        // Too many tasks on this region already.
+        return ReturnCode.SKIP;
+      }
+      int newServers = serversIncluded.size()
+              + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
+      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
+        // Too many tasks.
+        return ReturnCode.SKIP;
+      }
+      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
+      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
+        // Too many tasks for this individual server
+        return ReturnCode.SKIP;
+      }
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        regionsIncluded.add(loc.getRegionInfo());
+        serversIncluded.add(loc.getServerName());
+      }
+      busyRegions.add(loc.getRegionInfo().getRegionName());
+    }
+  }
+
+  /**
+   * limit the request size for each regionserver.
+   */
+  @VisibleForTesting
+  static class RequestSizeChecker implements RowChecker {
+
+    private final long maxHeapSizePerRequest;
+    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
+
+    RequestSizeChecker(final long maxHeapSizePerRequest) {
+      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
+    }
+
+    @Override
+    public void reset() {
+      serverRequestSizes.clear();
+    }
+
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+      // Is it ok for limit of request size?
+      long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
+              ? serverRequestSizes.get(loc.getServerName()) : 0L;
+      // accept at least one request
+      if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
+        return ReturnCode.INCLUDE;
+      }
+      return ReturnCode.SKIP;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
+                ? serverRequestSizes.get(loc.getServerName()) : 0L;
+        serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
+      }
+    }
+  }
+
+  /**
+   * Provide a way to control the flow of rows iteration.
+   */
+  @VisibleForTesting
+  interface RowChecker {
+
+    ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
+
+    /**
+     * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
+     * so the checker need the final decision to update the inner state.
+     *
+     * @param code The final decision
+     * @param loc the destination of data
+     * @param rowSize the data size
+     */
+    void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
+
+    /**
+     * Reset the inner state.
+     */
+    void reset() throws InterruptedIOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index bb6cbb5..ed7202a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -33,12 +33,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -59,15 +57,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
-import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost;
-import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -78,60 +69,64 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
-import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+
 
 @Category({ClientTests.class, MediumTests.class})
 public class TestAsyncProcess {
   @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
       withLookingForStuckThread(true).build();
-  private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
+  private static final Log LOG = LogFactory.getLog(TestAsyncProcess.class);
   private static final TableName DUMMY_TABLE =
       TableName.valueOf("DUMMY_TABLE");
   private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
   private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
   private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
   private static final byte[] FAILS = "FAILS".getBytes();
-  private static final Configuration conf = new Configuration();
-
-  private static ServerName sn = ServerName.valueOf("s1:1,1");
-  private static ServerName sn2 = ServerName.valueOf("s2:2,2");
-  private static ServerName sn3 = ServerName.valueOf("s3:3,3");
-  private static HRegionInfo hri1 =
+  private static final Configuration CONF = new Configuration();
+  private static final ConnectionConfiguration CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
+  private static final ServerName sn = ServerName.valueOf("s1:1,1");
+  private static final ServerName sn2 = ServerName.valueOf("s2:2,2");
+  private static final ServerName sn3 = ServerName.valueOf("s3:3,3");
+  private static final HRegionInfo hri1 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
-  private static HRegionInfo hri2 =
+  private static final HRegionInfo hri2 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
-  private static HRegionInfo hri3 =
+  private static final HRegionInfo hri3 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
-  private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
-  private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
-  private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
+  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
+  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
+  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
 
   // Replica stuff
-  private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+  private static final HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
       hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
-  private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
-  private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+  private static final HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
       new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
-  private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+  private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
       new HRegionLocation(hri2r1, sn3));
-  private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+  private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
 
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
 
-  private static int NB_RETRIES = 3;
+  private static final int NB_RETRIES = 3;
 
+  private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+  private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
   @BeforeClass
   public static void beforeClass(){
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
+    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
   }
 
   static class CountingThreadFactory implements ThreadFactory {
@@ -153,20 +148,21 @@ public class TestAsyncProcess {
     final AtomicInteger nbActions = new AtomicInteger();
     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
     public AtomicInteger callsCt = new AtomicInteger();
-    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+
     private long previousTimeout = -1;
+    final ExecutorService service;
     @Override
-    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
-        List<Action> actions, long nonceGroup, ExecutorService pool,
-        Batch.Callback<Res> callback, Object[] results, boolean needResults,
-        CancellableRegionServerCallable callable, int curTimeout) {
+    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(
+      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
       // Test HTable has tableName of null, so pass DUMMY_TABLE
+      AsyncProcessTask wrap = new AsyncProcessTask(task){
+        @Override
+        public TableName getTableName() {
+          return DUMMY_TABLE;
+        }
+      };
       AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
-          DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
-          results, callback, callable, operationTimeout, rpcTimeout, this);
+          wrap, actions, nonceGroup, this);
       allReqs.add(r);
       return r;
     }
@@ -176,49 +172,54 @@ public class TestAsyncProcess {
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
-      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
-          new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
-            new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
-          operationTimeout);
+      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+          new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
     }
 
     public MyAsyncProcess(
         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
-      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
-          rpcTimeout, operationTimeout);
+      super(hc, conf,
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+      service = Executors.newFixedThreadPool(5);
     }
 
-    public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
-        @SuppressWarnings("unused") boolean dummy) {
-      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
-              new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
-        @Override
-        public void execute(Runnable command) {
-          throw new RejectedExecutionException("test under failure");
-        }
-      },
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
-          rpcTimeout, operationTimeout);
+    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
+        List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
+        boolean needResults) throws InterruptedIOException {
+      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
+              .setPool(pool == null ? service : pool)
+              .setTableName(tableName)
+              .setRowAccess(rows)
+              .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
+              .setNeedResults(needResults)
+              .setRpcTimeout(RPC_TIMEOUT)
+              .setOperationTimeout(OPERATION_TIMEOUT)
+              .build();
+      return submit(task);
+    }
+
+    public <CResult> AsyncRequestFuture submit(TableName tableName,
+        final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
+        boolean needResults) throws InterruptedIOException {
+      return submit(null, tableName, rows, atLeastOne, callback, needResults);
     }
 
     @Override
-    public <Res> AsyncRequestFuture submit(TableName tableName, RowAccess<? extends Row> rows,
-        boolean atLeastOne, Callback<Res> callback, boolean needResults)
+    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
             throws InterruptedIOException {
+      previousTimeout = task.getRpcTimeout();
       // We use results in tests to check things, so override to always save them.
-      return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
+      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
+        @Override
+        public boolean getNeedResults() {
+          return true;
+        }
+      };
+      return super.submit(wrap);
     }
 
     @Override
-    public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
-      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
-      CancellableRegionServerCallable callable, int curTimeout) {
-      previousTimeout = curTimeout;
-      return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
-    }
-    @Override
     protected RpcRetryingCaller<AbstractResponse> createCaller(
         CancellableRegionServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
@@ -260,12 +261,9 @@ public class TestAsyncProcess {
 
   static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
 
-    public MyAsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
-        ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
-        int rpcTimeout, AsyncProcess asyncProcess) {
-      super(tableName, actions, nonceGroup, pool, needResults,
-          results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
+    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
+      long nonceGroup, AsyncProcess asyncProcess) {
+      super(task, actions, nonceGroup, asyncProcess);
     }
 
     @Override
@@ -483,7 +481,7 @@ public class TestAsyncProcess {
     final boolean usedRegions[];
 
     protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
-      super(conf);
+      super(CONF);
       this.hrl = hrl;
       this.usedRegions = new boolean[hrl.size()];
     }
@@ -553,19 +551,7 @@ public class TestAsyncProcess {
     long putsHeapSize = writeBuffer;
     doSubmitRequest(writeBuffer, putsHeapSize);
   }
-  @Test
-  public void testIllegalArgument() throws IOException {
-    ClusterConnection conn = createHConnection();
-    final long maxHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
-      AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
-    conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
-    try {
-      MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
-      fail("The maxHeapSizePerRequest must be bigger than zero");
-    } catch (IllegalArgumentException e) {
-    }
-    conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
-  }
+
   @Test
   public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
     long maxHeapSizePerRequest = Long.MAX_VALUE;
@@ -601,10 +587,13 @@ public class TestAsyncProcess {
 
   private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
     ClusterConnection conn = createHConnection();
-    final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
-      AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
-    conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
-    BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
+      SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+      SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
 
     // sn has two regions
     long putSizeSN = 0;
@@ -630,11 +619,12 @@ public class TestAsyncProcess {
       + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
       + ", minCountSnRequest:" + minCountSnRequest
       + ", minCountSn2Request:" + minCountSn2Request);
-    try (HTable ht = new HTable(conn, bufferParam)) {
-      MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
-      ht.mutator.ap = ap;
 
-      Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    try (HTable ht = new HTable(conn, mutator)) {
+      Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
       ht.put(puts);
       List<AsyncRequestFuture> reqs = ap.allReqs;
 
@@ -680,12 +670,17 @@ public class TestAsyncProcess {
       assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
     }
     // restore config.
-    conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest);
+    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest);
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
+
   @Test
   public void testSubmit() throws Exception {
     ClusterConnection hc = createHConnection();
-    AsyncProcess ap = new MyAsyncProcess(hc, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -704,7 +699,7 @@ public class TestAsyncProcess {
         updateCalled.incrementAndGet();
       }
     };
-    AsyncProcess ap = new MyAsyncProcess(hc, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -717,13 +712,16 @@ public class TestAsyncProcess {
 
   @Test
   public void testSubmitBusyRegion() throws Exception {
-    ClusterConnection hc = createHConnection();
-    AsyncProcess ap = new MyAsyncProcess(hc, conf);
-
+    ClusterConnection conn = createHConnection();
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
+    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
+    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
       ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
     }
     ap.submit(null, DUMMY_TABLE, puts, false, null, false);
@@ -732,15 +730,22 @@ public class TestAsyncProcess {
     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
     ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(0, puts.size());
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
 
 
   @Test
   public void testSubmitBusyRegionServer() throws Exception {
-    ClusterConnection hc = createHConnection();
-    AsyncProcess ap = new MyAsyncProcess(hc, conf);
-
-    ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+    controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer));
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -751,14 +756,18 @@ public class TestAsyncProcess {
     ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(" puts=" + puts, 1, puts.size());
 
-    ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
+    controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
     ap.submit(null, DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
 
   @Test
   public void testFail() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
 
     List<Put> puts = new ArrayList<Put>();
     Put p = createPut(1, false);
@@ -784,10 +793,15 @@ public class TestAsyncProcess {
 
   @Test
   public void testSubmitTrue() throws IOException {
-    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
-    ap.tasksInProgress.incrementAndGet();
-    final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion);
-    ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
+    ClusterConnection conn = createHConnection();
+    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+    controller.tasksInProgress.incrementAndGet();
+    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
+    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
 
     final AtomicBoolean checkPoint = new AtomicBoolean(false);
     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
@@ -798,7 +812,7 @@ public class TestAsyncProcess {
         Threads.sleep(1000);
         Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
         ai.decrementAndGet();
-        ap.tasksInProgress.decrementAndGet();
+        controller.tasksInProgress.decrementAndGet();
         checkPoint2.set(true);
       }
     };
@@ -819,11 +833,15 @@ public class TestAsyncProcess {
     while (!checkPoint2.get()){
       Threads.sleep(1);
     }
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
 
   @Test
   public void testFailAndSuccess() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, false));
@@ -850,7 +868,7 @@ public class TestAsyncProcess {
 
   @Test
   public void testFlush() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, false));
@@ -868,24 +886,32 @@ public class TestAsyncProcess {
   @Test
   public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
     ClusterConnection hc = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false);
+    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
     testTaskCount(ap);
   }
 
   @Test
   public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
-    Configuration copyConf = new Configuration(conf);
+    Configuration copyConf = new Configuration(CONF);
     copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
     MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
-    ClusterConnection hc = createHConnection();
-    Mockito.when(hc.getConfiguration()).thenReturn(copyConf);
-    Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
-    Mockito.when(hc.getBackoffPolicy()).thenReturn(bp);
-    MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false);
+    ClusterConnection conn = createHConnection();
+    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
+    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
+    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false);
     testTaskCount(ap);
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
 
-  private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException {
+  private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException {
+    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
     List<Put> puts = new ArrayList<>();
     for (int i = 0; i != 3; ++i) {
       puts.add(createPut(1, true));
@@ -896,18 +922,24 @@ public class TestAsyncProcess {
     ap.waitForMaximumCurrentTasks(0, null);
     // More time to wait if there are incorrect task count.
     TimeUnit.SECONDS.sleep(1);
-    assertEquals(0, ap.tasksInProgress.get());
-    for (AtomicInteger count : ap.taskCounterPerRegion.values()) {
+    assertEquals(0, controller.tasksInProgress.get());
+    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
       assertEquals(0, count.get());
     }
-    for (AtomicInteger count : ap.taskCounterPerServer.values()) {
+    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
       assertEquals(0, count.get());
     }
   }
 
   @Test
   public void testMaxTask() throws Exception {
-    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+    ClusterConnection conn = createHConnection();
+    final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+      SimpleRequestController.class.getName());
+    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+
 
     for (int i = 0; i < 1000; i++) {
       ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
@@ -940,7 +972,7 @@ public class TestAsyncProcess {
       @Override
       public void run() {
         Threads.sleep(sleepTime);
-        while (ap.tasksInProgress.get() > 0) {
+        while (controller.tasksInProgress.get() > 0) {
           ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
         }
       }
@@ -953,6 +985,10 @@ public class TestAsyncProcess {
 
     //Adds 100 to secure us against approximate timing.
     Assert.assertTrue(start + 100L + sleepTime > end);
+    if (defaultClazz != null) {
+      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        defaultClazz);
+    }
   }
 
   private static ClusterConnection createHConnection() throws IOException {
@@ -999,38 +1035,53 @@ public class TestAsyncProcess {
     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
     Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
     Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
-    Mockito.when(hc.getConfiguration()).thenReturn(conf);
+    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
+    Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
     return hc;
   }
 
   @Test
   public void testHTablePutSuccess() throws Exception {
-    BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
-    ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
 
     Put put = createPut(1, true);
 
-    Assert.assertEquals(0, ht.getWriteBufferSize());
+    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), ht.getWriteBufferSize());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
     ht.mutate(put);
-    Assert.assertEquals(0, ht.getWriteBufferSize());
+    ht.flush();
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+  }
+
+  @Test
+  public void testBufferedMutatorImplWithSharedPool() throws Exception {
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
+
+    ht.close();
+    assertFalse(ap.service.isShutdown());
   }
 
   private void doHTableFailedPut(boolean bufferOn) throws Exception {
     ClusterConnection conn = createHConnection();
-    BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     if (bufferOn) {
       bufferParam.writeBufferSize(1024L * 1024L);
     } else {
       bufferParam.writeBufferSize(0L);
     }
-
-    HTable ht = new HTable(conn, bufferParam);
-    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
-    ht.mutator.ap = ap;
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    HTable ht = new HTable(conn, mutator);
 
     Put put = createPut(1, false);
 
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+    Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
     try {
       ht.put(put);
       if (bufferOn) {
@@ -1039,7 +1090,7 @@ public class TestAsyncProcess {
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+    Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
     // The table should have sent one request, maybe after multiple attempts
     AsyncRequestFuture ars = null;
     for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -1067,10 +1118,10 @@ public class TestAsyncProcess {
   @Test
   public void testHTableFailedPutAndNewPut() throws Exception {
     ClusterConnection conn = createHConnection();
-    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
-        new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
-    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
-    mutator.ap = ap;
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
+            .writeBufferSize(0);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
 
     Put p = createPut(1, false);
     mutator.mutate(p);
@@ -1083,202 +1134,13 @@ public class TestAsyncProcess {
     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
     //  manage what was inserted, what was tried but failed, and what was not even tried.
     p = createPut(1, true);
-    Assert.assertEquals(0, mutator.writeAsyncBuffer.size());
+    Assert.assertEquals(0, mutator.size());
     try {
       mutator.mutate(p);
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size());
-  }
-
-  @Test
-  public void testTaskCheckerHost() throws IOException {
-    final int maxTotalConcurrentTasks = 100;
-    final int maxConcurrentTasksPerServer = 2;
-    final int maxConcurrentTasksPerRegion = 1;
-    final AtomicLong tasksInProgress = new AtomicLong(0);
-    final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
-    final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
-    TaskCountChecker countChecker = new TaskCountChecker(
-      maxTotalConcurrentTasks,
-      maxConcurrentTasksPerServer,
-      maxConcurrentTasksPerRegion,
-      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
-    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
-    // unlimiited
-    RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest);
-    RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker));
-
-    ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
-    assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code);
-
-    ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
-    // rejected for size
-    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2);
-
-    ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest);
-    // rejected for size
-    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code);
-
-    // fill the task slots for loc3.
-    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
-    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
-
-    ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L);
-    // rejected for count
-    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code);
-
-    // release the task slots for loc3.
-    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
-    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
-
-    ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L);
-    assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2);
-  }
-
-  @Test
-  public void testRequestSizeCheckerr() throws IOException {
-    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
-    final ClusterConnection conn = createHConnection();
-    RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest);
-
-    // inner state is unchanged.
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-    }
-
-    // accept the data located on loc1 region.
-    ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
-    assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode);
-    checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest);
-
-    // the sn server reachs the limit.
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
-      assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
-      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
-      assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
-    }
-
-    // the request to sn2 server should be accepted.
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-    }
-
-    checker.reset();
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-    }
-  }
-
-  @Test
-  public void testSubmittedSizeChecker() {
-    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
-    SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit);
-
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode include = checker.canTakeOperation(loc1, 100000);
-      assertEquals(ReturnCode.INCLUDE, include);
-    }
-
-    for (int i = 0; i != 10; ++i) {
-      checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit);
-    }
-
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode include = checker.canTakeOperation(loc1, 100000);
-      assertEquals(ReturnCode.END, include);
-    }
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode include = checker.canTakeOperation(loc2, 100000);
-      assertEquals(ReturnCode.END, include);
-    }
-    checker.reset();
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode include = checker.canTakeOperation(loc1, 100000);
-      assertEquals(ReturnCode.INCLUDE, include);
-    }
-  }
-  @Test
-  public void testTaskCountChecker() throws InterruptedIOException {
-    long rowSize = 12345;
-    int maxTotalConcurrentTasks = 100;
-    int maxConcurrentTasksPerServer = 2;
-    int maxConcurrentTasksPerRegion = 1;
-    AtomicLong tasksInProgress = new AtomicLong(0);
-    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
-    Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
-    TaskCountChecker checker = new TaskCountChecker(
-      maxTotalConcurrentTasks,
-      maxConcurrentTasksPerServer,
-      maxConcurrentTasksPerRegion,
-      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
-
-    // inner state is unchanged.
-    for (int i = 0; i != 10; ++i) {
-      ReturnCode code = checker.canTakeOperation(loc1, rowSize);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-    }
-    // add loc1 region.
-    ReturnCode code = checker.canTakeOperation(loc1, rowSize);
-    assertEquals(RowChecker.ReturnCode.INCLUDE, code);
-    checker.notifyFinal(code, loc1, rowSize);
-
-    // fill the task slots for loc1.
-    taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100));
-    taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100));
-
-    // the region was previously accepted, so it must be accpted now.
-    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
-      ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
-      checker.notifyFinal(includeCode, loc1, rowSize);
-    }
-
-    // fill the task slots for loc3.
-    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
-    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
-
-    // no task slots.
-    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
-      ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize);
-      assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode);
-      checker.notifyFinal(excludeCode, loc3, rowSize);
-    }
-
-    // release the tasks for loc3.
-    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
-    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
-
-    // add loc3 region.
-    ReturnCode code3 = checker.canTakeOperation(loc3, rowSize);
-    assertEquals(RowChecker.ReturnCode.INCLUDE, code3);
-    checker.notifyFinal(code3, loc3, rowSize);
-
-    // the region was previously accepted, so it must be accpted now.
-    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
-      ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize);
-      assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
-      checker.notifyFinal(includeCode, loc3, rowSize);
-    }
-
-    checker.reset();
-    // the region was previously accepted,
-    // but checker have reseted and task slots for loc1 is full.
-    // So it must be rejected now.
-    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
-      ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
-      assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
-      checker.notifyFinal(includeCode, loc1, rowSize);
-    }
+    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
   }
 
   @Test
@@ -1302,9 +1164,12 @@ public class TestAsyncProcess {
 
   @Test
   public void testBatch() throws IOException, InterruptedException {
-    ClusterConnection conn = new MyConnectionImpl(conf);
-    HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
-    ht.multiAp = new MyAsyncProcess(conn, conf, false);
+    ClusterConnection conn = new MyConnectionImpl(CONF);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    HTable ht = new HTable(conn, mutator);
+    ht.multiAp = new MyAsyncProcess(conn, CONF, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -1332,18 +1197,16 @@ public class TestAsyncProcess {
   }
   @Test
   public void testErrorsServers() throws IOException {
-    Configuration configuration = new Configuration(conf);
+    Configuration configuration = new Configuration(CONF);
     ClusterConnection conn = new MyConnectionImpl(configuration);
-    BufferedMutatorImpl mutator =
-        new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
-    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
-
     MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
-    mutator.ap = ap;
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
 
-    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
-    Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
-    mutator.ap.serverTrackerTimeout = 1;
+    Assert.assertNotNull(ap.createServerErrorTracker());
+    Assert.assertTrue(ap.serverTrackerTimeout > 200);
+    ap.serverTrackerTimeout = 1;
 
     Put p = createPut(1, false);
     mutator.mutate(p);
@@ -1361,14 +1224,15 @@ public class TestAsyncProcess {
   public void testReadAndWriteTimeout() throws IOException {
     final long readTimeout = 10 * 1000;
     final long writeTimeout = 20 * 1000;
-    Configuration copyConf = new Configuration(conf);
+    Configuration copyConf = new Configuration(CONF);
     copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
     copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
     ClusterConnection conn = createHConnection();
     Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
-    BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
-    try (HTable ht = new HTable(conn, bufferParam)) {
-      MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    try (HTable ht = new HTable(conn, mutator)) {
       ht.multiAp = ap;
       List<Get> gets = new LinkedList<>();
       gets.add(new Get(DUMMY_BYTES_1));
@@ -1399,12 +1263,12 @@ public class TestAsyncProcess {
 
   @Test
   public void testGlobalErrors() throws IOException {
-    ClusterConnection conn = new MyConnectionImpl(conf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
-    mutator.ap = ap;
+    ClusterConnection conn = new MyConnectionImpl(CONF);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
 
-    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+    Assert.assertNotNull(ap.createServerErrorTracker());
 
     Put p = createPut(1, true);
     mutator.mutate(p);
@@ -1421,13 +1285,11 @@ public class TestAsyncProcess {
 
   @Test
   public void testCallQueueTooLarge() throws IOException {
-    ClusterConnection conn = new MyConnectionImpl(conf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
-    mutator.ap = ap;
-
-    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
-
+    ClusterConnection conn = new MyConnectionImpl(CONF);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    Assert.assertNotNull(ap.createServerErrorTracker());
     Put p = createPut(1, true);
     mutator.mutate(p);
 
@@ -1459,10 +1321,11 @@ public class TestAsyncProcess {
     }
 
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
-    HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
-    MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
+    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(con , bufferParam, ap);
+    HTable ht = new HTable(con, mutator);
     ht.multiAp = ap;
-
     ht.batch(gets, null);
 
     Assert.assertEquals(ap.nbActions.get(), NB_REGS);
@@ -1482,7 +1345,16 @@ public class TestAsyncProcess {
     // One region has no replica, so the main call succeeds for it.
     MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
-    AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[3])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
     Assert.assertEquals(2, ap.getReplicaCallCount());
   }
@@ -1492,7 +1364,16 @@ public class TestAsyncProcess {
     // Main call succeeds before replica calls are kicked off.
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
-    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[3])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
     Assert.assertEquals(0, ap.getReplicaCallCount());
   }
@@ -1502,7 +1383,16 @@ public class TestAsyncProcess {
     // Either main or replica can succeed.
     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[2])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
     long replicaCalls = ap.getReplicaCallCount();
     Assert.assertTrue(replicaCalls >= 0);
@@ -1517,7 +1407,16 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
     ap.setPrimaryCallDelay(sn2, 2000);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[2])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
     Assert.assertEquals(1, ap.getReplicaCallCount());
   }
@@ -1530,7 +1429,16 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
     ap.addFailures(hri1, hri2);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[2])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
     Assert.assertEquals(0, ap.getReplicaCallCount());
   }
@@ -1542,7 +1450,16 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
     ap.addFailures(hri1, hri1r2, hri2);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[2])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
     Assert.assertEquals(2, ap.getReplicaCallCount());
   }
@@ -1554,7 +1471,16 @@ public class TestAsyncProcess {
     MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
     ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
-    AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(ap.service)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(rows)
+            .setResults(new Object[2])
+            .setSubmittedRows(SubmittedRows.ALL)
+            .build();
+    AsyncRequestFuture ars = ap.submit(task);
     verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
     // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
     Assert.assertEquals(3, ars.getErrors().getNumExceptions());
@@ -1583,6 +1509,13 @@ public class TestAsyncProcess {
     return ap;
   }
 
+  private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
+    return new BufferedMutatorParams(name)
+            .pool(ap.service)
+            .rpcTimeout(RPC_TIMEOUT)
+            .opertationTimeout(OPERATION_TIMEOUT);
+  }
+
   private static List<Get> makeTimelineGets(byte[]... rows) {
     List<Get> result = new ArrayList<Get>();
     for (byte[] row : rows) {
@@ -1663,14 +1596,9 @@ public class TestAsyncProcess {
   }
 
   static class AsyncProcessForThrowableCheck extends AsyncProcess {
-    private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
-        ExecutorService pool) {
-      super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
-          conf), rpcTimeout, operationTimeout);
+    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
+      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
+          conf));
     }
   }
 
@@ -1681,56 +1609,22 @@ public class TestAsyncProcess {
     MyThreadPoolExecutor myPool =
         new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(200));
-    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool);
+    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
-
-    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
+    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+            .setPool(myPool)
+            .setRpcTimeout(RPC_TIMEOUT)
+            .setOperationTimeout(OPERATION_TIMEOUT)
+            .setTableName(DUMMY_TABLE)
+            .setRowAccess(puts)
+            .setSubmittedRows(SubmittedRows.NORMAL)
+            .build();
+    ap.submit(task);
     Assert.assertTrue(puts.isEmpty());
   }
 
-  @Test
-  public void testWaitForMaximumCurrentTasks() throws Exception {
-    final AtomicLong tasks = new AtomicLong(0);
-    final AtomicInteger max = new AtomicInteger(0);
-    final CyclicBarrier barrier = new CyclicBarrier(2);
-    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
-    Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          barrier.await();
-          ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
-        } catch (InterruptedIOException e) {
-          Assert.fail(e.getMessage());
-        } catch (InterruptedException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        } catch (BrokenBarrierException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-      }
-    };
-    // First test that our runnable thread only exits when tasks is zero.
-    Thread t = new Thread(runnable);
-    t.start();
-    barrier.await();
-    t.join();
-    // Now assert we stay running if max == zero and tasks is > 0.
-    barrier.reset();
-    tasks.set(1000000);
-    t = new Thread(runnable);
-    t.start();
-    barrier.await();
-    while (tasks.get() > 0) {
-      assertTrue(t.isAlive());
-      tasks.set(tasks.get() - 1);
-    }
-    t.join();
-  }
-
   /**
    * Test and make sure we could use a special pause setting when retry with
    * CallQueueTooBigException, see HBASE-17114
@@ -1738,18 +1632,18 @@ public class TestAsyncProcess {
    */
   @Test
   public void testRetryPauseWithCallQueueTooBigException() throws Exception {
-    Configuration myConf = new Configuration(conf);
+    Configuration myConf = new Configuration(CONF);
     final long specialPause = 500L;
     final int retries = 1;
     myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
     myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
     ClusterConnection conn = new MyConnectionImpl(myConf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
     AsyncProcessWithFailure ap =
         new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
-    mutator.ap = ap;
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
 
-    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
 
     Put p = createPut(1, true);
     mutator.mutate(p);
@@ -1775,8 +1669,9 @@ public class TestAsyncProcess {
     final long normalPause =
         myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
-    mutator.ap = ap;
-    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
     mutator.mutate(p);
     startTime = System.currentTimeMillis();
     try {