You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/25 01:53:48 UTC

[1/3] hbase git commit: HBASE-17045 Unify the implementation of small scan and regular scan

Repository: hbase
Updated Branches:
  refs/heads/master 616f4801b -> 85d701892


http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
new file mode 100644
index 0000000..a9a3e43
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
+
+  @Parameter(0)
+  public String tableType;
+
+  @Parameter(1)
+  public Supplier<AsyncTableBase> getTable;
+
+  @Parameter(2)
+  public String scanType;
+
+  @Parameter(3)
+  public Supplier<Scan> scanCreator;
+
+  private static RawAsyncTable getRawTable() {
+    return ASYNC_CONN.getRawTable(TABLE_NAME);
+  }
+
+  private static AsyncTable getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  private static Scan createNormalScan() {
+    return new Scan();
+  }
+
+  // Test whether we can handle a partial result when opening the scanner.
+  private static Scan createSmallResultSizeScan() {
+    return new Scan().setMaxResultSize(1);
+  }
+
+  @Parameters(name = "{index}: table={0}, scan={2}")
+  public static List<Object[]> params() {
+    Supplier<AsyncTableBase> rawTable = TestAsyncTableScanAll::getRawTable;
+    Supplier<AsyncTableBase> normalTable = TestAsyncTableScanAll::getTable;
+    Supplier<Scan> normalScan = TestAsyncTableScanAll::createNormalScan;
+    Supplier<Scan> smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan;
+    return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan },
+      new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan },
+      new Object[] { "normal", normalTable, "normal", normalScan },
+      new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan });
+  }
+
+  @Test
+  public void testScanWithLimit() throws InterruptedException, ExecutionException {
+    int start = 111;
+    int stop = 888;
+    int limit = 300;
+    List<Result> results = getTable.get()
+        .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
+            .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
+            .setReadType(ReadType.PREAD))
+        .get();
+    assertEquals(limit, results.size());
+    IntStream.range(0, limit).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start + i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+    });
+  }
+
+  @Test
+  public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
+    int start = 888;
+    int stop = 111;
+    int limit = 300;
+    List<Result> results = getTable.get()
+        .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
+            .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
+            .setReadType(ReadType.PREAD).setReversed(true))
+        .get();
+    assertEquals(limit, results.size());
+    IntStream.range(0, limit).forEach(i -> {
+      Result result = results.get(i);
+      int actualIndex = start - i;
+      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+    });
+  }
+
+  @Override
+  protected Scan createScan() {
+    return scanCreator.get();
+  }
+
+  @Override
+  protected List<Result> doScan(Scan scan) throws Exception {
+    return getTable.get().scanAll(scan).get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
deleted file mode 100644
index 3737af2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
-
-  @Parameter
-  public Supplier<AsyncTableBase> getTable;
-
-  private static RawAsyncTable getRawTable() {
-    return ASYNC_CONN.getRawTable(TABLE_NAME);
-  }
-
-  private static AsyncTable getTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-  }
-
-  @Parameters
-  public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableSmallScan::getRawTable },
-      new Supplier<?>[] { TestAsyncTableSmallScan::getTable });
-  }
-
-  @Test
-  public void testScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTableBase table = getTable.get();
-    int start = 111;
-    int stop = 888;
-    int limit = 300;
-    List<Result> results =
-        table
-            .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
-                .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
-              limit)
-            .get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start + i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
-  }
-
-  @Test
-  public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTableBase table = getTable.get();
-    int start = 888;
-    int stop = 111;
-    int limit = 300;
-    List<Result> results = table.smallScan(
-      new Scan(Bytes.toBytes(String.format("%03d", start)))
-          .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
-      limit).get();
-    assertEquals(limit, results.size());
-    IntStream.range(0, limit).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start - i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
-    });
-  }
-
-  @Override
-  protected Scan createScan() {
-    return new Scan().setSmall(true);
-  }
-
-  @Override
-  protected List<Result> doScan(Scan scan) throws Exception {
-    return getTable.get().smallScan(scan).get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 9f3970b..a8ef353 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -57,11 +57,6 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
     }
 
     @Override
