Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/20 12:42:42 UTC

hbase git commit: HBASE-17142 Implement multi get

Repository: hbase
Updated Branches:
  refs/heads/master db5953c6f -> a2e967d92


HBASE-17142 Implement multi get


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a2e967d9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a2e967d9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a2e967d9

Branch: refs/heads/master
Commit: a2e967d92f8550981455266634e9eea4435d3d95
Parents: db5953c
Author: zhangduo <zh...@apache.org>
Authored: Tue Dec 20 17:04:17 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Dec 20 20:41:55 2016 +0800

----------------------------------------------------------------------
 .../client/AsyncMultiGetRpcRetryingCaller.java  | 406 +++++++++++++++++++
 .../client/AsyncRpcRetryingCallerFactory.java   |  45 ++
 .../AsyncSingleRequestRpcRetryingCaller.java    |   6 +-
 .../hadoop/hbase/client/AsyncTableBase.java     |  24 ++
 .../hadoop/hbase/client/AsyncTableImpl.java     |   6 +
 .../hadoop/hbase/client/ConnectionUtils.java    |   3 +
 .../hadoop/hbase/client/RawAsyncTable.java      |  20 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |   7 +
 .../hbase/client/TestAsyncTableMultiGet.java    | 163 ++++++++
 9 files changed, 666 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
new file mode 100644
index 0000000..8a9b9a8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for multi get.
+ * <p>
+ * Note that {@link #operationTimeoutNs} is the total time limit, the same as for other single
+ * operations.
+ * <p>
+ * And {@link #maxAttempts} is logically a per-get limit within the batch. In the implementation,
+ * we record a {@code tries} count for each operation group; if a group is split into several
+ * sub-groups when retrying, the sub-groups inherit {@code tries}. You can picture the whole
+ * retrying process as a tree, with {@link #maxAttempts} limiting the depth of the tree.
+ */
+@InterfaceAudience.Private
+class AsyncMultiGetRpcRetryingCaller {
+
+  private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final List<Get> gets;
+
+  private final List<CompletableFuture<Result>> futures;
+
+  private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
+
+  private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final long startNs;
+
+  // We cannot use HRegionLocation as the map key because the hashCode and equals methods of
+  // HRegionLocation only consider serverName.
+  private static final class RegionRequest {
+
+    public final HRegionLocation loc;
+
+    public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
+
+    public RegionRequest(HRegionLocation loc) {
+      this.loc = loc;
+    }
+  }
+
+  public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.gets = gets;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+
+    this.futures = new ArrayList<>(gets.size());
+    this.get2Future = new IdentityHashMap<>(gets.size());
+    gets.forEach(
+      get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
+    this.get2Errors = new IdentityHashMap<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  private List<ThrowableWithExtraContext> removeErrors(Get get) {
+    synchronized (get2Errors) {
+      return get2Errors.remove(get);
+    }
+  }
+
+  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
+      Throwable error, ServerName serverName) {
+    if (tries > startLogErrorsCnt) {
+      String regions =
+          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
+              .collect(Collectors.joining(",", "[", "]"));
+      LOG.warn("Get data for " + regions + " in " + tableName + " from " + serverName
+          + " failed, tries=" + tries,
+        error);
+    }
+  }
+
+  private String getExtras(ServerName serverName) {
+    return serverName != null ? serverName.getServerName() : "";
+  }
+
+  private void addError(Get get, Throwable error, ServerName serverName) {
+    List<ThrowableWithExtraContext> errors;
+    synchronized (get2Errors) {
+      errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
+    }
+    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+        serverName != null ? serverName.toString() : ""));
+  }
+
+  private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
+    gets.forEach(get -> addError(get, error, serverName));
+  }
+
+  private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
+    CompletableFuture<Result> future = get2Future.get(get);
+    if (future.isDone()) {
+      return;
+    }
+    ThrowableWithExtraContext errorWithCtx =
+        new ThrowableWithExtraContext(error, currentTime, extras);
+    List<ThrowableWithExtraContext> errors = removeErrors(get);
+    if (errors == null) {
+      errors = Collections.singletonList(errorWithCtx);
+    } else {
+      errors.add(errorWithCtx);
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries, errors));
+  }
+
+  private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
+    long currentTime = System.currentTimeMillis();
+    String extras = getExtras(serverName);
+    gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
+  }
+
+  private void failAll(Stream<Get> gets, int tries) {
+    gets.forEach(get -> {
+      CompletableFuture<Result> future = get2Future.get(get);
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new RetriesExhaustedException(tries,
+          Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
+    });
+  }
+
+  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
+      throws IOException {
+    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+    for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
+      ClientProtos.RegionAction.Builder regionActionBuilder =
+          ClientProtos.RegionAction.newBuilder().setRegion(
+            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
+      int index = 0;
+      for (Get get : entry.getValue().gets) {
+        regionActionBuilder.addAction(
+          ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
+        index++;
+      }
+      multiRequestBuilder.addRegionAction(regionActionBuilder);
+    }
+    return multiRequestBuilder.build();
+  }
+
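+  // Dispatch one server's MultiResponse: complete the future for every get
+  // that has a Result, fail gets permanently on DoNotRetryIOException or when
+  // attempts are exhausted, and collect the remainder for resubmission.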
+  private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries, ServerName serverName,
+      MultiResponse resp) {
+    List<Get> failedGets = new ArrayList<>();
+    getsByRegion.forEach((rn, regionReq) -> {
+      RegionResult regionResult = resp.getResults().get(rn);
+      if (regionResult != null) {
+        int index = 0;
+        for (Get get : regionReq.gets) {
+          Object result = regionResult.result.get(index);
+          if (result == null) {
+            LOG.error("Server sent us neither result nor exception for row '"
+                + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
+            addError(get, new RuntimeException("Invalid response"), serverName);
+            failedGets.add(get);
+          } else if (result instanceof Throwable) {
+            Throwable error = translateException((Throwable) result);
+            logException(tries, () -> Stream.of(regionReq), error, serverName);
+            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
+                getExtras(serverName));
+            } else {
+              failedGets.add(get);
+            }
+          } else {
+            get2Future.get(get).complete((Result) result);
+          }
+          index++;
+        }
+      } else {
+        Throwable t = resp.getException(rn);
+        Throwable error;
+        if (t == null) {
+          LOG.error(
+            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          error = new RuntimeException("Invalid response");
+        } else {
+          error = translateException(t);
+          logException(tries, () -> Stream.of(regionReq), error, serverName);
+          conn.getLocator().updateCachedLocation(regionReq.loc, error);
+          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+            failAll(regionReq.gets.stream(), tries, error, serverName);
+            return;
+          }
+          addError(regionReq.gets, error, serverName);
+          failedGets.addAll(regionReq.gets);
+        }
+      }
+    });
+    if (!failedGets.isEmpty()) {
+      tryResubmit(failedGets.stream(), tries);
+    }
+  }
+
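+  // Send one multi request per server, bounding each call by the smaller of
+  // the rpc timeout and the remaining operation time budget.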
+  private void send(Map<ServerName, ? extends Map<byte[], RegionRequest>> getsByServer, int tries) {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      long remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
+            .flatMap(r -> r.gets.stream()),
+          tries);
+        return;
+      }
+      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
+    getsByServer.forEach((sn, getsByRegion) -> {
+      ClientService.Interface stub;
+      try {
+        stub = conn.getRegionServerStub(sn);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      ClientProtos.MultiRequest req;
+      try {
+        req = buildReq(getsByRegion);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      resetController(controller, callTimeoutNs);
+      stub.multi(controller, req, resp -> {
+        if (controller.failed()) {
+          onError(getsByRegion, tries, controller.getFailed(), sn);
+        } else {
+          try {
+            onComplete(getsByRegion, tries, sn,
+              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+          } catch (Exception e) {
+            onError(getsByRegion, tries, e, sn);
+            return;
+          }
+        }
+      });
+    });
+  }
+
+  private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
+      ServerName serverName) {
+    Throwable error = translateException(t);
+    logException(tries, () -> getsByRegion.values().stream(), error, serverName);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      failAll(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, error,
+        serverName);
+      return;
+    }
+    List<Get> copiedGets =
+        getsByRegion.values().stream().flatMap(r -> r.gets.stream()).collect(Collectors.toList());
+    addError(copiedGets, error, serverName);
+    tryResubmit(copiedGets.stream(), tries);
+  }
+
+  private void tryResubmit(Stream<Get> gets, int tries) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
+  }
+
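+  // Locate every get, group the gets by server and then by region, and hand
+  // each server group to send(). Gets whose location lookup fails with a
+  // retriable error are resubmitted together.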
+  private void groupAndSend(Stream<Get> gets, int tries) {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
+        new ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
+    CompletableFuture.allOf(gets.map(get -> conn.getLocator()
+        .getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> {
+          if (error != null) {
+            error = translateException(error);
+            if (error instanceof DoNotRetryIOException) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
+              return;
+            }
+            addError(get, error, null);
+            locateFailed.add(get);
+          } else {
+            ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
+              loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+            computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
+              () -> new RegionRequest(loc)).gets.add(get);
+          }
+        })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
+          if (!getsByServer.isEmpty()) {
+            send(getsByServer, tries);
+          }
+          if (!locateFailed.isEmpty()) {
+            tryResubmit(locateFailed.stream(), tries);
+          }
+        });
+  }
+
+  public List<CompletableFuture<Result>> call() {
+    groupAndSend(gets.stream(), 1);
+    return futures;
+  }
+}
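
For context, a minimal usage sketch of the new API (the connection, table name and
row keys below are illustrative, not part of the patch). Each Get in the batch is
tracked by its own future, so one row exhausting its retries does not fail the rest:

    RawAsyncTable table = conn.getRawTable(TableName.valueOf("test"));
    List<Get> gets = Arrays.asList(
      new Get(Bytes.toBytes("row1")),
      new Get(Bytes.toBytes("row2")));
    // One future per Get; the futures complete or fail independently.
    List<CompletableFuture<Result>> futures = table.get(gets);
    futures.get(0).whenComplete((result, error) -> {
      if (error != null) {
        // row1 hit a DoNotRetryIOException or exhausted maxAttempts
      } else {
        // result holds the cells fetched for row1
      }
    });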

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c40de31..f1a4247 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -257,4 +257,49 @@ class AsyncRpcRetryingCallerFactory {
   public ScanSingleRegionCallerBuilder scanSingleRegion() {
     return new ScanSingleRegionCallerBuilder();
   }
+
+  public class MultiGetCallerBuilder {
+
+    private TableName tableName;
+
+    private List<Get> gets;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public MultiGetCallerBuilder table(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public MultiGetCallerBuilder gets(List<Get> gets) {
+      this.gets = gets;
+      return this;
+    }
+
+    public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public AsyncMultiGetRpcRetryingCaller build() {
+      return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    }
+
+    public List<CompletableFuture<Result>> call() {
+      return build().call();
+    }
+  }
+
+  public MultiGetCallerBuilder multiGet() {
+    return new MultiGetCallerBuilder();
+  }
 }
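
A sketch of how the new builder is meant to be driven (mirroring the
RawAsyncTableImpl change further below; the timeout values are illustrative):

    List<CompletableFuture<Result>> futures = conn.callerFactory.multiGet()
        .table(tableName)
        .gets(gets)
        .operationTimeout(30, TimeUnit.SECONDS) // total budget across retries
        .rpcTimeout(10, TimeUnit.SECONDS)       // limit for each individual call
        .call();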

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 44a237d..d6da131 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
@@ -52,9 +53,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
 
-  // Add a delta to avoid timeout immediately after a retry sleeping.
-  private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
-
   @FunctionalInterface
   public interface Callable<T> {
     CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
@@ -146,7 +144,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     }
     long delayNs;
     if (operationTimeoutNs > 0) {
-      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally();
         return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index e051a6b..a2b5247 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -351,4 +352,27 @@ public interface AsyncTableBase {
    *         {@link CompletableFuture}.
    */
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   * <p>
+   * Note that you may not get all the results with this method: some of the returned
+   * {@link CompletableFuture}s may succeed while others fail.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A list of {@link CompletableFuture}s that represent the result for each get.
+   */
+  List<CompletableFuture<Result>> get(List<Get> gets);
+
+  /**
+   * A simple version of batch get. It will fail if there are any failures, and you will get the
+   * whole result list at once if the operation succeeds.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A {@link CompletableFuture} that wraps the result list.
+   */
+  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+    List<CompletableFuture<Result>> futures = get(gets);
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
+  }
 }
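
A short sketch contrasting the two flavors added here (table and gets are assumed
to be set up as elsewhere in this patch):

    // Per-row futures: partial success is possible.
    List<CompletableFuture<Result>> perRow = table.get(gets);
    // All-or-nothing: the returned future fails if any single get fails.
    table.getAll(gets).whenComplete((results, error) -> {
      if (error != null) {
        // at least one get failed, so no result list is delivered
      } else {
        // results has one Result per Get, in the same order as gets
      }
    });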

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index cecf815..6cc2551 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -191,4 +192,9 @@ class AsyncTableImpl implements AsyncTable {
   public void scan(Scan scan, ScanResultConsumer consumer) {
     pool.execute(() -> scan0(scan, consumer));
   }
+
+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 9df9fbb..cc27992 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -324,4 +324,7 @@ public final class ConnectionUtils {
     return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
       result.isStale(), true);
   }
+
+  // Add a delta to avoid timing out immediately after a retry sleep.
+  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
 }
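
Both retry callers now share this constant; the pattern it supports, as seen in
tryResubmit() of the new multi get caller above, is:

    long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; // keep 1ms of slack
    if (maxDelayNs <= 0) {
      // Sleeping would take us past the deadline, so fail now with the errors
      // recorded so far instead of waking up into a guaranteed timeout.
      failAll(gets, tries);
      return;
    }
    delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));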

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 823367a..0c292a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  * especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
  * <p>
  * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
- * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
- * it is not suitable for a normal user. If it is still the only difference after we implement most
- * features of AsyncTable, we can think about merge these two interfaces.
+ * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan
+ * (heartbeat), so it is not suitable for a normal user. If it is still the only difference after
+ * we implement most features of AsyncTable, we can think about merging these two interfaces.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -42,13 +42,13 @@ public interface RawAsyncTable extends AsyncTableBase {
 
   /**
    * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}.
-   * {@link RawScanResultConsumer#onComplete()} means the scan is finished, and
-   * {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
-   * is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we
-   * can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually
-   * because the matched results are too sparse, for example, a filter which almost filters out
-   * everything is specified.
+   * be passed to the given {@code consumer} by calling
+   * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
+   * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
+   * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
+   * means the RS is still working but we cannot get a valid result to call
+   * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results
+   * are too sparse, for example, when a filter that filters out almost everything is specified.
    * <p>
    * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
    * framework's callback thread, so typically you should not do any time consuming work inside

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index cdc90ab..6fad0da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -405,4 +405,11 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
   }
+
+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return conn.callerFactory.multiGet().table(tableName).gets(gets)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a2e967d9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
new file mode 100644
index 0000000..612e830
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableMultiGet {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 100;
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @Parameter
+  public Supplier<AsyncTableBase> getTable;
+
+  private static RawAsyncTable getRawTable() {
+    return ASYNC_CONN.getRawTable(TABLE_NAME);
+  }
+
+  private static AsyncTable getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
+      new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 11; i < 99; i += 11) {
+      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
+    List<CompletableFuture<?>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
+      new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
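+  // Move one region of the test table to a different server so that cached
+  // locations go stale, exercising recovery after a location error.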
+  private void move() throws IOException, InterruptedException {
+    HRegionServer src = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+    HRegionServer dst = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer()).filter(r -> r != src).findAny().get();
+    Region region = src.getOnlineRegions(TABLE_NAME).stream().findAny().get();
+    TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
+      Bytes.toBytes(dst.getServerName().getServerName()));
+    Thread.sleep(1000);
+  }
+
+  private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
+      throws IOException, InterruptedException {
+    AsyncTableBase table = getTable.get();
+    List<Get> gets =
+        IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
+            .collect(Collectors.toList());
+    List<Result> results = getFunc.apply(table, gets);
+    assertEquals(COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result result = results.get(i);
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    }
+    // test basic failure recovery
+    move();
+    results = getFunc.apply(table, gets);
+    assertEquals(COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result result = results.get(i);
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    }
+  }
+
+  @Test
+  public void testGet() throws InterruptedException, IOException {
+    test((table, gets) -> {
+      return table.get(gets).stream().map(f -> {
+        try {
+          return f.get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+    });
+
+  }
+
+  @Test
+  public void testGetAll() throws InterruptedException, IOException {
+    test((table, gets) -> {
+      try {
+        return table.getAll(gets).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}