You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/25 12:38:02 UTC

[1/2] hbase git commit: HBASE-17345 Implement batch

Repository: hbase
Updated Branches:
  refs/heads/master 8da7366fc -> 8fa5b0b94


http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
deleted file mode 100644
index 612e830..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableMultiGet {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] CQ = Bytes.toBytes("cq");
-
-  private static int COUNT = 100;
-
-  private static AsyncConnection ASYNC_CONN;
-
-  @Parameter
-  public Supplier<AsyncTableBase> getTable;
-
-  private static RawAsyncTable getRawTable() {
-    return ASYNC_CONN.getRawTable(TABLE_NAME);
-  }
-
-  private static AsyncTable getTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-  }
-
-  @Parameters
-  public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
-      new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    byte[][] splitKeys = new byte[8][];
-    for (int i = 11; i < 99; i += 11) {
-      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
-    }
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-    List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
-      new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    ASYNC_CONN.close();
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  private void move() throws IOException, InterruptedException {
-    HRegionServer src = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
-    HRegionServer dst = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer()).filter(r -> r != src).findAny().get();
-    Region region = src.getOnlineRegions(TABLE_NAME).stream().findAny().get();
-    TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
-      Bytes.toBytes(dst.getServerName().getServerName()));
-    Thread.sleep(1000);
-  }
-
-  private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
-      throws IOException, InterruptedException {
-    AsyncTableBase table = getTable.get();
-    List<Get> gets =
-        IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
-            .collect(Collectors.toList());
-    List<Result> results = getFunc.apply(table, gets);
-    assertEquals(COUNT, results.size());
-    for (int i = 0; i < COUNT; i++) {
-      Result result = results.get(i);
-      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
-    }
-    // test basic failure recovery
-    move();
-    results = getFunc.apply(table, gets);
-    assertEquals(COUNT, results.size());
-    for (int i = 0; i < COUNT; i++) {
-      Result result = results.get(i);
-      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
-    }
-  }
-
-  @Test
-  public void testGet() throws InterruptedException, IOException {
-    test((table, gets) -> {
-      return table.get(gets).stream().map(f -> {
-        try {
-          return f.get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new RuntimeException(e);
-        }
-      }).collect(Collectors.toList());
-    });
-
-  }
-
-  @Test
-  public void testGetAll() throws InterruptedException, IOException {
-    test((table, gets) -> {
-      try {
-        return table.getAll(gets).get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-}


[2/2] hbase git commit: HBASE-17345 Implement batch

Posted by zh...@apache.org.
HBASE-17345 Implement batch


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8fa5b0b9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8fa5b0b9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8fa5b0b9

Branch: refs/heads/master
Commit: 8fa5b0b946c01516076fa944a310b33224ff21a9
Parents: 8da7366
Author: zhangduo <zh...@apache.org>
Authored: Thu Dec 22 19:42:15 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Dec 25 20:36:52 2016 +0800

----------------------------------------------------------------------
 .../client/AsyncBatchRpcRetryingCaller.java     | 476 +++++++++++++++++++
 .../client/AsyncMultiGetRpcRetryingCaller.java  | 407 ----------------
 .../client/AsyncRpcRetryingCallerFactory.java   |  39 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java |   2 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |   2 +-
 .../hadoop/hbase/client/AsyncTableBase.java     | 103 +++-
 .../hadoop/hbase/client/AsyncTableImpl.java     |   4 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  62 ++-
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |   7 +-
 .../hbase/shaded/protobuf/RequestConverter.java |   5 +-
 .../client/AbstractTestAsyncTableScan.java      |  12 +-
 .../hbase/client/TestAsyncGetMultiThread.java   | 150 ------
 .../hbase/client/TestAsyncTableBatch.java       | 236 +++++++++
 .../client/TestAsyncTableGetMultiThreaded.java  | 149 ++++++
 .../hbase/client/TestAsyncTableMultiGet.java    | 163 -------
 15 files changed, 1032 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
new file mode 100644
index 0000000..6f0b8e9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for batch operations.
+ * <p>
+ * Note that {@link #operationTimeoutNs} is now the total time limit for the whole batch, the same
+ * as for other single operations.
+ * <p>
+ * Logically, {@link #maxAttempts} is a limit for each single operation in the batch. In the
+ * implementation, we record a {@code tries} value for each operation group, and if a group is
+ * split into several groups when retrying, the sub groups inherit the {@code tries}. You can
+ * think of the whole retrying process as a tree, where {@link #maxAttempts} is the limit on
+ * the depth of the tree.
+ */
+@InterfaceAudience.Private
+class AsyncBatchRpcRetryingCaller<T> {
+
+  private static final Log LOG = LogFactory.getLog(AsyncBatchRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final List<Action> actions;
+
+  private final List<CompletableFuture<T>> futures;
+
+  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
+
+  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long readRpcTimeoutNs;
+
+  private final long writeRpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final long startNs;
+
+  // We cannot use HRegionLocation as the map key because the hashCode and equals methods of
+  // HRegionLocation only consider the serverName.
+  private static final class RegionRequest {
+
+    public final HRegionLocation loc;
+
+    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
+
+    public RegionRequest(HRegionLocation loc) {
+      this.loc = loc;
+    }
+  }
+
+  private static final class ServerRequest {
+
+    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
+        new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+
+    public final AtomicLong rpcTimeoutNs;
+
+    public ServerRequest(long defaultRpcTimeoutNs) {
+      this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs);
+    }
+
+    public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) {
+      computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+        () -> new RegionRequest(loc)).actions.add(action);
+      // try to update the timeout to a larger value
+      if (this.rpcTimeoutNs.get() <= 0) {
+        return;
+      }
+      if (rpcTimeoutNs <= 0) {
+        this.rpcTimeoutNs.set(-1L);
+        return;
+      }
+      AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs);
+    }
+  }
+
+  public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
+      long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs,
+      int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.readRpcTimeoutNs = readRpcTimeoutNs;
+    this.writeRpcTimeoutNs = writeRpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+
+    this.actions = new ArrayList<>(actions.size());
+    this.futures = new ArrayList<>(actions.size());
+    this.action2Future = new IdentityHashMap<>(actions.size());
+    for (int i = 0, n = actions.size(); i < n; i++) {
+      Row rawAction = actions.get(i);
+      Action action = new Action(rawAction, i);
+      if (rawAction instanceof Append || rawAction instanceof Increment) {
+        action.setNonce(conn.getNonceGenerator().newNonce());
+      }
+      this.actions.add(action);
+      CompletableFuture<T> future = new CompletableFuture<>();
+      futures.add(future);
+      action2Future.put(action, future);
+    }
+    this.action2Errors = new IdentityHashMap<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  private List<ThrowableWithExtraContext> removeErrors(Action action) {
+    synchronized (action2Errors) {
+      return action2Errors.remove(action);
+    }
+  }
+
+  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
+      Throwable error, ServerName serverName) {
+    if (tries > startLogErrorsCnt) {
+      String regions =
+          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
+              .collect(Collectors.joining(",", "[", "]"));
+      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
+          + " failed, tries=" + tries,
+        error);
+    }
+  }
+
+  private String getExtraContextForError(ServerName serverName) {
+    return serverName != null ? serverName.getServerName() : "";
+  }
+
+  private void addError(Action action, Throwable error, ServerName serverName) {
+    List<ThrowableWithExtraContext> errors;
+    synchronized (action2Errors) {
+      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
+    }
+    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+        getExtraContextForError(serverName)));
+  }
+
+  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
+    actions.forEach(action -> addError(action, error, serverName));
+  }
+
+  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
+    CompletableFuture<T> future = action2Future.get(action);
+    if (future.isDone()) {
+      return;
+    }
+    ThrowableWithExtraContext errorWithCtx =
+        new ThrowableWithExtraContext(error, currentTime, extras);
+    List<ThrowableWithExtraContext> errors = removeErrors(action);
+    if (errors == null) {
+      errors = Collections.singletonList(errorWithCtx);
+    } else {
+      errors.add(errorWithCtx);
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
+  }
+
+  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    String extras = getExtraContextForError(serverName);
+    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
+  }
+
+  private void failAll(Stream<Action> actions, int tries) {
+    actions.forEach(action -> {
+      CompletableFuture<T> future = action2Future.get(action);
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new RetriesExhaustedException(tries,
+          Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+    });
+  }
+
+  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
+      List<CellScannable> cells) throws IOException {
+    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
+    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
+    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
+      // TODO: remove the extra for loop as we will iterate it in mutationBuilder.
+      if (!multiRequestBuilder.hasNonceGroup()) {
+        for (Action action : entry.getValue().actions) {
+          if (action.hasNonce()) {
+            multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
+            break;
+          }
+        }
+      }
+      regionActionBuilder.clear();
+      regionActionBuilder.setRegion(
+        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
+      regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
+        entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
+      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+    }
+    return multiRequestBuilder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
+      RegionResult regionResult, List<Action> failedActions) {
+    Object result = regionResult.result.get(action.getOriginalIndex());
+    if (result == null) {
+      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+          + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
+          + regionReq.loc.getRegionInfo().getRegionNameAsString());
+      addError(action, new RuntimeException("Invalid response"), serverName);
+      failedActions.add(action);
+    } else if (result instanceof Throwable) {
+      Throwable error = translateException((Throwable) result);
+      logException(tries, () -> Stream.of(regionReq), error, serverName);
+      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
+          getExtraContextForError(serverName));
+      } else {
+        failedActions.add(action);
+      }
+    } else {
+      action2Future.get(action).complete((T) result);
+    }
+  }
+
+  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
+      ServerName serverName, MultiResponse resp) {
+    List<Action> failedActions = new ArrayList<>(); // actions to resubmit after this response
+    actionsByRegion.forEach((rn, regionReq) -> {
+      RegionResult regionResult = resp.getResults().get(rn);
+      if (regionResult != null) {
+        regionReq.actions.forEach(
+          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
+      } else {
+        Throwable t = resp.getException(rn);
+        Throwable error;
+        if (t == null) {
+          LOG.error(
+            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          error = new RuntimeException("Invalid response");
+        } else {
+          error = translateException(t);
+          logException(tries, () -> Stream.of(regionReq), error, serverName);
+          conn.getLocator().updateCachedLocation(regionReq.loc, error);
+        } // both cases continue below, so an invalid response is failed or retried, not dropped
+        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+          failAll(regionReq.actions.stream(), tries, error, serverName);
+          return; // only ends the handling of this region
+        }
+        addError(regionReq.actions, error, serverName);
+        failedActions.addAll(regionReq.actions);
+      }
+    });
+    if (!failedActions.isEmpty()) {
+      tryResubmit(failedActions.stream(), tries);
+    }
+  }
+
+  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
+    long remainingNs;
+    if (operationTimeoutNs > 0) {
+      remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
+            .flatMap(r -> r.actions.stream()),
+          tries);
+        return;
+      }
+    } else {
+      remainingNs = Long.MAX_VALUE;
+    }
+    actionsByServer.forEach((sn, serverReq) -> {
+      ClientService.Interface stub;
+      try {
+        stub = conn.getRegionServerStub(sn);
+      } catch (IOException e) {
+        onError(serverReq.actionsByRegion, tries, e, sn);
+        return;
+      }
+      ClientProtos.MultiRequest req;
+      List<CellScannable> cells = new ArrayList<>();
+      try {
+        req = buildReq(serverReq.actionsByRegion, cells);
+      } catch (IOException e) {
+        onError(serverReq.actionsByRegion, tries, e, sn);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs));
+      if (!cells.isEmpty()) {
+        controller.setCellScanner(createCellScanner(cells));
+      }
+      stub.multi(controller, req, resp -> {
+        if (controller.failed()) {
+          onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
+        } else {
+          try {
+            onComplete(serverReq.actionsByRegion, tries, sn,
+              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+          } catch (Exception e) {
+            onError(serverReq.actionsByRegion, tries, e, sn);
+            return;
+          }
+        }
+      });
+    });
+  }
+
+  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
+      ServerName serverName) {
+    Throwable error = translateException(t);
+    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
+        serverName);
+      return;
+    }
+    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
+        .collect(Collectors.toList());
+    addError(copiedActions, error, serverName);
+    tryResubmit(copiedActions.stream(), tries);
+  }
+
+  private void tryResubmit(Stream<Action> actions, int tries) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        failAll(actions, tries);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private long getRpcTimeoutNs(Action action) {
+    return action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs;
+  }
+
+  private void groupAndSend(Stream<Action> actions, int tries) {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        failAll(actions, tries);
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
+    // use the smaller one as the default timeout value, and increase the timeout value if an
+    // action in the group needs a larger timeout value.
+    long defaultRpcTimeoutNs;
+    if (readRpcTimeoutNs > 0) {
+      defaultRpcTimeoutNs =
+          writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs;
+    } else {
+      defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L;
+    }
+    CompletableFuture.allOf(actions
+        .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+          RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+            if (error != null) {
+              error = translateException(error);
+              if (error instanceof DoNotRetryIOException) {
+                failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+                return;
+              }
+              addError(action, error, null);
+              locateFailed.add(action);
+            } else {
+              computeIfAbsent(actionsByServer, loc.getServerName(),
+                () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action,
+                  getRpcTimeoutNs(action));
+            }
+          }))
+        .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
+          if (!actionsByServer.isEmpty()) {
+            send(actionsByServer, tries);
+          }
+          if (!locateFailed.isEmpty()) {
+            tryResubmit(locateFailed.stream(), tries);
+          }
+        });
+  }
+
+  public List<CompletableFuture<T>> call() {
+    groupAndSend(actions.stream(), 1);
+    return futures;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
deleted file mode 100644
index e1208c2..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import io.netty.util.HashedWheelTimer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * Retry caller for multi get.
- * <p>
- * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with
- * other single operations
- * <p>
- * And the {@link #maxAttempts} is a limit for each single get in the batch logically. In the
- * implementation, we will record a {@code tries} parameter for each operation group, and if it is
- * split to several groups when retrying, the sub groups will inherit {@code tries}. You can imagine
- * that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of the depth
- * of the tree.
- */
-@InterfaceAudience.Private
-class AsyncMultiGetRpcRetryingCaller {
-
-  private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
-
-  private final HashedWheelTimer retryTimer;
-
-  private final AsyncConnectionImpl conn;
-
-  private final TableName tableName;
-
-  private final List<Get> gets;
-
-  private final List<CompletableFuture<Result>> futures;
-
-  private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
-
-  private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final long operationTimeoutNs;
-
-  private final long rpcTimeoutNs;
-
-  private final int startLogErrorsCnt;
-
-  private final long startNs;
-
-  // we can not use HRegionLocation as the map key because the hashCode and equals method of
-  // HRegionLocation only consider serverName.
-  private static final class RegionRequest {
-
-    public final HRegionLocation loc;
-
-    public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
-
-    public RegionRequest(HRegionLocation loc) {
-      this.loc = loc;
-    }
-  }
-
-  public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    this.retryTimer = retryTimer;
-    this.conn = conn;
-    this.tableName = tableName;
-    this.gets = gets;
-    this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
-    this.operationTimeoutNs = operationTimeoutNs;
-    this.rpcTimeoutNs = rpcTimeoutNs;
-    this.startLogErrorsCnt = startLogErrorsCnt;
-
-    this.futures = new ArrayList<>(gets.size());
-    this.get2Future = new IdentityHashMap<>(gets.size());
-    gets.forEach(
-      get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
-    this.get2Errors = new IdentityHashMap<>();
-    this.startNs = System.nanoTime();
-  }
-
-  private long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
-  }
-
-  private List<ThrowableWithExtraContext> removeErrors(Get get) {
-    synchronized (get2Errors) {
-      return get2Errors.remove(get);
-    }
-  }
-
-  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
-      Throwable error, ServerName serverName) {
-    if (tries > startLogErrorsCnt) {
-      String regions =
-          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
-              .collect(Collectors.joining(",", "[", "]"));
-      LOG.warn("Get data for " + regions + " in " + tableName + " from " + serverName
-          + " failed, tries=" + tries,
-        error);
-    }
-  }
-
-  private String getExtras(ServerName serverName) {
-    return serverName != null ? serverName.getServerName() : "";
-  }
-
-  private void addError(Get get, Throwable error, ServerName serverName) {
-    List<ThrowableWithExtraContext> errors;
-    synchronized (get2Errors) {
-      errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
-    }
-    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
-        serverName != null ? serverName.toString() : ""));
-  }
-
-  private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
-    gets.forEach(get -> addError(get, error, serverName));
-  }
-
-  private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
-    CompletableFuture<Result> future = get2Future.get(get);
-    if (future.isDone()) {
-      return;
-    }
-    ThrowableWithExtraContext errorWithCtx =
-        new ThrowableWithExtraContext(error, currentTime, extras);
-    List<ThrowableWithExtraContext> errors = removeErrors(get);
-    if (errors == null) {
-      errors = Collections.singletonList(errorWithCtx);
-    } else {
-      errors.add(errorWithCtx);
-    }
-    future.completeExceptionally(new RetriesExhaustedException(tries, errors));
-  }
-
-  private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
-    long currentTime = System.currentTimeMillis();
-    String extras = getExtras(serverName);
-    gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
-  }
-
-  private void failAll(Stream<Get> gets, int tries) {
-    gets.forEach(get -> {
-      CompletableFuture<Result> future = get2Future.get(get);
-      if (future.isDone()) {
-        return;
-      }
-      future.completeExceptionally(new RetriesExhaustedException(tries,
-          Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
-    });
-  }
-
-  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
-      throws IOException {
-    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
-    for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
-      ClientProtos.RegionAction.Builder regionActionBuilder =
-          ClientProtos.RegionAction.newBuilder().setRegion(
-            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
-      int index = 0;
-      for (Get get : entry.getValue().gets) {
-        regionActionBuilder.addAction(
-          ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
-        index++;
-      }
-      multiRequestBuilder.addRegionAction(regionActionBuilder);
-    }
-    return multiRequestBuilder.build();
-  }
-
-  private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries, ServerName serverName,
-      MultiResponse resp) {
-    List<Get> failedGets = new ArrayList<>();
-    getsByRegion.forEach((rn, regionReq) -> {
-      RegionResult regionResult = resp.getResults().get(rn);
-      if (regionResult != null) {
-        int index = 0;
-        for (Get get : regionReq.gets) {
-          Object result = regionResult.result.get(index);
-          if (result == null) {
-            LOG.error("Server sent us neither result nor exception for row '"
-                + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
-            addError(get, new RuntimeException("Invalid response"), serverName);
-            failedGets.add(get);
-          } else if (result instanceof Throwable) {
-            Throwable error = translateException((Throwable) result);
-            logException(tries, () -> Stream.of(regionReq), error, serverName);
-            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
-                getExtras(serverName));
-            } else {
-              failedGets.add(get);
-            }
-          } else {
-            get2Future.get(get).complete((Result) result);
-          }
-          index++;
-        }
-      } else {
-        Throwable t = resp.getException(rn);
-        Throwable error;
-        if (t == null) {
-          LOG.error(
-            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
-          error = new RuntimeException("Invalid response");
-        } else {
-          error = translateException(t);
-          logException(tries, () -> Stream.of(regionReq), error, serverName);
-          conn.getLocator().updateCachedLocation(regionReq.loc, error);
-          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-            failAll(regionReq.gets.stream(), tries, error, serverName);
-            return;
-          }
-          addError(regionReq.gets, error, serverName);
-          failedGets.addAll(regionReq.gets);
-        }
-      }
-    });
-    if (!failedGets.isEmpty()) {
-      tryResubmit(failedGets.stream(), tries);
-    }
-  }
-
-  private void send(Map<ServerName, ? extends Map<byte[], RegionRequest>> getsByServer, int tries) {
-    long callTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      long remainingNs = remainingTimeNs();
-      if (remainingNs <= 0) {
-        failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
-            .flatMap(r -> r.gets.stream()),
-          tries);
-        return;
-      }
-      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
-    } else {
-      callTimeoutNs = rpcTimeoutNs;
-    }
-    getsByServer.forEach((sn, getsByRegion) -> {
-      ClientService.Interface stub;
-      try {
-        stub = conn.getRegionServerStub(sn);
-      } catch (IOException e) {
-        onError(getsByRegion, tries, e, sn);
-        return;
-      }
-      ClientProtos.MultiRequest req;
-      try {
-        req = buildReq(getsByRegion);
-      } catch (IOException e) {
-        onError(getsByRegion, tries, e, sn);
-        return;
-      }
-      HBaseRpcController controller = conn.rpcControllerFactory.newController();
-      resetController(controller, callTimeoutNs);
-      stub.multi(controller, req, resp -> {
-        if (controller.failed()) {
-          onError(getsByRegion, tries, controller.getFailed(), sn);
-        } else {
-          try {
-            onComplete(getsByRegion, tries, sn,
-              ResponseConverter.getResults(req, resp, controller.cellScanner()));
-          } catch (Exception e) {
-            onError(getsByRegion, tries, e, sn);
-            return;
-          }
-        }
-      });
-    });
-  }
-
-  private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
-      ServerName serverName) {
-    Throwable error = translateException(t);
-    logException(tries, () -> getsByRegion.values().stream(), error, serverName);
-    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-      failAll(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, error,
-        serverName);
-      return;
-    }
-    List<Get> copiedGets =
-        getsByRegion.values().stream().flatMap(r -> r.gets.stream()).collect(Collectors.toList());
-    addError(copiedGets, error, serverName);
-    tryResubmit(copiedGets.stream(), tries);
-  }
-
-  private void tryResubmit(Stream<Get> gets, int tries) {
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        failAll(gets, tries);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
-    }
-    retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
-  }
-
-  private void groupAndSend(Stream<Get> gets, int tries) {
-    long locateTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
-      if (locateTimeoutNs <= 0) {
-        failAll(gets, tries);
-        return;
-      }
-    } else {
-      locateTimeoutNs = -1L;
-    }
-    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
-        new ConcurrentHashMap<>();
-    ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
-    CompletableFuture.allOf(gets.map(get -> conn.getLocator()
-        .getRegionLocation(tableName, get.getRow(), RegionLocateType.CURRENT, locateTimeoutNs)
-        .whenComplete((loc, error) -> {
-          if (error != null) {
-            error = translateException(error);
-            if (error instanceof DoNotRetryIOException) {
-              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
-              return;
-            }
-            addError(get, error, null);
-            locateFailed.add(get);
-          } else {
-            ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
-              loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
-            computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
-              () -> new RegionRequest(loc)).gets.add(get);
-          }
-        })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
-          if (!getsByServer.isEmpty()) {
-            send(getsByServer, tries);
-          }
-          if (!locateFailed.isEmpty()) {
-            tryResubmit(locateFailed.stream(), tries);
-          }
-        });
-  }
-
-  public List<CompletableFuture<Result>> call() {
-    groupAndSend(gets.stream(), 1);
-    return futures;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index d240fab..c90bee2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -258,48 +258,55 @@ class AsyncRpcRetryingCallerFactory {
     return new ScanSingleRegionCallerBuilder();
   }
 
-  public class MultiGetCallerBuilder {
+  public class BatchCallerBuilder {
 
     private TableName tableName;
 
-    private List<Get> gets;
+    private List<? extends Row> actions;
 
     private long operationTimeoutNs = -1L;
 
-    private long rpcTimeoutNs = -1L;
+    private long readRpcTimeoutNs = -1L;
+
+    private long writeRpcTimeoutNs = -1L;
 
-    public MultiGetCallerBuilder table(TableName tableName) {
+    public BatchCallerBuilder table(TableName tableName) {
       this.tableName = tableName;
       return this;
     }
 
-    public MultiGetCallerBuilder gets(List<Get> gets) {
-      this.gets = gets;
+    public BatchCallerBuilder actions(List<? extends Row> actions) {
+      this.actions = actions;
       return this;
     }
 
-    public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
       this.operationTimeoutNs = unit.toNanos(operationTimeout);
       return this;
     }
 
-    public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+    public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.readRpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout);
       return this;
     }
 
-    public AsyncMultiGetRpcRetryingCaller build() {
-      return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+    public <T> AsyncBatchRpcRetryingCaller<T> build() {
+      return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions,
           conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
-          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+          readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
     }
 
-    public List<CompletableFuture<Result>> call() {
-      return build().call();
+    public <T> List<CompletableFuture<T>> call() {
+      return this.<T> build().call();
     }
   }
 
-  public MultiGetCallerBuilder multiGet() {
-    return new MultiGetCallerBuilder();
+  public BatchCallerBuilder batch() {
+    return new BatchCallerBuilder();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 81c806f..5bf6195 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -161,7 +161,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     if (closeScanner) {
       closeScanner();
     }
-    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 0b4add1..04e69af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -120,7 +120,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void completeExceptionally() {
-    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
   private void onError(Throwable error, Supplier<String> errMsg,

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index a2b5247..19a22c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;
+
 import com.google.common.base.Preconditions;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -30,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The base interface for asynchronous version of Table. Obtain an instance from a
@@ -126,11 +129,7 @@ public interface AsyncTableBase {
    *         be wrapped by a {@link CompletableFuture}.
    */
   default CompletableFuture<Boolean> exists(Get get) {
-    if (!get.isCheckExistenceOnly()) {
-      get = ReflectionUtils.newInstance(get.getClass(), get);
-      get.setCheckExistenceOnly(true);
-    }
-    return get(get).thenApply(r -> r.getExists());
+    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
   }
 
   /**
@@ -362,7 +361,9 @@ public interface AsyncTableBase {
    * @param gets The objects that specify what data to fetch and from which rows.
    * @return A list of {@link CompletableFuture}s that represent the result for each get.
    */
-  List<CompletableFuture<Result>> get(List<Get> gets);
+  default List<CompletableFuture<Result>> get(List<Get> gets) {
+    return batch(gets);
+  }
 
   /**
    * A simple version for batch get. It will fail if there are any failures and you will get the
@@ -371,8 +372,90 @@ public interface AsyncTableBase {
    * @return A {@link CompletableFuture} that wrapper the result list.
    */
   default CompletableFuture<List<Result>> getAll(List<Get> gets) {
-    List<CompletableFuture<Result>> futures = get(gets);
+    return batchAll(gets);
+  }
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p>
+   * This will return a list of booleans. Each value will be true if the related Get matches one or
+   * more keys, false if not.
+   * <p>
+   * This is a server-side call so it prevents any data from being transferred to the client.
+   * @param gets the Gets
+   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
+   */
+  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
+    return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
+        .collect(toList());
+  }
+
+  /**
+   * A simple version for batch exists. It will fail if there are any failures and you will get the
+   * whole result boolean list at once if the operation succeeds.
+   * @param gets the Gets
+   * @return A {@link CompletableFuture} that wraps the result boolean list.
+   */
+  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
+    return getAll(toCheckExistenceOnly(gets))
+        .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+  }
+
+  /**
+   * Puts some data in the table, in batch.
+   * @param puts The list of mutations to apply.
+   * @return A list of {@link CompletableFuture}s that represent the result for each put.
+   */
+  default List<CompletableFuture<Void>> put(List<Put> puts) {
+    return voidBatch(this, puts);
+  }
+
+  /**
+   * A simple version of batch put. It will fail if there are any failures.
+   * @param puts The list of mutations to apply.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  default CompletableFuture<Void> putAll(List<Put> puts) {
+    return voidBatchAll(this, puts);
+  }
+
+  /**
+   * Deletes the specified cells/rows in bulk.
+   * @param deletes list of things to delete.
+   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
+   */
+  default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return voidBatch(this, deletes);
+  }
+
+  /**
+   * A simple version of batch delete. It will fail if there are any failures.
+   * @param deletes list of things to delete.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
+    return voidBatchAll(this, deletes);
+  }
+
+  /**
+   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
+   * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
+   * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
+   * had put.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of {@link CompletableFuture}s that represent the result for each action.
+   */
+  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
+
+  /**
+   * A simple version of batch. It will fail if there are any failures and you will get the whole
+   * result list at once if the operation succeeds.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
+   */
+  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
+    List<CompletableFuture<T>> futures = batch(actions);
     return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 6cc2551..7281185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -194,7 +194,7 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public List<CompletableFuture<Result>> get(List<Get> gets) {
-    return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index cc27992..4355182 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 
@@ -28,6 +29,8 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -59,11 +63,11 @@ public final class ConnectionUtils {
 
   private static final Log LOG = LogFactory.getLog(ConnectionUtils.class);
 
-  private ConnectionUtils() {}
+  private ConnectionUtils() {
+  }
 
   /**
-   * Calculate pause time.
-   * Built on {@link HConstants#RETRY_BACKOFF}.
+   * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}.
    * @param pause time to pause
    * @param tries amount of tries
    * @return How long to wait after <code>tries</code> retries
@@ -83,7 +87,6 @@ public final class ConnectionUtils {
     return normalPause + jitter;
   }
 
-
   /**
    * Adds / subs an up to 50% jitter to a pause time. Minimum is 1.
    * @param pause the expected pause.
@@ -103,24 +106,23 @@ public final class ConnectionUtils {
    * @param cnm Replaces the nonce generator used, for testing.
    * @return old nonce generator.
    */
-  public static NonceGenerator injectNonceGeneratorForTesting(
-      ClusterConnection conn, NonceGenerator cnm) {
+  public static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn,
+      NonceGenerator cnm) {
     return ConnectionImplementation.injectNonceGeneratorForTesting(conn, cnm);
   }
 
   /**
-   * Changes the configuration to set the number of retries needed when using Connection
-   * internally, e.g. for  updating catalog tables, etc.
-   * Call this method before we create any Connections.
+   * Changes the configuration to set the number of retries needed when using Connection internally,
+   * e.g. for updating catalog tables, etc. Call this method before we create any Connections.
    * @param c The Configuration instance to set the retries into.
    * @param log Used to log what we set in here.
    */
-  public static void setServerSideHConnectionRetriesConfig(
-      final Configuration c, final String sn, final Log log) {
+  public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn,
+      final Log log) {
     // TODO: Fix this. Not all connections from server side should have 10 times the retries.
     int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    // Go big.  Multiply by 10.  If we can't get to meta after this many retries
+    // Go big. Multiply by 10. If we can't get to meta after this many retries
     // then something seriously wrong.
     int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
     int retries = hcRetries * serversideMultiplier;
@@ -141,9 +143,9 @@ public final class ConnectionUtils {
    * @throws IOException if IO failure occurred
    */
   public static ClusterConnection createShortCircuitConnection(final Configuration conf,
-    ExecutorService pool, User user, final ServerName serverName,
-    final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
-    throws IOException {
+      ExecutorService pool, User user, final ServerName serverName,
+      final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
+      throws IOException {
     if (user == null) {
       user = UserProvider.instantiate(conf).getCurrent();
     }
@@ -166,8 +168,7 @@ public final class ConnectionUtils {
    */
   @VisibleForTesting
   public static void setupMasterlessConnection(Configuration conf) {
-    conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
-      MasterlessConnection.class.getName());
+    conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MasterlessConnection.class.getName());
   }
 
   /**
@@ -175,8 +176,7 @@ public final class ConnectionUtils {
    * region re-lookups.
    */
   static class MasterlessConnection extends ConnectionImplementation {
-    MasterlessConnection(Configuration conf,
-      ExecutorService pool, User user) throws IOException {
+    MasterlessConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
       super(conf, pool, user);
     }
 
@@ -197,8 +197,7 @@ public final class ConnectionUtils {
   /**
    * Get a unique key for the rpc stub to the given server.
    */
-  static String getStubKey(String serviceName, ServerName serverName,
-      boolean hostnameCanChange) {
+  static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
     // Sometimes, servers go down and they come back up with the same hostname but a different
     // IP address. Force a resolution of the rsHostname by trying to instantiate an
     // InetSocketAddress, and this way we will rightfully get a new stubKey.
@@ -327,4 +326,25 @@ public final class ConnectionUtils {
 
   // Add a delta to avoid timeout immediately after a retry sleeping.
   static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+
+  static Get toCheckExistenceOnly(Get get) {
+    if (get.isCheckExistenceOnly()) {
+      return get;
+    }
+    return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true);
+  }
+
+  static List<Get> toCheckExistenceOnly(List<Get> gets) {
+    return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
+  }
+
+  static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
+      List<? extends Row> actions) {
+    return table.<Object> batch(actions).stream().map(f -> f.<Void> thenApply(r -> null))
+        .collect(toList());
+  }
+
+  static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
+    return table.<Object> batchAll(actions).thenApply(r -> null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 6fad0da..347c85b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -407,9 +407,10 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   @Override
-  public List<CompletableFuture<Result>> get(List<Get> gets) {
-    return conn.callerFactory.multiGet().table(tableName).gets(gets)
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return conn.callerFactory.batch().table(tableName).actions(actions)
         .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+        .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 446cd89..424d578 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -52,9 +52,8 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
@@ -670,7 +669,7 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
-      final List<Action> actions, final List<CellScannable> cells,
+      final Iterable<Action> actions, final List<CellScannable> cells,
       final RegionAction.Builder regionActionBuilder,
       final ClientProtos.Action.Builder actionBuilder,
       final MutationProto.Builder mutationBuilder) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index 3028111..5614d8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -22,10 +22,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -62,12 +60,10 @@ public abstract class AbstractTestAsyncTableScan {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-    List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT).forEach(
-      i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
-          .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+    ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
+        .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
+        .collect(Collectors.toList())).get();
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
deleted file mode 100644
index d24501d..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
-import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.IntStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Will split the table, and move region randomly when testing.
- */
-@Category({ LargeTests.class, ClientTests.class })
-public class TestAsyncGetMultiThread {
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
-  private static int COUNT = 1000;
-
-  private static AsyncConnection CONN;
-
-  private static byte[][] SPLIT_KEYS;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
-    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
-    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
-    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
-    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
-    TEST_UTIL.startMiniCluster(5);
-    SPLIT_KEYS = new byte[8][];
-    for (int i = 111; i < 999; i += 111) {
-      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
-    }
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
-    List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT)
-        .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
-            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(CONN);
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
-    while (!stop.get()) {
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-              .getValue(FAMILY, QUALIFIER)));
-    }
-  }
-
-  @Test
-  public void test() throws IOException, InterruptedException, ExecutionException {
-    int numThreads = 20;
-    AtomicBoolean stop = new AtomicBoolean(false);
-    ExecutorService executor =
-        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
-    List<Future<?>> futures = new ArrayList<>();
-    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
-      run(stop);
-      return null;
-    })));
-    Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
-    Admin admin = TEST_UTIL.getAdmin();
-    for (byte[] splitPoint : SPLIT_KEYS) {
-      admin.split(TABLE_NAME, splitPoint);
-      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
-        region.compact(true);
-      }
-      Thread.sleep(5000);
-      admin.balancer(true);
-      Thread.sleep(5000);
-      ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
-      ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-          .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
-          .findAny().get();
-      admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
-        Bytes.toBytes(newMetaServer.getServerName()));
-      Thread.sleep(5000);
-    }
-    stop.set(true);
-    executor.shutdown();
-    for (Future<?> future : futures) {
-      future.get();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
new file mode 100644
index 0000000..308b9e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableBatch {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @Parameter(0)
+  public String tableType;
+
+  @Parameter(1)
+  public Function<TableName, AsyncTableBase> tableGetter;
+
+  private static RawAsyncTable getRawTable(TableName tableName) {
+    return CONN.getRawTable(tableName);
+  }
+
+  private static AsyncTable getTable(TableName tableName) {
+    return CONN.getTable(tableName, ForkJoinPool.commonPool());
+  }
+
+  @Parameters(name = "{index}: type={0}")
+  public static List<Object[]> params() {
+    Function<TableName, AsyncTableBase> rawTableGetter = TestAsyncTableBatch::getRawTable;
+    Function<TableName, AsyncTableBase> tableGetter = TestAsyncTableBatch::getTable;
+    return Arrays.asList(new Object[] { "raw", rawTableGetter },
+      new Object[] { "normal", tableGetter });
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.isTableEnabled(TABLE_NAME)) {
+      admin.disableTable(TABLE_NAME);
+    }
+    admin.deleteTable(TABLE_NAME);
+  }
+
+  private byte[] getRow(int i) {
+    return Bytes.toBytes(String.format("%03d", i));
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException, IOException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, COUNT)
+        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+        .collect(Collectors.toList())).get();
+    List<Result> results =
+        table
+            .getAll(IntStream.range(0, COUNT)
+                .mapToObj(
+                  i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
+                .flatMap(l -> l.stream()).collect(Collectors.toList()))
+            .get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
+      assertTrue(results.get(2 * i + 1).isEmpty());
+    }
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.flush(TABLE_NAME);
+    TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).forEach(r -> {
+      byte[] startKey = r.getRegionInfo().getStartKey();
+      int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
+      byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
+      try {
+        admin.splitRegion(r.getRegionInfo().getRegionName(), splitPoint);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+    // we are not going to test the function of split so no assertion here. Just wait for a while
+    // and then start our work.
+    Thread.sleep(5000);
+    table.deleteAll(
+      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
+        .get();
+    results = table
+        .getAll(
+          IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
+        .get();
+    assertEquals(COUNT, results.size());
+    results.forEach(r -> assertTrue(r.isEmpty()));
+  }
+
+  @Test
+  public void testMixed() throws InterruptedException, ExecutionException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, 5)
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
+        .collect(Collectors.toList())).get();
+    List<Row> actions = new ArrayList<>();
+    actions.add(new Get(Bytes.toBytes(0)));
+    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 2)));
+    actions.add(new Delete(Bytes.toBytes(2)));
+    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
+    actions.add(new Append(Bytes.toBytes(4)).add(FAMILY, CQ, Bytes.toBytes(4)));
+    List<Object> results = table.batchAll(actions).get();
+    assertEquals(5, results.size());
+    Result getResult = (Result) results.get(0);
+    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
+    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
+    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
+    Result incrementResult = (Result) results.get(3);
+    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
+    Result appendResult = (Result) results.get(4);
+    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
+    assertEquals(12, appendValue.length);
+    assertEquals(4, Bytes.toLong(appendValue));
+    assertEquals(4, Bytes.toInt(appendValue, 8));
+  }
+
+  public static final class ErrorInjectObserver extends BaseRegionObserver {
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+        List<Cell> results) throws IOException {
+      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
+        throw new DoNotRetryRegionException("Inject Error");
+      }
+    }
+  }
+
+  @Test
+  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
+    Admin admin = TEST_UTIL.getAdmin();
+    HTableDescriptor htd = admin.getTableDescriptor(TABLE_NAME);
+    htd.addCoprocessor(ErrorInjectObserver.class.getName());
+    admin.modifyTable(TABLE_NAME, htd);
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
+        .collect(Collectors.toList())).get();
+    List<CompletableFuture<Result>> futures = table
+        .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
+    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
+      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
+    }
+    try {
+      futures.get(SPLIT_KEYS.length - 1).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8fa5b0b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
new file mode 100644
index 0000000..da8141b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reads rows from many threads through the async client while the table is repeatedly split,
+ * compacted, rebalanced, and the meta region is moved to another region server.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreaded {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  /** Number of rows loaded; row keys are the zero-padded decimal indexes "000" to "999". */
+  private static final int COUNT = 1000;
+
+  /** Shared async connection, created in {@link #setUp()} and closed in {@link #tearDown()}. */
+  private static AsyncConnection CONN;
+
+  /** Keys ("111", "222", ... "888") at which the table is split while the readers run. */
+  private static byte[][] SPLIT_KEYS;
+
+  /**
+   * Starts a five-node mini cluster, creates the (initially unsplit) table and loads
+   * {@link #COUNT} rows whose value is the int encoding of the row index.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // NOTE(review): "none" presumably keeps all table regions off the master -- confirm.
+    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+    // A large retry budget, presumably so reads can outlast regions being temporarily offline.
+    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.startMiniCluster(5);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    List<Put> puts =
+        IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList());
+    // Block until every row has been written so the readers never see a missing row.
+    CONN.getRawTable(TABLE_NAME).putAll(puts).get();
+  }
+
+  /** Closes the shared connection and shuts the mini cluster down. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Reader loop: until {@code stop} is set, fetches a random row and asserts that the stored
+   * int value equals the row index.
+   *
+   * @param stop flag the test sets to ask the reader to finish
+   * @throws InterruptedException if the thread is interrupted while waiting for a get
+   * @throws ExecutionException if a get completes exceptionally
+   */
+  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+    while (!stop.get()) {
+      int i = ThreadLocalRandom.current().nextInt(COUNT);
+      byte[] row = Bytes.toBytes(String.format("%03d", i));
+      byte[] value = CONN.getRawTable(TABLE_NAME).get(new Get(row)).get().getValue(FAMILY, QUALIFIER);
+      assertEquals(i, Bytes.toInt(value));
+    }
+  }
+
+  /**
+   * Runs 20 reader threads while splitting the table at each split key (in a fixed shuffled
+   * order), compacting its regions, running the balancer and moving the meta region, then
+   * checks that no reader failed.
+   */
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    int numThreads = 20;
+    AtomicBoolean stop = new AtomicBoolean(false);
+    ExecutorService executor =
+        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    List<Future<?>> futures = new ArrayList<>();
+    for (int t = 0; t < numThreads; t++) {
+      // Submitted as a Callable (returns null) so run() may throw checked exceptions.
+      futures.add(executor.submit(() -> {
+        run(stop);
+        return null;
+      }));
+    }
+    try {
+      // Fixed seed: the split order is shuffled but identical on every run.
+      Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+      Admin admin = TEST_UTIL.getAdmin();
+      for (byte[] splitPoint : SPLIT_KEYS) {
+        admin.split(TABLE_NAME, splitPoint);
+        for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+          region.compact(true);
+        }
+        // The pauses presumably give split / balance / move time to take effect while the
+        // readers keep issuing gets.
+        Thread.sleep(5000);
+        admin.balancer(true);
+        Thread.sleep(5000);
+        ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+        // Pick any region server other than the one currently hosting meta.
+        ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+            .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+            .findAny().get();
+        admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+          Bytes.toBytes(newMetaServer.getServerName()));
+        Thread.sleep(5000);
+      }
+    } finally {
+      // Always release the readers, even when an admin operation above throws; otherwise the
+      // reader threads would keep issuing gets while the cluster is being torn down.
+      stop.set(true);
+      executor.shutdown();
+    }
+    // Surface any assertion failure or exception raised inside a reader thread.
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+}