You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/10/26 09:27:21 UTC

[2/2] hbase git commit: HBASE-16932 Implement small scan

HBASE-16932 Implement small scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd3dd6e0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd3dd6e0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd3dd6e0

Branch: refs/heads/master
Commit: cd3dd6e018513357d2cf0b5bba073f5a6551f7a4
Parents: 5cee6a3
Author: zhangduo <zh...@apache.org>
Authored: Wed Oct 26 17:21:35 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Oct 26 17:26:58 2016 +0800

----------------------------------------------------------------------
 .../client/AsyncConnectionConfiguration.java    |  41 +++-
 .../hadoop/hbase/client/AsyncRegionLocator.java |  23 ++
 .../client/AsyncRpcRetryingCallerFactory.java   |  83 ++++++-
 .../AsyncSingleRequestRpcRetryingCaller.java    |  15 +-
 .../client/AsyncSmallScanRpcRetryingCaller.java | 211 ++++++++++++++++++
 .../apache/hadoop/hbase/client/AsyncTable.java  |  23 ++
 .../hadoop/hbase/client/AsyncTableImpl.java     |  47 +++-
 .../hadoop/hbase/client/ClientScanner.java      | 176 +++++++--------
 .../client/ClientSmallReversedScanner.java      |   6 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  46 +++-
 .../hbase/client/ReversedClientScanner.java     |   2 +
 .../client/ScannerCallableWithReplicas.java     |  12 +-
 .../hbase/client/TestAsyncTableSmallScan.java   | 219 +++++++++++++++++++
 .../hadoop/hbase/client/TestFromClientSide.java |   3 +-
 14 files changed, 781 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index ba2e660..aaac845 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -20,11 +20,18 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
@@ -34,6 +41,7 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -59,6 +67,13 @@ class AsyncConnectionConfiguration {
   /** How many retries are allowed before we start to log */
   private final int startLogErrorsCnt;
 
+  private final long scanTimeoutNs;
+
+  private final int scannerCaching;
+
+  private final long scannerMaxResultSize;
+
+  @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -68,11 +83,18 @@ class AsyncConnectionConfiguration {
       conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
     this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
       conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
-    this.pauseNs = TimeUnit.MILLISECONDS
-        .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+    this.pauseNs =
+        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
-      DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+    this.startLogErrorsCnt =
+        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+    this.scanTimeoutNs = TimeUnit.MILLISECONDS
+        .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+          HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+    this.scannerCaching =
+        conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -103,4 +125,15 @@ class AsyncConnectionConfiguration {
     return startLogErrorsCnt;
   }
 
+  public long getScanTimeoutNs() {
+    return scanTimeoutNs;
+  }
+
+  public int getScannerCaching() {
+    return scannerCaching;
+  }
+
+  public long getScannerMaxResultSize() {
+    return scannerMaxResultSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index dc75ba6..321fd71 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * TODO: reimplement using aync connection when the scan logic is ready. The current implementation
@@ -52,6 +55,26 @@ class AsyncRegionLocator implements Closeable {
     return future;
   }
 
+  CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+      byte[] startRowOfCurrentRegion, boolean reload) {
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
+    try {
+      for (;;) {
+        HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
+        byte[] endKey = loc.getRegionInfo().getEndKey();
+        if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
+          future.complete(loc);
+          break;
+        }
+        toLocateRow = endKey;
+      }
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
   void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
       ServerName source) {
     conn.updateCachedLocations(tableName, regionName, row, exception, source);

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c5ac9a5..9020ce5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import io.netty.util.HashedWheelTimer;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -54,6 +56,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs = -1L;
 
+    private boolean locateToPreviousRegion;
+
     public SingleRequestCallerBuilder<T> table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -64,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public SingleRequestCallerBuilder<T> action(
-        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+    public SingleRequestCallerBuilder<T>
+        action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -80,11 +84,18 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) {
+      this.locateToPreviousRegion = locateToPreviousRegion;
+      return this;
+    }
+
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
-          Preconditions.checkNotNull(tableName, "tableName is null"),
-          Preconditions.checkNotNull(row, "row is null"),
-          Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
+          checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
+          locateToPreviousRegion
+              ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
+              : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
+          checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
           conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
           conn.connConf.getStartLogErrorsCnt());
     }
@@ -103,4 +114,64 @@ class AsyncRpcRetryingCallerFactory {
   public <T> SingleRequestCallerBuilder<T> single() {
     return new SingleRequestCallerBuilder<>();
   }
+
+  public class SmallScanCallerBuilder {
+
+    private TableName tableName;
+
+    private Scan scan;
+
+    private int limit;
+
+    private long scanTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public SmallScanCallerBuilder table(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public SmallScanCallerBuilder setScan(Scan scan) {
+      this.scan = scan;
+      return this;
+    }
+
+    public SmallScanCallerBuilder limit(int limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
+      this.scanTimeoutNs = unit.toNanos(scanTimeout);
+      return this;
+    }
+
+    public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public AsyncSmallScanRpcRetryingCaller build() {
+      TableName tableName = checkNotNull(this.tableName, "tableName is null");
+      Scan scan = checkNotNull(this.scan, "scan is null");
+      checkArgument(limit > 0, "invalid limit %d", limit);
+      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
+          rpcTimeoutNs);
+    }
+
+    /**
+     * Shortcut for {@code build().call()}
+     */
+    public CompletableFuture<List<Result>> call() {
+      return build().call();
+    }
+  }
+
+  /**
+   * Create retry caller for small scan.
+   */
+  public SmallScanCallerBuilder smallScan() {
+    return new SmallScanCallerBuilder();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 8acde94..1d0357d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -60,6 +60,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         ClientService.Interface stub);
   }
 
+  @FunctionalInterface
+  public interface RegionLocator {
+    CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
+        byte[] row, boolean reload);
+  }
+
   private final HashedWheelTimer retryTimer;
 
   private final AsyncConnectionImpl conn;