-    public boolean onHeartbeat() {
-      return true;
-    }
-
-    @Override
     public synchronized void onError(Throwable error) {
       finished = true;
       this.error = error;

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
index 3c5fe27..1b3c4e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
@@ -369,7 +370,10 @@ public class TestMultiRowRangeFilter {
   @Test
   public void testMultiRowRangeFilterWithExclusive() throws IOException {
     tableName = TableName.valueOf("testMultiRowRangeFilterWithExclusive");
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
     Table ht = TEST_UTIL.createTable(tableName, family, Integer.MAX_VALUE);
+    ht.setReadRpcTimeout(600000);
+    ht.setOperationTimeout(6000000);
     generateRows(numRows, ht, family, qf, value);
 
     Scan scan = new Scan();


[3/3] hbase git commit: HBASE-17045 Unify the implementation of small scan and regular scan

Posted by zh...@apache.org.
HBASE-17045 Unify the implementation of small scan and regular scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85d70189
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85d70189
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85d70189

Branch: refs/heads/master
Commit: 85d701892ed969380a8bcca9c9f4e306c74af941
Parents: 616f480
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 24 21:07:25 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Jan 25 09:53:06 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncClientScanner.java |  23 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |   4 +-
 .../client/AsyncRpcRetryingCallerFactory.java   |  84 +--
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  49 +-
 .../client/AsyncSmallScanRpcRetryingCaller.java | 194 -------
 .../hadoop/hbase/client/AsyncTableBase.java     |  40 +-
 .../hadoop/hbase/client/AsyncTableImpl.java     |   5 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  56 +-
 .../hbase/client/RawScanResultConsumer.java     |   4 +-
 .../org/apache/hadoop/hbase/client/Scan.java    | 146 +++--
 .../hadoop/hbase/client/ScannerCallable.java    |   2 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  34 ++
 .../hbase/shaded/protobuf/ProtobufUtil.java     |  35 ++
 .../hbase/shaded/protobuf/RequestConverter.java |  24 +-
 .../shaded/protobuf/generated/ClientProtos.java | 553 ++++++++++++++----
 .../src/main/protobuf/Client.proto              |  11 +-
 .../hbase/protobuf/generated/ClientProtos.java  | 556 +++++++++++++++----
 hbase-protocol/src/main/protobuf/Client.proto   |  11 +-
 .../hbase/regionserver/RSRpcServices.java       |  18 +
 .../hadoop/hbase/regionserver/StoreScanner.java |  38 +-
 .../client/AbstractTestAsyncTableScan.java      |   6 +
 .../hbase/client/TestAsyncTableScanAll.java     | 132 +++++
 .../hbase/client/TestAsyncTableSmallScan.java   | 109 ----
 .../hbase/client/TestRawAsyncTableScan.java     |   5 -
 .../hbase/filter/TestMultiRowRangeFilter.java   |   4 +
 25 files changed, 1385 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index f656a6c..b9fd34f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
 /**
  * The asynchronous client scanner implementation.
@@ -95,12 +96,16 @@ class AsyncClientScanner {
 
     public final ClientService.Interface stub;
 
-    public final long scannerId;
+    public final HBaseRpcController controller;
 
-    public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) {
+    public final ScanResponse resp;
+
+    public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
+        ScanResponse resp) {
       this.loc = loc;
       this.stub = stub;
-      this.scannerId = scannerId;
+      this.controller = controller;
+      this.resp = resp;
     }
   }
 
@@ -108,14 +113,14 @@ class AsyncClientScanner {
       HRegionLocation loc, ClientService.Interface stub) {
     CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
     try {
-      ScanRequest request =
-          RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false);
+      ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
+        scan, scan.getCaching(), false);
       stub.scan(controller, request, resp -> {
         if (controller.failed()) {
           future.completeExceptionally(controller.getFailed());
           return;
         }
-        future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId()));
+        future.complete(new OpenScannerResponse(loc, stub, controller, resp));
       });
     } catch (IOException e) {
       future.completeExceptionally(e);
@@ -124,11 +129,11 @@ class AsyncClientScanner {
   }
 
   private void startScan(OpenScannerResponse resp) {
-    conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
-        .setScan(scan).consumer(consumer).resultCache(resultCache)
+    conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp)
         .whenComplete((hasMore, error) -> {
           if (error != null) {
             consumer.onError(error);

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index ae79b65..2c8669f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -376,7 +377,8 @@ class AsyncNonMetaRegionLocator {
       metaKey = createRegionName(tableName, req.row, NINES, false);
     }
     conn.getRawTable(META_TABLE_NAME)
-        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .scanAll(new Scan().withStartRow(metaKey).setReversed(true).setReadType(ReadType.PREAD)
+            .addFamily(CATALOG_FAMILY).setLimit(1))
         .whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 5df66cc..6bc2cc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -30,7 +30,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
 /**
  * Factory to create an AsyncRpcRetryCaller.
@@ -138,81 +140,6 @@ class AsyncRpcRetryingCallerFactory {
     return new SingleRequestCallerBuilder<>();
   }
 
-  public class SmallScanCallerBuilder extends BuilderBase {
-
-    private TableName tableName;
-
-    private Scan scan;
-
-    private int limit;
-
-    private long scanTimeoutNs = -1L;
-
-    private long rpcTimeoutNs = -1L;
-
-    public SmallScanCallerBuilder table(TableName tableName) {
-      this.tableName = tableName;
-      return this;
-    }
-
-    public SmallScanCallerBuilder setScan(Scan scan) {
-      this.scan = scan;
-      return this;
-    }
-
-    public SmallScanCallerBuilder limit(int limit) {
-      this.limit = limit;
-      return this;
-    }
-
-    public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
-      this.scanTimeoutNs = unit.toNanos(scanTimeout);
-      return this;
-    }
-
-    public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
-      return this;
-    }
-
-    public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
-      this.pauseNs = unit.toNanos(pause);
-      return this;
-    }
-
-    public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
-      this.maxAttempts = maxAttempts;
-      return this;
-    }
-
-    public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
-      this.startLogErrorsCnt = startLogErrorsCnt;
-      return this;
-    }
-
-    public AsyncSmallScanRpcRetryingCaller build() {
-      TableName tableName = checkNotNull(this.tableName, "tableName is null");
-      Scan scan = checkNotNull(this.scan, "scan is null");
-      checkArgument(limit > 0, "invalid limit %d", limit);
-      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
-          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
-    }
-
-    /**
-     * Shortcut for {@code build().call()}
-     */
-    public CompletableFuture<List<Result>> call() {
-      return build().call();
-    }
-  }
-
-  /**
-   * Create retry caller for small scan.
-   */
-  public SmallScanCallerBuilder smallScan() {
-    return new SmallScanCallerBuilder();
-  }
-
   public class ScanSingleRegionCallerBuilder extends BuilderBase {
 
     private long scannerId = -1L;
@@ -297,10 +224,11 @@ class AsyncRpcRetryingCallerFactory {
     }
 
     /**
-     * Short cut for {@code build().start()}.
+     * Shortcut for {@code build().start(HBaseRpcController, ScanResponse)}.
      */
-    public CompletableFuture<Boolean> start() {
-      return build().start();
+    public CompletableFuture<Boolean> start(HBaseRpcController controller,
+        ScanResponse respWhenOpen) {
+      return build().start(controller, respWhenOpen);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 5d3b736..3ef4a6f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import io.netty.util.HashedWheelTimer;
@@ -135,6 +134,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
   }
 
+  private long remainingTimeNs() {
+    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+  }
+
   private void closeScanner() {
     resetController(controller, rpcTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
@@ -199,7 +202,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     long delayNs;
     if (scanTimeoutNs > 0) {
-      long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally(!scannerClosed);
         return;
@@ -245,7 +248,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
-  private void onComplete(ScanResponse resp) {
+  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
     if (controller.failed()) {
       onError(controller.getFailed());
       return;
@@ -288,6 +291,13 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeNoMoreResults();
       return;
     }
+    if (scan.getLimit() > 0) {
+      // The RS should have set the moreResults field in ScanResponse to false when we have reached
+      // the limit.
+      int limit = scan.getLimit() - results.length;
+      assert limit > 0;
+      scan.setLimit(limit);
+    }
     // as in 2.0 this value will always be set
     if (!resp.getMoreResultsInRegion()) {
       completeWhenNoMoreResultsInRegion.run();
@@ -297,10 +307,26 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   private void call() {
-    resetController(controller, rpcTimeoutNs);
+    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+    // less than the scan timeout. If the server does not respond in time (usually this will not
+    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+    // resending the next request and the only way to fix this is to close the scanner and open a
+    // new one.
+    long callTimeoutNs;
+    if (scanTimeoutNs > 0) {
+      long remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        completeExceptionally(true);
+        return;
+      }
+      callTimeoutNs = remainingNs;
+    } else {
+      callTimeoutNs = 0L;
+    }
+    resetController(controller, callTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
-      nextCallSeq, false, false);
-    stub.scan(controller, req, this::onComplete);
+      nextCallSeq, false, false, scan.getLimit());
+    stub.scan(controller, req, resp -> onComplete(controller, resp));
   }
 
   private void next() {
@@ -312,10 +338,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   /**
+   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
+   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
+   * open scanner request is also needed because we may have some data in the CellScanner which is
+   * contained in the controller.
    * @return {@code true} if we should continue, otherwise {@code false}.
    */
-  public CompletableFuture<Boolean> start() {
-    next();
+  public CompletableFuture<Boolean> start(HBaseRpcController controller,
+      ScanResponse respWhenOpen) {
+    onComplete(controller, respWhenOpen);
     return future;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
deleted file mode 100644
index 98a276f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
-
-/**
- * Retry caller for smaller scan.
- */
-@InterfaceAudience.Private
-class AsyncSmallScanRpcRetryingCaller {
-
-  private final AsyncConnectionImpl conn;
-
-  private final TableName tableName;
-
-  private final Scan scan;
-
-  private final int limit;
-
-  private final long scanTimeoutNs;
-
-  private final long rpcTimeoutNs;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final int startLogErrosCnt;
-
-  private final Function<HRegionInfo, Boolean> nextScan;
-
-  private final List<Result> resultList;
-
-  private final CompletableFuture<List<Result>> future;
-
-  public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
-      int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
-      int startLogErrosCnt) {
-    this.conn = conn;
-    this.tableName = tableName;
-    this.scan = scan;
-    this.limit = limit;
-    this.scanTimeoutNs = scanTimeoutNs;
-    this.rpcTimeoutNs = rpcTimeoutNs;
-    this.pauseNs = pauseNs;
-    this.maxAttempts = maxAttempts;
-    this.startLogErrosCnt = startLogErrosCnt;
-    if (scan.isReversed()) {
-      this.nextScan = this::reversedNextScan;
-    } else {
-      this.nextScan = this::nextScan;
-    }
-    this.resultList = new ArrayList<>();
-    this.future = new CompletableFuture<>();
-  }
-
-  private static final class SmallScanResponse {
-
-    public final Result[] results;
-
-    public final HRegionInfo currentRegion;
-
-    public final boolean hasMoreResultsInRegion;
-
-    public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
-        boolean hasMoreResultsInRegion) {
-      this.results = results;
-      this.currentRegion = currentRegion;
-      this.hasMoreResultsInRegion = hasMoreResultsInRegion;
-    }
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-      justification = "Findbugs seems to be confused by lambda expression.")
-  private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub) {
-    CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
-    ScanRequest req;
-    try {
-      req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
-        limit - resultList.size(), true);
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-      return future;
-    }
-    stub.scan(controller, req, resp -> {
-      if (controller.failed()) {
-        future.completeExceptionally(controller.getFailed());
-      } else {
-        try {
-          Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
-          future.complete(
-            new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
-        } catch (IOException e) {
-          future.completeExceptionally(e);
-        }
-      }
-    });
-    return future;
-  }
-
-  private void onComplete(SmallScanResponse resp) {
-    resultList.addAll(Arrays.asList(resp.results));
-    if (resultList.size() == limit) {
-      future.complete(resultList);
-      return;
-    }
-    if (resp.hasMoreResultsInRegion) {
-      if (resp.results.length > 0) {
-        scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false);
-      }
-      scan();
-      return;
-    }
-    if (!nextScan.apply(resp.currentRegion)) {
-      future.complete(resultList);
-    }
-  }
-
-  private void scan() {
-    conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
-        .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call()
-        .whenComplete((resp, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-          } else {
-            onComplete(resp);
-          }
-        });
-  }
-
-  public CompletableFuture<List<Result>> call() {
-    scan();
-    return future;
-  }
-
-  private boolean nextScan(HRegionInfo info) {
-    if (noMoreResultsForScan(scan, info)) {
-      return false;
-    } else {
-      scan.withStartRow(info.getEndKey());
-      scan();
-      return true;
-    }
-  }
-
-  private boolean reversedNextScan(HRegionInfo info) {
-    if (noMoreResultsForReverseScan(scan, info)) {
-      return false;
-    } else {
-      scan.withStartRow(info.getStartKey(), false);
-      scan();
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index d82fa22..e201ab2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -300,26 +300,34 @@ public interface AsyncTableBase {
       CompareOp compareOp, byte[] value, RowMutations mutation);
 
   /**
-   * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
-   * @see #smallScan(Scan, int)
-   */
-  default CompletableFuture<List<Result>> smallScan(Scan scan) {
-    return smallScan(scan, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Return all the results that match the given scan object. The number of the returned results
-   * will not be greater than {@code limit}.
+   * Return all the results that match the given scan object.
+   * <p>
+   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
+   * For example, if you want to get the closest row after a given row, you could do this:
+   * <p>
+   *
+   * <pre>
+   * <code>
+   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
+   *   if (results.isEmpty()) {
+   *      System.out.println("No row after " + Bytes.toStringBinary(row));
+   *   } else {
+   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
+   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
+   *   }
+   * });
+   * </code>
+   * </pre>
    * <p>
-   * Notice that the scan must be small, and should not use batch or allowPartialResults. The
-   * {@code caching} property of the scan object is also ignored as we will use {@code limit}
-   * instead.
-   * @param scan A configured {@link Scan} object.
-   * @param limit the limit of results count
+   * If your result set is very large, you should use another scan method to get a scanner, or
+   * use a callback to process the results. They do chunking to prevent OOM. The scanAll method
+   * fetches all the results and stores them in a List before returning it to you, so fetching a
+   * really large result set with it is likely to cause OOM.
+   * @param scan A configured {@link Scan} object.
    * @return The results of this small scan operation. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
-  CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+  CompletableFuture<List<Result>> scanAll(Scan scan);
 
   /**
    * Test for the existence of columns in the table, as specified by the Gets.

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 7cd257c..f1625ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -144,8 +144,8 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
-    return wrap(rawTable.smallScan(scan, limit));
+  public CompletableFuture<List<Result>> scanAll(Scan scan) {
+    return wrap(rawTable.scanAll(scan));
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -197,4 +197,5 @@ class AsyncTableImpl implements AsyncTable {
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
     return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d9d2d35..87323ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,13 +21,13 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements RawAsyncTable {
 
-  private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class);
-
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -332,12 +330,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
         .call();
   }
 
-  private <T> CompletableFuture<T> failedFuture(Throwable error) {
-    CompletableFuture<T> future = new CompletableFuture<>();
-    future.completeExceptionally(error);
-    return future;
-  }
-
   private Scan setDefaultScanConfig(Scan scan) {
     // always create a new scan object as we may reset the start row later.
     Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
@@ -351,27 +343,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   @Override
-  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
-    if (!scan.isSmall()) {
-      return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
-    }
-    if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-      return failedFuture(
-        new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-    }
-    return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
-        .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+  public CompletableFuture<List<Result>> scanAll(Scan scan) {
+    CompletableFuture<List<Result>> future = new CompletableFuture<>();
+    List<Result> scanResults = new ArrayList<>();
+    scan(scan, new RawScanResultConsumer() {
+
+      @Override
+      public boolean onNext(Result[] results) {
+        scanResults.addAll(Arrays.asList(results));
+        return true;
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        future.complete(scanResults);
+      }
+    });
+    return future;
   }
 
   public void scan(Scan scan, RawScanResultConsumer consumer) {
-    if (scan.isSmall()) {
+    if (scan.isSmall() || scan.getLimit() > 0) {
       if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-        consumer.onError(
-          new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-      } else {
-        LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
+        consumer.onError(new IllegalArgumentException(
+            "Batch and allowPartial is not allowed for small scan or limited scan"));
       }
     }
     scan = setDefaultScanConfig(scan);
@@ -388,6 +388,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public List<CompletableFuture<Void>> put(List<Put> puts) {
     return voidMutate(puts);
   }
+
   @Override
   public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
     return voidMutate(deletes);
@@ -434,4 +435,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 7f0514c..2e5d422 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -47,7 +47,9 @@ public interface RawScanResultConsumer {
    * This method give you a chance to terminate a slow scan operation.
    * @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
    */
-  boolean onHeartbeat();
+  default boolean onHeartbeat() {
+    return true;
+  }
 
   /**
    * Indicate that we hit an unrecoverable error and the scan operation is terminated.

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 8d53b9a..31e76da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -46,38 +46,45 @@ import org.apache.hadoop.hbase.util.Bytes;
 /**
  * Used to perform Scan operations.
  * <p>
- * All operations are identical to {@link Get} with the exception of
- * instantiation.  Rather than specifying a single row, an optional startRow
- * and stopRow may be defined.  If rows are not specified, the Scanner will
- * iterate over all rows.
+ * All operations are identical to {@link Get} with the exception of instantiation. Rather than
+ * specifying a single row, an optional startRow and stopRow may be defined. If rows are not
+ * specified, the Scanner will iterate over all rows.
  * <p>
  * To get all columns from all rows of a Table, create an instance with no constraints; use the
- * {@link #Scan()} constructor. To constrain the scan to specific column families,
- * call {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
+ * {@link #Scan()} constructor. To constrain the scan to specific column families, call
+ * {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
  * <p>
- * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn}
- * for each column to retrieve.
+ * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn} for each column to
+ * retrieve.
  * <p>
- * To only retrieve columns within a specific range of version timestamps,
- * call {@link #setTimeRange(long, long) setTimeRange}.
+ * To only retrieve columns within a specific range of version timestamps, call
+ * {@link #setTimeRange(long, long) setTimeRange}.
  * <p>
- * To only retrieve columns with a specific timestamp, call
- * {@link #setTimeStamp(long) setTimestamp}.
+ * To only retrieve columns with a specific timestamp, call
+ * {@link #setTimeStamp(long) setTimestamp}.
  * <p>
- * To limit the number of versions of each column to be returned, call
- * {@link #setMaxVersions(int) setMaxVersions}.
+ * To limit the number of versions of each column to be returned, call {@link #setMaxVersions(int)
+ * setMaxVersions}.
  * <p>
- * To limit the maximum number of values returned for each call to next(),
- * call {@link #setBatch(int) setBatch}.
+ * To limit the maximum number of values returned for each call to next(), call
+ * {@link #setBatch(int) setBatch}.
  * <p>
  * To add a filter, call {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
  * <p>
- * Expert: To explicitly disable server-side block caching for this scan,
- * execute {@link #setCacheBlocks(boolean)}.
- * <p><em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan
- * runs and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when
- * you go to clone a Scan instance or if you go to reuse a created Scan instance; safer is create
- * a Scan instance per usage.
+ * Small scan is deprecated in 2.0.0. Now we have a {@link #setLimit(int)} method in the Scan
+ * object which is used to tell the RS how many rows we want. If the number of rows returned
+ * reaches the limit, the RS will close the RegionScanner automatically. We also fetch data when
+ * opening the scanner in the new implementation, which means we can also finish a scan operation
+ * in one rpc call. We have also introduced a {@link #setReadType(ReadType)} method, which you can
+ * use to tell the RS to use pread explicitly.
+ * <p>
+ * Expert: To explicitly disable server-side block caching for this scan, execute
+ * {@link #setCacheBlocks(boolean)}.
+ * <p>
+ * <em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan runs
+ * and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when you go to
+ * clone a Scan instance or if you go to reuse a created Scan instance; safer is create a Scan
+ * instance per usage.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -86,9 +93,9 @@ public class Scan extends Query {
 
   private static final String RAW_ATTR = "_raw_";
 
-  private byte [] startRow = HConstants.EMPTY_START_ROW;
+  private byte[] startRow = HConstants.EMPTY_START_ROW;
   private boolean includeStartRow = true;
-  private byte [] stopRow  = HConstants.EMPTY_END_ROW;
+  private byte[] stopRow  = HConstants.EMPTY_END_ROW;
   private boolean includeStopRow = false;
   private int maxVersions = 1;
   private int batch = -1;
@@ -172,6 +179,16 @@ public class Scan extends Query {
   private long mvccReadPoint = -1L;
 
   /**
+   * The number of rows we want for this scan. We will terminate the scan if the number of
+   * returned rows reaches this value.
+   */
+  private int limit = -1;
+
+  /**
+   * Control whether to use pread at server side.
+   */
+  private ReadType readType = ReadType.DEFAULT;
+  /**
    * Create a Scan operation across all rows.
    */
   public Scan() {}
@@ -257,6 +274,7 @@ public class Scan extends Query {
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
     this.mvccReadPoint = scan.getMvccReadPoint();
+    this.limit = scan.getLimit();
   }
 
   /**
@@ -969,37 +987,36 @@ public class Scan extends Query {
     return attr == null ? false : Bytes.toBoolean(attr);
   }
 
-
-
   /**
    * Set whether this scan is a small scan
    * <p>
-   * Small scan should use pread and big scan can use seek + read
-   *
-   * seek + read is fast but can cause two problem (1) resource contention (2)
-   * cause too much network io
-   *
-   * [89-fb] Using pread for non-compaction read request
-   * https://issues.apache.org/jira/browse/HBASE-7266
-   *
-   * On the other hand, if setting it true, we would do
-   * openScanner,next,closeScanner in one RPC call. It means the better
-   * performance for small scan. [HBASE-9488].
-   *
-   * Generally, if the scan range is within one data block(64KB), it could be
-   * considered as a small scan.
-   *
+   * Small scan should use pread and big scan can use seek + read. Seek + read is fast but can
+   * cause two problems: (1) resource contention and (2) too much network io. See "[89-fb] Using
+   * pread for non-compaction read request", https://issues.apache.org/jira/browse/HBASE-7266. On
+   * the other hand, if setting it true, we would do openScanner, next, closeScanner in one RPC
+   * call, which means better performance for small scan [HBASE-9488]. Generally, if the scan range
+   * is within one data block (64KB), it could be considered as a small scan.
    * @param small
+   * @deprecated since 2.0.0. Use {@link #setLimit(int)} and {@link #setReadType(ReadType)} instead.
+   *             And for the one rpc optimization, now we will also fetch data when openScanner, and
+   *             if the number of rows reaches the limit then we will close the scanner
+   *             automatically which means we will fall back to one rpc.
+   * @see #setLimit(int)
+   * @see #setReadType(ReadType)
    */
+  @Deprecated
   public Scan setSmall(boolean small) {
     this.small = small;
+    this.readType = ReadType.PREAD;
     return this;
   }
 
   /**
    * Get whether this scan is a small scan
    * @return true if small scan
+   * @deprecated since 2.0.0. See the comment of {@link #setSmall(boolean)}
    */
+  @Deprecated
   public boolean isSmall() {
     return small;
   }
@@ -1081,6 +1098,53 @@ public class Scan extends Query {
   }
 
   /**
+   * @return the limit of rows for this scan
+   */
+  public int getLimit() {
+    return limit;
+  }
+
+  /**
+   * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows
+   * reaches this value.
+   * <p>
+   * This condition will be tested last, after all other conditions such as stopRow, filter, etc.
+   * <p>
+   * Can not be used together with batch and allowPartial.
+   * @param limit the limit of rows for this scan
+   * @return this
+   */
+  public Scan setLimit(int limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public enum ReadType {
+    DEFAULT, STREAM, PREAD
+  }
+
+  /**
+   * @return the read type for this scan
+   */
+  public ReadType getReadType() {
+    return readType;
+  }
+
+  /**
+   * Set the read type for this scan.
+   * <p>
+   * Notice that we may choose to use pread even if you specify {@link ReadType#STREAM} here. For
+   * example, we will always use pread if this is a get scan.
+   * @return this
+   */
+  public Scan setReadType(ReadType readType) {
+    this.readType = readType;
+    return this;
+  }
+
+  /**
    * Get the mvcc read point used to open a scanner.
    */
   long getMvccReadPoint() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 642fae0..f867acb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -194,7 +194,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
         try {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
-                this.scanMetrics != null, renew);
+            this.scanMetrics != null, renew, -1);
           ScanResponse response = null;
           response = getStub().scan(getRpcController(), request);
           // Client and RS maintain a nextCallSeq number during the scan. Every next() call

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index d3898d4..51a94ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -814,6 +814,32 @@ public final class ProtobufUtil {
     return get;
   }
 
+  public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return ClientProtos.Scan.ReadType.DEFAULT;
+      case STREAM:
+        return ClientProtos.Scan.ReadType.STREAM;
+      case PREAD:
+        return ClientProtos.Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
+  public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return Scan.ReadType.DEFAULT;
+      case STREAM:
+        return Scan.ReadType.STREAM;
+      case PREAD:
+        return Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
   /**
    * Convert a client Scan to a protocol buffer Scan
    *
@@ -917,6 +943,9 @@ public final class ProtobufUtil {
     if (scan.includeStopRow()) {
       scanBuilder.setIncludeStopRow(true);
     }
+    if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+      scanBuilder.setReadType(toReadType(scan.getReadType()));
+    }
     return scanBuilder.build();
   }
 
@@ -1015,6 +1044,11 @@ public final class ProtobufUtil {
     if (proto.hasMvccReadPoint()) {
       PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
     }
+    if (scan.isSmall()) {
+      scan.setReadType(Scan.ReadType.PREAD);
+    } else if (proto.hasReadType()) {
+      scan.setReadType(toReadType(proto.getReadType()));
+    }
     return scan;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 7764f65..13ff92e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLoadStats;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -928,6 +929,32 @@ public final class ProtobufUtil {
     return get;
   }
 
+  public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return ClientProtos.Scan.ReadType.DEFAULT;
+      case STREAM:
+        return ClientProtos.Scan.ReadType.STREAM;
+      case PREAD:
+        return ClientProtos.Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
+  public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+    switch (readType) {
+      case DEFAULT:
+        return Scan.ReadType.DEFAULT;
+      case STREAM:
+        return Scan.ReadType.STREAM;
+      case PREAD:
+        return Scan.ReadType.PREAD;
+      default:
+        throw new IllegalArgumentException("Unknown ReadType: " + readType);
+    }
+  }
+
   /**
    * Convert a client Scan to a protocol buffer Scan
    *
@@ -1031,6 +1058,9 @@ public final class ProtobufUtil {
     if (scan.includeStopRow()) {
       scanBuilder.setIncludeStopRow(true);
     }
+    if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+      scanBuilder.setReadType(toReadType(scan.getReadType()));
+    }
     return scanBuilder.build();
   }
 
@@ -1129,6 +1159,11 @@ public final class ProtobufUtil {
     if (proto.hasMvccReadPoint()) {
       PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
     }
+    if (scan.isSmall()) {
+      scan.setReadType(Scan.ReadType.PREAD);
+    } else if (proto.hasReadType()) {
+      scan.setReadType(toReadType(proto.getReadType()));
+    }
     return scan;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index fd08d98..8de9ad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -462,11 +462,10 @@ public final class RequestConverter {
    * @return a scan request
    * @throws IOException
    */
-  public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
-      final int numberOfRows, final boolean closeScanner) throws IOException {
+  public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows,
+      boolean closeScanner) throws IOException {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
+    RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
     builder.setRegion(region);
@@ -474,19 +473,21 @@ public final class RequestConverter {
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
     builder.setTrackScanMetrics(scan.isScanMetricsEnabled());
+    if (scan.getLimit() > 0) {
+      builder.setLimitOfRows(scan.getLimit());
+    }
     return builder.build();
   }
 
   /**
    * Create a protocol buffer ScanRequest for a scanner id
-   *
    * @param scannerId
    * @param numberOfRows
    * @param closeScanner
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final boolean trackMetrics) {
+  public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+      boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -499,16 +500,14 @@ public final class RequestConverter {
 
   /**
    * Create a protocol buffer ScanRequest for a scanner id
-   *
    * @param scannerId
    * @param numberOfRows
    * @param closeScanner
    * @param nextCallSeq
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics,
-      final boolean renew) {
+  public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+      long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -518,6 +517,9 @@ public final class RequestConverter {
     builder.setClientHandlesHeartbeats(true);
     builder.setTrackScanMetrics(trackMetrics);
     builder.setRenew(renew);
+    if (limitOfRows > 0) {
+      builder.setLimitOfRows(limitOfRows);
+    }
     return builder.build();
   }
 


[2/3] hbase git commit: HBASE-17045 Unify the implementation of small scan and regular scan

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index ef44295..a6f0f43 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -14487,13 +14487,13 @@ public final class ClientProtos {
     boolean getLoadColumnFamiliesOnDemand();
 
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    boolean hasSmall();
+    @java.lang.Deprecated boolean hasSmall();
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    boolean getSmall();
+    @java.lang.Deprecated boolean getSmall();
 
     /**
      * <code>optional bool reversed = 15 [default = false];</code>
@@ -14581,6 +14581,15 @@ public final class ClientProtos {
      * <code>optional bool include_stop_row = 22 [default = false];</code>
      */
     boolean getIncludeStopRow();
+
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    boolean hasReadType();
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType();
   }
   /**
    * <pre>
@@ -14624,6 +14633,7 @@ public final class ClientProtos {
       mvccReadPoint_ = 0L;
       includeStartRow_ = true;
       includeStopRow_ = false;
+      readType_ = 0;
     }
 
     @java.lang.Override
@@ -14798,6 +14808,17 @@ public final class ClientProtos {
               includeStopRow_ = input.readBool();
               break;
             }
+            case 184: {
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(23, rawValue);
+              } else {
+                bitField0_ |= 0x00080000;
+                readType_ = rawValue;
+              }
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14831,6 +14852,105 @@ public final class ClientProtos {
               org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.class, org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.Builder.class);
     }
 
+    /**
+     * Protobuf enum {@code hbase.pb.Scan.ReadType}
+     */
+    public enum ReadType
+        implements org.apache.hadoop.hbase.shaded.com.google.protobuf.ProtocolMessageEnum {
+      /**
+       * <code>DEFAULT = 0;</code>
+       */
+      DEFAULT(0),
+      /**
+       * <code>STREAM = 1;</code>
+       */
+      STREAM(1),
+      /**
+       * <code>PREAD = 2;</code>
+       */
+      PREAD(2),
+      ;
+
+      /**
+       * <code>DEFAULT = 0;</code>
+       */
+      public static final int DEFAULT_VALUE = 0;
+      /**
+       * <code>STREAM = 1;</code>
+       */
+      public static final int STREAM_VALUE = 1;
+      /**
+       * <code>PREAD = 2;</code>
+       */
+      public static final int PREAD_VALUE = 2;
+
+
+      public final int getNumber() {
+        return value;
+      }
+
+      /**
+       * @deprecated Use {@link #forNumber(int)} instead.
+       */
+      @java.lang.Deprecated
+      public static ReadType valueOf(int value) {
+        return forNumber(value);
+      }
+
+      public static ReadType forNumber(int value) {
+        switch (value) {
+          case 0: return DEFAULT;
+          case 1: return STREAM;
+          case 2: return PREAD;
+          default: return null;
+        }
+      }
+
+      public static org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<ReadType>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<
+          ReadType> internalValueMap =
+            new org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<ReadType>() {
+              public ReadType findValueByNumber(int number) {
+                return ReadType.forNumber(number);
+              }
+            };
+
+      public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor
+          getValueDescriptor() {
+        return getDescriptor().getValues().get(ordinal());
+      }
+      public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptorForType() {
+        return getDescriptor();
+      }
+      public static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0);
+      }
+
+      private static final ReadType[] VALUES = values();
+
+      public static ReadType valueOf(
+          org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+        if (desc.getType() != getDescriptor()) {
+          throw new java.lang.IllegalArgumentException(
+            "EnumValueDescriptor is not for this type.");
+        }
+        return VALUES[desc.getIndex()];
+      }
+
+      private final int value;
+
+      private ReadType(int value) {
+        this.value = value;
+      }
+
+      // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType)
+    }
+
     private int bitField0_;
     public static final int COLUMN_FIELD_NUMBER = 1;
     private java.util.List<org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column> column_;
@@ -15090,15 +15210,15 @@ public final class ClientProtos {
     public static final int SMALL_FIELD_NUMBER = 14;
     private boolean small_;
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    public boolean hasSmall() {
+    @java.lang.Deprecated public boolean hasSmall() {
       return ((bitField0_ & 0x00000800) == 0x00000800);
     }
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    public boolean getSmall() {
+    @java.lang.Deprecated public boolean getSmall() {
       return small_;
     }
 
@@ -15243,6 +15363,22 @@ public final class ClientProtos {
       return includeStopRow_;
     }
 
+    public static final int READTYPE_FIELD_NUMBER = 23;
+    private int readType_;
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    public boolean hasReadType() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+      return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -15345,6 +15481,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeBool(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeEnum(23, readType_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -15441,6 +15580,10 @@ public final class ClientProtos {
         size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
           .computeBoolSize(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeEnumSize(23, readType_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -15558,6 +15701,10 @@ public final class ClientProtos {
         result = result && (getIncludeStopRow()
             == other.getIncludeStopRow());
       }
+      result = result && (hasReadType() == other.hasReadType());
+      if (hasReadType()) {
+        result = result && readType_ == other.readType_;
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -15666,6 +15813,10 @@ public final class ClientProtos {
         hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean(
             getIncludeStopRow());
       }
+      if (hasReadType()) {
+        hash = (37 * hash) + READTYPE_FIELD_NUMBER;
+        hash = (53 * hash) + readType_;
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -15863,6 +16014,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00100000);
         includeStopRow_ = false;
         bitField0_ = (bitField0_ & ~0x00200000);
+        readType_ = 0;
+        bitField0_ = (bitField0_ & ~0x00400000);
         return this;
       }
 
@@ -15998,6 +16151,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00040000;
         }
         result.includeStopRow_ = includeStopRow_;
+        if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.readType_ = readType_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -16175,6 +16332,9 @@ public final class ClientProtos {
         if (other.hasIncludeStopRow()) {
           setIncludeStopRow(other.getIncludeStopRow());
         }
+        if (other.hasReadType()) {
+          setReadType(other.getReadType());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -17251,30 +17411,30 @@ public final class ClientProtos {
 
       private boolean small_ ;
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public boolean hasSmall() {
+      @java.lang.Deprecated public boolean hasSmall() {
         return ((bitField0_ & 0x00002000) == 0x00002000);
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public boolean getSmall() {
+      @java.lang.Deprecated public boolean getSmall() {
         return small_;
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public Builder setSmall(boolean value) {
+      @java.lang.Deprecated public Builder setSmall(boolean value) {
         bitField0_ |= 0x00002000;
         small_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public Builder clearSmall() {
+      @java.lang.Deprecated public Builder clearSmall() {
         bitField0_ = (bitField0_ & ~0x00002000);
         small_ = false;
         onChanged();
@@ -17748,6 +17908,42 @@ public final class ClientProtos {
         onChanged();
         return this;
       }
+
+      private int readType_ = 0;
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public boolean hasReadType() {
+        return ((bitField0_ & 0x00400000) == 0x00400000);
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+        return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public Builder setReadType(org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00400000;
+        readType_ = value.getNumber();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public Builder clearReadType() {
+        bitField0_ = (bitField0_ & ~0x00400000);
+        readType_ = 0;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -17898,6 +18094,23 @@ public final class ClientProtos {
      * <code>optional bool renew = 10 [default = false];</code>
      */
     boolean getRenew();
+
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    boolean hasLimitOfRows();
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    int getLimitOfRows();
   }
   /**
    * <pre>
@@ -17930,6 +18143,7 @@ public final class ClientProtos {
       clientHandlesHeartbeats_ = false;
       trackScanMetrics_ = false;
       renew_ = false;
+      limitOfRows_ = 0;
     }
 
     @java.lang.Override
@@ -18026,6 +18240,11 @@ public final class ClientProtos {
               renew_ = input.readBool();
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              limitOfRows_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18213,6 +18432,29 @@ public final class ClientProtos {
       return renew_;
     }
 
+    public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+    private int limitOfRows_;
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    public boolean hasLimitOfRows() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     *
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     */
+    public int getLimitOfRows() {
+      return limitOfRows_;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -18267,6 +18509,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeBool(10, renew_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeUInt32(11, limitOfRows_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -18315,6 +18560,10 @@ public final class ClientProtos {
         size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
           .computeBoolSize(10, renew_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(11, limitOfRows_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -18382,6 +18631,11 @@ public final class ClientProtos {
         result = result && (getRenew()
             == other.getRenew());
       }
+      result = result && (hasLimitOfRows() == other.hasLimitOfRows());
+      if (hasLimitOfRows()) {
+        result = result && (getLimitOfRows()
+            == other.getLimitOfRows());
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -18440,6 +18694,10 @@ public final class ClientProtos {
         hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean(
             getRenew());
       }
+      if (hasLimitOfRows()) {
+        hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER;
+        hash = (53 * hash) + getLimitOfRows();
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -18599,6 +18857,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000100);
         renew_ = false;
         bitField0_ = (bitField0_ & ~0x00000200);
+        limitOfRows_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
 
@@ -18671,6 +18931,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000200;
         }
         result.renew_ = renew_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.limitOfRows_ = limitOfRows_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -18743,6 +19007,9 @@ public final class ClientProtos {
         if (other.hasRenew()) {
           setRenew(other.getRenew());
         }
+        if (other.hasLimitOfRows()) {
+          setLimitOfRows(other.getLimitOfRows());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -19272,6 +19539,54 @@ public final class ClientProtos {
         onChanged();
         return this;
       }
+
+      private int limitOfRows_ ;
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public boolean hasLimitOfRows() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public int getLimitOfRows() {
+        return limitOfRows_;
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public Builder setLimitOfRows(int value) {
+        bitField0_ |= 0x00000400;
+        limitOfRows_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       *
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       */
+      public Builder clearLimitOfRows() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        limitOfRows_ = 0;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -40834,7 +41149,7 @@ public final class ClientProtos {
       "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" +
       "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" +
       "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " +
-      "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
+      "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
       "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" +
       "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" +
       "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" +
@@ -40843,104 +41158,108 @@ public final class ClientProtos {
       "cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" +
       "ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" +
       "\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" +
-      "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" +
-      "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" +
-      " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" +
-      "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " +
-      "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" +
-      "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024",
-      " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" +
-      "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" +
-      "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" +
-      "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" +
-      "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" +
-      "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" +
-      "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" +
-      "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" +
-      "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " +
-      "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per",
-      "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" +
-      "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" +
-      "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" +
-      "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" +
-      "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" +
-      "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" +
-      ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" +
-      ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" +
-      " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" +
-      "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq",
-      "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" +
-      "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" +
-      "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " +
-      "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" +
-      "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" +
-      "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" +
-      "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" +
-      "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" +
-      "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" +
-      ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re",
-      "gionSpecifier\"-\n\027PrepareBulkLoadResponse" +
-      "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" +
-      "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" +
-      "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" +
-      "BulkLoadResponse\"a\n\026CoprocessorServiceCa" +
-      "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" +
-      "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" +
-      "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" +
-      ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" +
-      "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.",
-      "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" +
-      ".CoprocessorServiceCall\"o\n\032CoprocessorSe" +
-      "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
-      ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." +
-      "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" +
-      "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" +
-      "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" +
-      "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" +
-      "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" +
-      "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002",
-      " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" +
-      "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" +
-      "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" +
-      "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" +
-      "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" +
-      "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" +
-      "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" +
-      " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" +
-      "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" +
-      "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co",
-      "processorServiceResult\0220\n\tloadStats\030\005 \001(" +
-      "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" +
-      "onActionResult\0226\n\021resultOrException\030\001 \003(" +
-      "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" +
-      "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" +
-      "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" +
-      ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" +
-      "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" +
-      "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" +
-      "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc",
-      "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." +
-      "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" +
-      "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" +
-      "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." +
-      "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" +
-      "b.MutateRequest\032\030.hbase.pb.MutateRespons" +
-      "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" +
-      ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" +
-      "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" +
-      "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .",
-      "hbase.pb.PrepareBulkLoadRequest\032!.hbase." +
-      "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" +
-      "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" +
-      "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" +
-      "ecService\022#.hbase.pb.CoprocessorServiceR" +
-      "equest\032$.hbase.pb.CoprocessorServiceResp" +
-      "onse\022d\n\027ExecRegionServerService\022#.hbase." +
-      "pb.CoprocessorServiceRequest\032$.hbase.pb." +
-      "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" +
-      "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp",
-      "onseBI\n1org.apache.hadoop.hbase.shaded.p" +
-      "rotobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+      "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" +
+      "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" +
+      "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" +
+      "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" +
+      "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" +
+      "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi",
+      "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" +
+      "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" +
+      "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" +
+      ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" +
+      "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" +
+      "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" +
+      "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" +
+      "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" +
+      "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" +
+      "client_handles_partials\030\007 \001(\010\022!\n\031client_",
+      "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" +
+      "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" +
+      "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" +
+      "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " +
+      "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" +
+      "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" +
+      "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+      "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" +
+      "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" +
+      "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p",
+      "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" +
+      "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" +
+      "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" +
+      "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" +
+      "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" +
+      "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" +
+      "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" +
+      "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" +
+      "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
+      "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001",
+      "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" +
+      "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
+      "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" +
+      "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" +
+      "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" +
+      "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" +
+      "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" +
+      "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" +
+      "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" +
+      "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030",
+      "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" +
+      "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" +
+      "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" +
+      ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" +
+      " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" +
+      "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" +
+      "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" +
+      "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" +
+      "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" +
+      "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.",
+      "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" +
+      "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" +
+      "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" +
+      "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" +
+      "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" +
+      "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" +
+      "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" +
+      "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" +
+      ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" +
+      "b.RegionLoadStats\"\336\001\n\021ResultOrException\022",
+      "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" +
+      "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" +
+      "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" +
+      "base.pb.CoprocessorServiceResult\0220\n\tload" +
+      "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" +
+      "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" +
+      "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" +
+      "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" +
+      "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" +
+      "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup",
+      "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" +
+      "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" +
+      "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" +
+      "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" +
+      "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" +
+      "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" +
+      "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" +
+      "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" +
+      "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" +
+      "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque",
+      "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" +
+      "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" +
+      "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" +
+      "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" +
+      "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" +
+      "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" +
+      "adRequest\032!.hbase.pb.CleanupBulkLoadResp" +
+      "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" +
+      "orServiceRequest\032$.hbase.pb.CoprocessorS" +
+      "erviceResponse\022d\n\027ExecRegionServerServic",
+      "e\022#.hbase.pb.CoprocessorServiceRequest\032$" +
+      ".hbase.pb.CoprocessorServiceResponse\0228\n\005" +
+      "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" +
+      ".MultiResponseBI\n1org.apache.hadoop.hbas" +
+      "e.shaded.protobuf.generatedB\014ClientProto" +
+      "sH\001\210\001\001\240\001\001"
     };
     org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
@@ -41042,13 +41361,13 @@ public final class ClientProtos {
     internal_static_hbase_pb_Scan_fieldAccessorTable = new
       org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
         internal_static_hbase_pb_Scan_descriptor,
-        new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", });
+        new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", });
     internal_static_hbase_pb_ScanRequest_descriptor =
       getDescriptor().getMessageTypes().get(12);
     internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new
       org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
         internal_static_hbase_pb_ScanRequest_descriptor,
-        new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", });
+        new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", });
     internal_static_hbase_pb_ScanResponse_descriptor =
       getDescriptor().getMessageTypes().get(13);
     internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol-shaded/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 2793b89..e5a10b0 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -249,7 +249,7 @@ message Scan {
   optional uint32 store_limit = 11;
   optional uint32 store_offset = 12;
   optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
-  optional bool small = 14;
+  optional bool small = 14 [deprecated = true];
   optional bool reversed = 15 [default = false];
   optional Consistency consistency = 16 [default = STRONG];
   optional uint32 caching = 17;
@@ -258,6 +258,13 @@ message Scan {
   optional uint64 mvcc_read_point = 20 [default = 0];
   optional bool include_start_row = 21 [default = true];
   optional bool include_stop_row = 22 [default = false];
+
+  enum ReadType {
+    DEFAULT = 0;
+    STREAM = 1;
+    PREAD = 2;
+  }
+  optional ReadType readType = 23 [default = DEFAULT];
 }
 
 /**
@@ -282,6 +289,8 @@ message ScanRequest {
   optional bool client_handles_heartbeats = 8;
   optional bool track_scan_metrics = 9;
   optional bool renew = 10 [default = false];
+  // if we have returned limit_of_rows rows to client, then close the scanner.
+  optional uint32 limit_of_rows = 11 [default = 0];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 087576c..a550f85 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -14145,15 +14145,15 @@ public final class ClientProtos {
      */
     boolean getLoadColumnFamiliesOnDemand();
 
-    // optional bool small = 14;
+    // optional bool small = 14 [deprecated = true];
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    boolean hasSmall();
+    @java.lang.Deprecated boolean hasSmall();
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    boolean getSmall();
+    @java.lang.Deprecated boolean getSmall();
 
     // optional bool reversed = 15 [default = false];
     /**
@@ -14249,6 +14249,16 @@ public final class ClientProtos {
      * <code>optional bool include_stop_row = 22 [default = false];</code>
      */
     boolean getIncludeStopRow();
+
+    // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    boolean hasReadType();
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType();
   }
   /**
    * Protobuf type {@code hbase.pb.Scan}
@@ -14453,6 +14463,17 @@ public final class ClientProtos {
               includeStopRow_ = input.readBool();
               break;
             }
+            case 184: {
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(23, rawValue);
+              } else {
+                bitField0_ |= 0x00080000;
+                readType_ = value;
+              }
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14501,6 +14522,97 @@ public final class ClientProtos {
       return PARSER;
     }
 
+    /**
+     * Protobuf enum {@code hbase.pb.Scan.ReadType}
+     */
+    public enum ReadType
+        implements com.google.protobuf.ProtocolMessageEnum {
+      /**
+       * <code>DEFAULT = 0;</code>
+       */
+      DEFAULT(0, 0),
+      /**
+       * <code>STREAM = 1;</code>
+       */
+      STREAM(1, 1),
+      /**
+       * <code>PREAD = 2;</code>
+       */
+      PREAD(2, 2),
+      ;
+
+      /**
+       * <code>DEFAULT = 0;</code>
+       */
+      public static final int DEFAULT_VALUE = 0;
+      /**
+       * <code>STREAM = 1;</code>
+       */
+      public static final int STREAM_VALUE = 1;
+      /**
+       * <code>PREAD = 2;</code>
+       */
+      public static final int PREAD_VALUE = 2;
+
+
+      public final int getNumber() { return value; }
+
+      public static ReadType valueOf(int value) {
+        switch (value) {
+          case 0: return DEFAULT;
+          case 1: return STREAM;
+          case 2: return PREAD;
+          default: return null;
+        }
+      }
+
+      public static com.google.protobuf.Internal.EnumLiteMap<ReadType>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<ReadType>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<ReadType>() {
+              public ReadType findValueByNumber(int number) {
+                return ReadType.valueOf(number);
+              }
+            };
+
+      public final com.google.protobuf.Descriptors.EnumValueDescriptor
+          getValueDescriptor() {
+        return getDescriptor().getValues().get(index);
+      }
+      public final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptorForType() {
+        return getDescriptor();
+      }
+      public static final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0);
+      }
+
+      private static final ReadType[] VALUES = values();
+
+      public static ReadType valueOf(
+          com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+        if (desc.getType() != getDescriptor()) {
+          throw new java.lang.IllegalArgumentException(
+            "EnumValueDescriptor is not for this type.");
+        }
+        return VALUES[desc.getIndex()];
+      }
+
+      private final int index;
+      private final int value;
+
+      private ReadType(int index, int value) {
+        this.index = index;
+        this.value = value;
+      }
+
+      // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType)
+    }
+
     private int bitField0_;
     // repeated .hbase.pb.Column column = 1;
     public static final int COLUMN_FIELD_NUMBER = 1;
@@ -14770,19 +14882,19 @@ public final class ClientProtos {
       return loadColumnFamiliesOnDemand_;
     }
 
-    // optional bool small = 14;
+    // optional bool small = 14 [deprecated = true];
     public static final int SMALL_FIELD_NUMBER = 14;
     private boolean small_;
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    public boolean hasSmall() {
+    @java.lang.Deprecated public boolean hasSmall() {
       return ((bitField0_ & 0x00000800) == 0x00000800);
     }
     /**
-     * <code>optional bool small = 14;</code>
+     * <code>optional bool small = 14 [deprecated = true];</code>
      */
-    public boolean getSmall() {
+    @java.lang.Deprecated public boolean getSmall() {
       return small_;
     }
 
@@ -14934,6 +15046,22 @@ public final class ClientProtos {
       return includeStopRow_;
     }
 
+    // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+    public static final int READTYPE_FIELD_NUMBER = 23;
+    private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_;
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    public boolean hasReadType() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    /**
+     * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+      return readType_;
+    }
+
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -14957,6 +15085,7 @@ public final class ClientProtos {
       mvccReadPoint_ = 0L;
       includeStartRow_ = true;
       includeStopRow_ = false;
+      readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -15060,6 +15189,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeBool(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeEnum(23, readType_.getNumber());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -15157,6 +15289,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(23, readType_.getNumber());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -15281,6 +15417,11 @@ public final class ClientProtos {
         result = result && (getIncludeStopRow()
             == other.getIncludeStopRow());
       }
+      result = result && (hasReadType() == other.hasReadType());
+      if (hasReadType()) {
+        result = result &&
+            (getReadType() == other.getReadType());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -15382,6 +15523,10 @@ public final class ClientProtos {
         hash = (37 * hash) + INCLUDE_STOP_ROW_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getIncludeStopRow());
       }
+      if (hasReadType()) {
+        hash = (37 * hash) + READTYPE_FIELD_NUMBER;
+        hash = (53 * hash) + hashEnum(getReadType());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -15571,6 +15716,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00100000);
         includeStopRow_ = false;
         bitField0_ = (bitField0_ & ~0x00200000);
+        readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+        bitField0_ = (bitField0_ & ~0x00400000);
         return this;
       }
 
@@ -15710,6 +15857,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00040000;
         }
         result.includeStopRow_ = includeStopRow_;
+        if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.readType_ = readType_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -15861,6 +16012,9 @@ public final class ClientProtos {
         if (other.hasIncludeStopRow()) {
           setIncludeStopRow(other.getIncludeStopRow());
         }
+        if (other.hasReadType()) {
+          setReadType(other.getReadType());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -16945,33 +17099,33 @@ public final class ClientProtos {
         return this;
       }
 
-      // optional bool small = 14;
+      // optional bool small = 14 [deprecated = true];
       private boolean small_ ;
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public boolean hasSmall() {
+      @java.lang.Deprecated public boolean hasSmall() {
         return ((bitField0_ & 0x00002000) == 0x00002000);
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public boolean getSmall() {
+      @java.lang.Deprecated public boolean getSmall() {
         return small_;
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public Builder setSmall(boolean value) {
+      @java.lang.Deprecated public Builder setSmall(boolean value) {
         bitField0_ |= 0x00002000;
         small_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional bool small = 14;</code>
+       * <code>optional bool small = 14 [deprecated = true];</code>
        */
-      public Builder clearSmall() {
+      @java.lang.Deprecated public Builder clearSmall() {
         bitField0_ = (bitField0_ & ~0x00002000);
         small_ = false;
         onChanged();
@@ -17452,6 +17606,42 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+      private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public boolean hasReadType() {
+        return ((bitField0_ & 0x00400000) == 0x00400000);
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+        return readType_;
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public Builder setReadType(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00400000;
+        readType_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+       */
+      public Builder clearReadType() {
+        bitField0_ = (bitField0_ & ~0x00400000);
+        readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.Scan)
     }
 
@@ -17573,6 +17763,24 @@ public final class ClientProtos {
      * <code>optional bool renew = 10 [default = false];</code>
      */
     boolean getRenew();
+
+    // optional uint32 limit_of_rows = 11 [default = 0];
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    boolean hasLimitOfRows();
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    int getLimitOfRows();
   }
   /**
    * Protobuf type {@code hbase.pb.ScanRequest}
@@ -17704,6 +17912,11 @@ public final class ClientProtos {
               renew_ = input.readBool();
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              limitOfRows_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17916,6 +18129,30 @@ public final class ClientProtos {
       return renew_;
     }
 
+    // optional uint32 limit_of_rows = 11 [default = 0];
+    public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+    private int limitOfRows_;
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    public boolean hasLimitOfRows() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+     *
+     * <pre>
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * </pre>
+     */
+    public int getLimitOfRows() {
+      return limitOfRows_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@@ -17927,6 +18164,7 @@ public final class ClientProtos {
       clientHandlesHeartbeats_ = false;
       trackScanMetrics_ = false;
       renew_ = false;
+      limitOfRows_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -17982,6 +18220,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeBool(10, renew_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeUInt32(11, limitOfRows_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -18031,6 +18272,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(10, renew_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(11, limitOfRows_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -18104,6 +18349,11 @@ public final class ClientProtos {
         result = result && (getRenew()
             == other.getRenew());
       }
+      result = result && (hasLimitOfRows() == other.hasLimitOfRows());
+      if (hasLimitOfRows()) {
+        result = result && (getLimitOfRows()
+            == other.getLimitOfRows());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -18157,6 +18407,10 @@ public final class ClientProtos {
         hash = (37 * hash) + RENEW_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getRenew());
       }
+      if (hasLimitOfRows()) {
+        hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER;
+        hash = (53 * hash) + getLimitOfRows();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -18309,6 +18563,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000100);
         renew_ = false;
         bitField0_ = (bitField0_ & ~0x00000200);
+        limitOfRows_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
 
@@ -18385,6 +18641,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000200;
         }
         result.renew_ = renew_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.limitOfRows_ = limitOfRows_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -18431,6 +18691,9 @@ public final class ClientProtos {
         if (other.hasRenew()) {
           setRenew(other.getRenew());
         }
+        if (other.hasLimitOfRows()) {
+          setLimitOfRows(other.getLimitOfRows());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -18968,6 +19231,55 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional uint32 limit_of_rows = 11 [default = 0];
+      private int limitOfRows_ ;
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public boolean hasLimitOfRows() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public int getLimitOfRows() {
+        return limitOfRows_;
+      }
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public Builder setLimitOfRows(int value) {
+        bitField0_ |= 0x00000400;
+        limitOfRows_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+       *
+       * <pre>
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * </pre>
+       */
+      public Builder clearLimitOfRows() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        limitOfRows_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest)
     }
 
@@ -39912,7 +40224,7 @@ public final class ClientProtos {
       "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" +
       "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" +
       "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " +
-      "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
+      "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
       "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" +
       "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" +
       "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" +
@@ -39921,104 +40233,108 @@ public final class ClientProtos {
       "cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" +
       "ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" +
       "\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" +
-      "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" +
-      "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" +
-      " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" +
-      "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " +
-      "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" +
-      "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024",
-      " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" +
-      "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" +
-      "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" +
-      "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" +
-      "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" +
-      "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" +
-      "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" +
-      "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" +
-      "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " +
-      "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per",
-      "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" +
-      "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" +
-      "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" +
-      "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" +
-      "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" +
-      "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" +
-      ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" +
-      ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" +
-      " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" +
-      "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq",
-      "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" +
-      "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" +
-      "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " +
-      "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" +
-      "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" +
-      "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" +
-      "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" +
-      "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" +
-      "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" +
-      ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re",
-      "gionSpecifier\"-\n\027PrepareBulkLoadResponse" +
-      "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" +
-      "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" +
-      "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" +
-      "BulkLoadResponse\"a\n\026CoprocessorServiceCa" +
-      "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" +
-      "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" +
-      "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" +
-      ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" +
-      "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.",
-      "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" +
-      ".CoprocessorServiceCall\"o\n\032CoprocessorSe" +
-      "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
-      ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." +
-      "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" +
-      "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" +
-      "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" +
-      "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" +
-      "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" +
-      "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002",
-      " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" +
-      "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" +
-      "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" +
-      "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" +
-      "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" +
-      "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" +
-      "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" +
-      " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" +
-      "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" +
-      "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co",
-      "processorServiceResult\0220\n\tloadStats\030\005 \001(" +
-      "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" +
-      "onActionResult\0226\n\021resultOrException\030\001 \003(" +
-      "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" +
-      "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" +
-      "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" +
-      ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" +
-      "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" +
-      "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" +
-      "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc",
-      "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." +
-      "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" +
-      "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" +
-      "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." +
-      "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" +
-      "b.MutateRequest\032\030.hbase.pb.MutateRespons" +
-      "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" +
-      ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" +
-      "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" +
-      "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .",
-      "hbase.pb.PrepareBulkLoadRequest\032!.hbase." +
-      "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" +
-      "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" +
-      "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" +
-      "ecService\022#.hbase.pb.CoprocessorServiceR" +
-      "equest\032$.hbase.pb.CoprocessorServiceResp" +
-      "onse\022d\n\027ExecRegionServerService\022#.hbase." +
-      "pb.CoprocessorServiceRequest\032$.hbase.pb." +
-      "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" +
-      "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp",
-      "onseBB\n*org.apache.hadoop.hbase.protobuf" +
-      ".generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+      "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" +
+      "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" +
+      "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" +
+      "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" +
+      "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" +
+      "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi",
+      "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" +
+      "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" +
+      "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" +
+      ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" +
+      "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" +
+      "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" +
+      "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" +
+      "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" +
+      "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" +
+      "client_handles_partials\030\007 \001(\010\022!\n\031client_",
+      "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" +
+      "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" +
+      "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" +
+      "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " +
+      "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" +
+      "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" +
+      "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+      "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" +
+      "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" +
+      "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p",
+      "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" +
+      "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" +
+      "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" +
+      "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" +
+      "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" +
+      "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" +
+      "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" +
+      "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" +
+      "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
+      "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001",
+      "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" +
+      "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
+      "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" +
+      "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" +
+      "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" +
+      "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" +
+      "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" +
+      "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" +
+      "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" +
+      "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030",
+      "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" +
+      "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" +
+      "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" +
+      ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" +
+      " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" +
+      "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" +
+      "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" +
+      "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" +
+      "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" +
+      "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.",
+      "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" +
+      "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" +
+      "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" +
+      "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" +
+      "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" +
+      "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" +
+      "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" +
+      "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" +
+      ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" +
+      "b.RegionLoadStats\"\336\001\n\021ResultOrException\022",
+      "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" +
+      "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" +
+      "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" +
+      "base.pb.CoprocessorServiceResult\0220\n\tload" +
+      "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" +
+      "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" +
+      "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" +
+      "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" +
+      "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" +
+      "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup",
+      "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" +
+      "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" +
+      "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" +
+      "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" +
+      "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" +
+      "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" +
+      "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" +
+      "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" +
+      "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" +
+      "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque",
+      "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" +
+      "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" +
+      "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" +
+      "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" +
+      "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" +
+      "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" +
+      "adRequest\032!.hbase.pb.CleanupBulkLoadResp" +
+      "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" +
+      "orServiceRequest\032$.hbase.pb.CoprocessorS" +
+      "erviceResponse\022d\n\027ExecRegionServerServic",
+      "e\022#.hbase.pb.CoprocessorServiceRequest\032$" +
+      ".hbase.pb.CoprocessorServiceResponse\0228\n\005" +
+      "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" +
+      ".MultiResponseBB\n*org.apache.hadoop.hbas" +
+      "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
+      "\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -40108,13 +40424,13 @@ public final class ClientProtos {
           internal_static_hbase_pb_Scan_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_Scan_descriptor,
-              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", });
+              new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", });
           internal_static_hbase_pb_ScanRequest_descriptor =
             getDescriptor().getMessageTypes().get(12);
           internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ScanRequest_descriptor,
-              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", });
+              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", });
           internal_static_hbase_pb_ScanResponse_descriptor =
             getDescriptor().getMessageTypes().get(13);
           internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index ae932f7..5cf66c2 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -249,7 +249,7 @@ message Scan {
   optional uint32 store_limit = 11;
   optional uint32 store_offset = 12;
   optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
-  optional bool small = 14;
+  optional bool small = 14 [deprecated = true];
   optional bool reversed = 15 [default = false];
   optional Consistency consistency = 16 [default = STRONG];
   optional uint32 caching = 17;
@@ -258,6 +258,13 @@ message Scan {
   optional uint64 mvcc_read_point = 20 [default = 0];
   optional bool include_start_row = 21 [default = true];
   optional bool include_stop_row = 22 [default = false];
+
+  enum ReadType {
+    DEFAULT = 0;
+    STREAM = 1;
+    PREAD = 2;
+  }
+  optional ReadType readType = 23 [default = DEFAULT];
 }
 
 /**
@@ -282,6 +289,8 @@ message ScanRequest {
   optional bool client_handles_heartbeats = 8;
   optional bool track_scan_metrics = 9;
   optional bool renew = 10 [default = false];
+  // if we have returned limit_of_rows rows to client, then close the scanner.
+  optional uint32 limit_of_rows = 11 [default = 0];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a072dce..9847dfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1133,6 +1133,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  @VisibleForTesting
+  public int getScannersCount() {
+    return scanners.size();
+  }
+
   public
   RegionScanner getScanner(long scannerId) {
     String scannerIdString = Long.toString(scannerId);
@@ -3014,6 +3019,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     RegionScanner scanner = rsh.s;
     boolean moreResults = true;
     boolean moreResultsInRegion = true;
+    // this is the limit of rows for this scan, if the number of rows reaches this value, we will
+    // close the scanner.
+    int limitOfRows;
+    if (request.hasLimitOfRows()) {
+      limitOfRows = request.getLimitOfRows();
+      rows = Math.min(rows, limitOfRows);
+    } else {
+      limitOfRows = -1;
+    }
     MutableObject lastBlock = new MutableObject();
     boolean scannerClosed = false;
     try {
@@ -3046,6 +3060,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // with the old scan implementation where we just ignore the returned results if moreResults
         // is false. Can remove the isEmpty check after we get rid of the old implementation.
         moreResults = false;
+      } else if (limitOfRows > 0 && results.size() >= limitOfRows
+          && !results.get(results.size() - 1).isPartial()) {
+        // if we have reached the limit of rows
+        moreResults = false;
       }
       addResults(builder, results, (HBaseRpcController) controller,
         RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 7e08eca..8c48aef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -127,7 +127,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected Cell lastTop = null;
 
   // A flag whether use pread for scan
-  private boolean scanUsePread = false;
+  private final boolean scanUsePread;
   // Indicates whether there was flush during the course of the scan
   protected volatile boolean flushed = false;
   // generally we get one file from a flush
@@ -168,7 +168,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
 
      this.maxRowSize = scanInfo.getTableMaxRowSize();
-     this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
+    if (get) {
+      this.scanUsePread = true;
+    } else {
+      switch (scan.getReadType()) {
+        case STREAM:
+          this.scanUsePread = false;
+          break;
+        case PREAD:
+          this.scanUsePread = true;
+          break;
+        default:
+          this.scanUsePread = scanInfo.isUsePread();
+          break;
+      }
+    }
      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
      // Parallel seeking is on if the config allows and more there is more than one store file.
      if (this.store != null && this.store.getStorefilesCount() > 1) {
@@ -348,10 +362,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @return list of scanners to seek
    */
   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
-    final boolean isCompaction = false;
-    boolean usePread = get || scanUsePread;
-    return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
-        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
+    return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher,
+      scan.getStartRow(), scan.getStopRow(), this.readPt));
   }
 
   /**
@@ -803,18 +815,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   protected void resetScannerStack(Cell lastTopKey) throws IOException {
-    /* When we have the scan object, should we not pass it to getScanners()
-     * to get a limited set of scanners? We did so in the constructor and we
-     * could have done it now by storing the scan object from the constructor
-     */
-
-    final boolean isCompaction = false;
-    boolean usePread = get || scanUsePread;
+    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
+    // scanners? We did so in the constructor and we could have done it now by storing the scan
+    // object from the constructor
     List<KeyValueScanner> scanners = null;
     try {
       flushLock.lock();
-      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
-        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
+        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
       // Clear the current set of flushed store files so that they don't get added again
       flushedStoreFiles.clear();
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index a1d926d..b80efae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -95,6 +95,12 @@ public abstract class AbstractTestAsyncTableScan {
   @Test
   public void testScanAll() throws Exception {
     List<Result> results = doScan(createScan());
+    // make sure all scanners are closed at RS side
+    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .forEach(rs -> assertEquals(
+          "The scanner count of " + rs.getServerName() + " is "
+              + rs.getRSRpcServices().getScannersCount(),
+          0, rs.getRSRpcServices().getScannersCount()));
     assertEquals(COUNT, results.size());
     IntStream.range(0, COUNT).forEach(i -> {
       Result result = results.get(i);