@@ -68,6 +74,8 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private final byte[] row;
 
+  private final RegionLocator locator;
+
   private final Callable<T> callable;
 
   private final long pauseNs;
@@ -89,12 +97,13 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   private final long startNs;
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
-      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
+      int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.row = row;
+    this.locator = locator;
     this.callable = callable;
     this.pauseNs = pauseNs;
     this.maxAttempts = retries2Attempts(maxRetries);
@@ -207,7 +216,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void locateThenCall() {
-    conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
+    locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
       if (error != null) {
         onError(error,
           () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
new file mode 100644
index 0000000..af639c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Retry caller for smaller scan.
+ */
+@InterfaceAudience.Private
+class AsyncSmallScanRpcRetryingCaller {
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final Scan scan;
+
+  private final int limit;
+
+  private final long scanTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final Function<byte[], byte[]> createClosestNextRow;
+
+  private final Runnable firstScan;
+
+  private final Function<HRegionInfo, Boolean> nextScan;
+
+  private final List<Result> resultList;
+
+  private final CompletableFuture<List<Result>> future;
+
+  public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
+      int limit, long scanTimeoutNs, long rpcTimeoutNs) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.scan = scan;
+    this.limit = limit;
+    this.scanTimeoutNs = scanTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    if (scan.isReversed()) {
+      this.createClosestNextRow = ConnectionUtils::createClosestRowBefore;
+      this.firstScan = this::reversedFirstScan;
+      this.nextScan = this::reversedNextScan;
+    } else {
+      this.createClosestNextRow = ConnectionUtils::createClosestRowAfter;
+      this.firstScan = this::firstScan;
+      this.nextScan = this::nextScan;
+    }
+    this.resultList = new ArrayList<>();
+    this.future = new CompletableFuture<>();
+  }
+
+  private static final class SmallScanResponse {
+
+    public final Result[] results;
+
+    public final HRegionInfo currentRegion;
+
+    public final boolean hasMoreResultsInRegion;
+
+    public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
+        boolean hasMoreResultsInRegion) {
+      this.results = results;
+      this.currentRegion = currentRegion;
+      this.hasMoreResultsInRegion = hasMoreResultsInRegion;
+    }
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+      justification = "Findbugs seems to be confused by lambda expression.")
+  private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub) {
+    CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
+    ScanRequest req;
+    try {
+      req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
+        limit - resultList.size(), true);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    stub.scan(controller, req, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        try {
+          Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
+          future.complete(
+            new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+      }
+    });
+    return future;
+  }
+
+  private void onComplete(SmallScanResponse resp) {
+    resultList.addAll(Arrays.asList(resp.results));
+    if (resultList.size() == limit) {
+      future.complete(resultList);
+      return;
+    }
+    if (resp.hasMoreResultsInRegion) {
+      if (resp.results.length > 0) {
+        scan.setStartRow(
+          createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
+      }
+      scan(false);
+      return;
+    }
+    if (!nextScan.apply(resp.currentRegion)) {
+      future.complete(resultList);
+    }
+  }
+
+  private void scan(boolean locateToPreviousRegion) {
+    conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
+        .locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call()
+        .whenComplete((resp, error) -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+          } else {
+            onComplete(resp);
+          }
+        });
+  }
+
+  public CompletableFuture<List<Result>> call() {
+    firstScan.run();
+    return future;
+  }
+
+  private void firstScan() {
+    scan(false);
+  }
+
+  private void reversedFirstScan() {
+    scan(isEmptyStartRow(scan.getStartRow()));
+  }
+
+  private boolean nextScan(HRegionInfo region) {
+    if (isEmptyStopRow(scan.getStopRow())) {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return false;
+      }
+    } else {
+      if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) {
+        return false;
+      }
+    }
+    scan.setStartRow(region.getEndKey());
+    scan(false);
+    return true;
+  }
+
+  private boolean reversedNextScan(HRegionInfo region) {
+    if (isEmptyStopRow(scan.getStopRow())) {
+      if (isEmptyStartRow(region.getStartKey())) {
+        return false;
+      }
+    } else {
+      if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) {
+        return false;
+      }
+    }
+    scan.setStartRow(region.getStartKey());
+    scan(true);
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 4642746..94747b9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import com.google.common.base.Preconditions;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -312,4 +313,26 @@ public interface AsyncTable {
    */
   CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations mutation);
+
+  /**
+   * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
+   * @see #smallScan(Scan, int)
+   */
+  default CompletableFuture<List<Result>> smallScan(Scan scan) {
+    return smallScan(scan, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Return all the results that match the given scan object. The number of the returned results
+   * will not be greater than {@code limit}.
+   * <p>
+   * Notice that the scan must be small, and should not use batch or allowPartialResults. The
+   * {@code caching} property of the scan object is also ignored as we will use {@code limit}
+   * instead.
+   * @param scan A configured {@link Scan} object.
+   * @param limit the limit of results count
+   * @return The results of this small scan operation. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 77a5bbe..ce53775 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The implementation of AsyncTable.
@@ -57,12 +59,18 @@ class AsyncTableImpl implements AsyncTable {
 
   private final TableName tableName;
 
+  private final int defaultScannerCaching;
+
+  private final long defaultScannerMaxResultSize;
+
   private long readRpcTimeoutNs;
 
   private long writeRpcTimeoutNs;
 
   private long operationTimeoutNs;
 
+  private long scanTimeoutNs;
+
   public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
     this.conn = conn;
     this.tableName = tableName;
@@ -70,6 +78,9 @@ class AsyncTableImpl implements AsyncTable {
     this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
     this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
         : conn.connConf.getOperationTimeoutNs();
+    this.defaultScannerCaching = conn.connConf.getScannerCaching();
+    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
+    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
   }
 
   @Override
@@ -256,8 +267,8 @@ class AsyncTableImpl implements AsyncTable {
             future.completeExceptionally(controller.getFailed());
           } else {
             try {
-              org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
-                  .getResults(req, resp, controller.cellScanner());
+              org.apache.hadoop.hbase.client.MultiResponse multiResp =
+                  ResponseConverter.getResults(req, resp, controller.cellScanner());
               Throwable ex = multiResp.getException(regionName);
               if (ex != null) {
                 future
@@ -305,6 +316,38 @@ class AsyncTableImpl implements AsyncTable {
         .call();
   }
 
+  private <T> CompletableFuture<T> failedFuture(Throwable error) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(error);
+    return future;
+  }
+
+  private Scan setDefaultScanConfig(Scan scan) {
+    // always create a new scan object as we may reset the start row later.
+    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
+    if (newScan.getCaching() <= 0) {
+      newScan.setCaching(defaultScannerCaching);
+    }
+    if (newScan.getMaxResultSize() <= 0) {
+      newScan.setMaxResultSize(defaultScannerMaxResultSize);
+    }
+    return newScan;
+  }
+
+  @Override
+  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
+    if (!scan.isSmall()) {
+      return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
+    }
+    if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+      return failedFuture(
+        new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
+    }
+    return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
+        .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  }
+
   @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 371a68a..00ff350 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,7 +17,20 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
 import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,20 +48,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
@@ -56,53 +60,51 @@ import java.util.concurrent.ExecutorService;
  */
 @InterfaceAudience.Private
 public abstract class ClientScanner extends AbstractClientScanner {
-    private static final Log LOG = LogFactory.getLog(ClientScanner.class);
-    // A byte array in which all elements are the max byte, and it is used to
-    // construct closest front row
-    static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
-    protected Scan scan;
-    protected boolean closed = false;
-    // Current region scanner is against.  Gets cleared if current region goes
-    // wonky: e.g. if it splits on us.
-    protected HRegionInfo currentRegion = null;
-    protected ScannerCallableWithReplicas callable = null;
-    protected Queue<Result> cache;
-    /**
-     * A list of partial results that have been returned from the server. This list should only
-     * contain results if this scanner does not have enough partial results to form the complete
-     * result.
-     */
-    protected final LinkedList<Result> partialResults = new LinkedList<Result>();
-    /**
-     * The row for which we are accumulating partial Results (i.e. the row of the Results stored
-     * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
-     * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
-     */
-    protected byte[] partialResultsRow = null;
-    /**
-     * The last cell from a not full Row which is added to cache
-     */
-    protected Cell lastCellLoadedToCache = null;
-    protected final int caching;
-    protected long lastNext;
-    // Keep lastResult returned successfully in case we have to reset scanner.
-    protected Result lastResult = null;
-    protected final long maxScannerResultSize;
-    private final ClusterConnection connection;
-    private final TableName tableName;
-    protected final int scannerTimeout;
-    protected boolean scanMetricsPublished = false;
-    protected RpcRetryingCaller<Result []> caller;
-    protected RpcControllerFactory rpcControllerFactory;
-    protected Configuration conf;
-    //The timeout on the primary. Applicable if there are multiple replicas for a region
-    //In that case, we will only wait for this much timeout on the primary before going
-    //to the replicas and trying the same scan. Note that the retries will still happen
-    //on each replica and the first successful results will be taken. A timeout of 0 is
-    //disallowed.
-    protected final int primaryOperationTimeout;
-    private int retries;
-    protected final ExecutorService pool;
+  private static final Log LOG = LogFactory.getLog(ClientScanner.class);
+
+  protected Scan scan;
+  protected boolean closed = false;
+  // Current region scanner is against. Gets cleared if current region goes
+  // wonky: e.g. if it splits on us.
+  protected HRegionInfo currentRegion = null;
+  protected ScannerCallableWithReplicas callable = null;
+  protected Queue<Result> cache;
+  /**
+   * A list of partial results that have been returned from the server. This list should only
+   * contain results if this scanner does not have enough partial results to form the complete
+   * result.
+   */
+  protected final LinkedList<Result> partialResults = new LinkedList<Result>();
+  /**
+   * The row for which we are accumulating partial Results (i.e. the row of the Results stored
+   * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
+   * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
+   */
+  protected byte[] partialResultsRow = null;
+  /**
+   * The last cell from a not full Row which is added to cache
+   */
+  protected Cell lastCellLoadedToCache = null;
+  protected final int caching;
+  protected long lastNext;
+  // Keep lastResult returned successfully in case we have to reset scanner.
+  protected Result lastResult = null;
+  protected final long maxScannerResultSize;
+  private final ClusterConnection connection;
+  private final TableName tableName;
+  protected final int scannerTimeout;
+  protected boolean scanMetricsPublished = false;
+  protected RpcRetryingCaller<Result[]> caller;
+  protected RpcControllerFactory rpcControllerFactory;
+  protected Configuration conf;
+  // The timeout on the primary. Applicable if there are multiple replicas for a region
+  // In that case, we will only wait for this much timeout on the primary before going
+  // to the replicas and trying the same scan. Note that the retries will still happen
+  // on each replica and the first successful results will be taken. A timeout of 0 is
+  // disallowed.
+  protected final int primaryOperationTimeout;
+  private int retries;
+  protected final ExecutorService pool;
 
   /**
    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -447,7 +449,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
             if (scan.isReversed()) {
               scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
             } else {
-              scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+              scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
             }
           } else {
             // we need rescan this row because we only loaded partial row before
@@ -737,49 +739,27 @@ public abstract class ClientScanner extends AbstractClientScanner {
     }
   }
 
-    @Override
-    public void close() {
-      if (!scanMetricsPublished) writeScanMetrics();
-      if (callable != null) {
-        callable.setClose();
-        try {
-          call(callable, caller, scannerTimeout);
-        } catch (UnknownScannerException e) {
-           // We used to catch this error, interpret, and rethrow. However, we
-           // have since decided that it's not nice for a scanner's close to
-           // throw exceptions. Chances are it was just due to lease time out.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("scanner failed to close", e);
-          }
-        } catch (IOException e) {
-          /* An exception other than UnknownScanner is unexpected. */
-          LOG.warn("scanner failed to close.", e);
+  @Override
+  public void close() {
+    if (!scanMetricsPublished) writeScanMetrics();
+    if (callable != null) {
+      callable.setClose();
+      try {
+        call(callable, caller, scannerTimeout);
+      } catch (UnknownScannerException e) {
+        // We used to catch this error, interpret, and rethrow. However, we
+        // have since decided that it's not nice for a scanner's close to
+        // throw exceptions. Chances are it was just due to lease time out.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("scanner failed to close", e);
         }
-        callable = null;
+      } catch (IOException e) {
+        /* An exception other than UnknownScanner is unexpected. */
+        LOG.warn("scanner failed to close.", e);
       }
-      closed = true;
-    }
-
-  /**
-   * Create the closest row before the specified row
-   * @param row
-   * @return a new byte array which is the closest front row of the specified one
-   */
-  protected static byte[] createClosestRowBefore(byte[] row) {
-    if (row == null) {
-      throw new IllegalArgumentException("The passed row is empty");
-    }
-    if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
-      return MAX_BYTE_ARRAY;
-    }
-    if (row[row.length - 1] == 0) {
-      return Arrays.copyOf(row, row.length - 1);
-    } else {
-      byte[] closestFrontRow = Arrays.copyOf(row, row.length);
-      closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
-      closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
-      return closestFrontRow;
+      callable = null;
     }
+    closed = true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 5fac93a..971a2ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -16,9 +16,11 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
@@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * <p>
  * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index eca9ad8..e0030e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -33,10 +37,11 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Utility used by client connections.
@@ -222,4 +227,41 @@ public final class ConnectionUtils {
       return HConstants.NO_NONCE;
     }
   };
+
+  // A byte array in which all elements are the max byte, and it is used to
+  // construct closest front row
+  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
+  /**
+   * Create the closest row after the specified row
+   */
+  static byte[] createClosestRowAfter(byte[] row) {
+    return Arrays.copyOf(row, row.length + 1);
+  }
+
+  /**
+   * Create the closest row before the specified row
+   */
+  static byte[] createClosestRowBefore(byte[] row) {
+    if (row.length == 0) {
+      return MAX_BYTE_ARRAY;
+    }
+    if (row[row.length - 1] == 0) {
+      return Arrays.copyOf(row, row.length - 1);
+    } else {
+      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
+      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
+      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
+      System.arraycopy(nextRow, row.length, MAX_BYTE_ARRAY, 0, MAX_BYTE_ARRAY.length);
+      return nextRow;
+    }
+  }
+
+  static boolean isEmptyStartRow(byte[] row) {
+    return Bytes.equals(row, EMPTY_START_ROW);
+  }
+
+  static boolean isEmptyStopRow(byte[] row) {
+    return Bytes.equals(row, EMPTY_END_ROW);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index dde82ba..390e236 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 5174598..e04fd6e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -30,7 +33,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -40,12 +42,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
  * 1. A scan is attempted on the default (primary) region
@@ -341,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       if (callable.getScan().isReversed()) {
         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
       } else {
-        callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+        callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
new file mode 100644
index 0000000..972780e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableSmallScan {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<CompletableFuture<?>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT)
+        .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScanAll() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
+    assertEquals(COUNT, results.size());
+    IntStream.range(0, COUNT).forEach(i -> {
+      Result result = results.get(i);
+      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  @Test
+  public void testReversedScanAll() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
+    assertEquals(COUNT, results.size());
+    IntStream.range(0, COUNT).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = COUNT - i - 1;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  @Test
+  public void testScanNoStopKey() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int start = 345;
+    List<Result> results = table
+        .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
+    assertEquals(COUNT - start, results.size());
+    IntStream.range(0, COUNT - start).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start + i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  @Test
+  public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int start = 765;
+    List<Result> results = table
+        .smallScan(
+          new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
+        .get();
+    assertEquals(start + 1, results.size());
+    IntStream.range(0, start + 1).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start - i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
+    assertEquals(stop - start, results.size());
+    IntStream.range(0, stop - start).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start + i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  private void testReversedScan(int start, int stop)
+      throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
+        .get();
+    assertEquals(start - stop, results.size());
+    IntStream.range(0, start - stop).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start - i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  @Test
+  public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
+    testScan(345, 567);
+  }
+
+  @Test
+  public void testReversedScanWithStartKeyAndStopKey()
+      throws InterruptedException, ExecutionException {
+    testReversedScan(765, 543);
+  }
+
+  @Test
+  public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
+    testScan(222, 333);
+  }
+
+  @Test
+  public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
+    testScan(222, 333);
+  }
+
+  @Test
+  public void testScanWithLimit() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int start = 111;
+    int stop = 888;
+    int limit = 300;
+    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
+      limit).get();
+    assertEquals(limit, results.size());
+    IntStream.range(0, limit).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start + i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+
+  @Test
+  public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int start = 888;
+    int stop = 111;
+    int limit = 300;
+    List<Result> results = table.smallScan(
+      new Scan(Bytes.toBytes(String.format("%03d", start)))
+          .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
+      limit).get();
+    assertEquals(limit, results.size());
+    IntStream.range(0, limit).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start - i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 89841a9..ae93e67 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -5933,7 +5934,7 @@ public class TestFromClientSide {
   public void testReversedScanUnderMultiRegions() throws Exception {
     // Test Initialization.
     TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions");
-    byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
+    byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
     byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
         Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
         Bytes.toBytes("006"),