You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/25 01:53:48 UTC
[1/3] hbase git commit: HBASE-17045 Unify the implementation of small
scan and regular scan
Repository: hbase
Updated Branches:
refs/heads/master 616f4801b -> 85d701892
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
new file mode 100644
index 0000000..a9a3e43
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
+
+ @Parameter(0)
+ public String tableType;
+
+ @Parameter(1)
+ public Supplier<AsyncTableBase> getTable;
+
+ @Parameter(2)
+ public String scanType;
+
+ @Parameter(3)
+ public Supplier<Scan> scanCreator;
+
+ private static RawAsyncTable getRawTable() {
+ return ASYNC_CONN.getRawTable(TABLE_NAME);
+ }
+
+ private static AsyncTable getTable() {
+ return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+ }
+
+ private static Scan createNormalScan() {
+ return new Scan();
+ }
+
+ // test if we can handle partial result when open scanner.
+ private static Scan createSmallResultSizeScan() {
+ return new Scan().setMaxResultSize(1);
+ }
+
+ @Parameters(name = "{index}: table={0}, scan={2}")
+ public static List<Object[]> params() {
+ Supplier<AsyncTableBase> rawTable = TestAsyncTableScanAll::getRawTable;
+ Supplier<AsyncTableBase> normalTable = TestAsyncTableScanAll::getTable;
+ Supplier<Scan> normalScan = TestAsyncTableScanAll::createNormalScan;
+ Supplier<Scan> smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan;
+ return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan },
+ new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan },
+ new Object[] { "normal", normalTable, "normal", normalScan },
+ new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan });
+ }
+
+ @Test
+ public void testScanWithLimit() throws InterruptedException, ExecutionException {
+ int start = 111;
+ int stop = 888;
+ int limit = 300;
+ List<Result> results = getTable.get()
+ .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
+ .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
+ .setReadType(ReadType.PREAD))
+ .get();
+ assertEquals(limit, results.size());
+ IntStream.range(0, limit).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start + i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+ });
+ }
+
+ @Test
+ public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
+ int start = 888;
+ int stop = 111;
+ int limit = 300;
+ List<Result> results = getTable.get()
+ .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
+ .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
+ .setReadType(ReadType.PREAD).setReversed(true))
+ .get();
+ assertEquals(limit, results.size());
+ IntStream.range(0, limit).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start - i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+ });
+ }
+
+ @Override
+ protected Scan createScan() {
+ return scanCreator.get();
+ }
+
+ @Override
+ protected List<Result> doScan(Scan scan) throws Exception {
+ return getTable.get().scanAll(scan).get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
deleted file mode 100644
index 3737af2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
-
- @Parameter
- public Supplier<AsyncTableBase> getTable;
-
- private static RawAsyncTable getRawTable() {
- return ASYNC_CONN.getRawTable(TABLE_NAME);
- }
-
- private static AsyncTable getTable() {
- return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
- }
-
- @Parameters
- public static List<Object[]> params() {
- return Arrays.asList(new Supplier<?>[] { TestAsyncTableSmallScan::getRawTable },
- new Supplier<?>[] { TestAsyncTableSmallScan::getTable });
- }
-
- @Test
- public void testScanWithLimit() throws InterruptedException, ExecutionException {
- AsyncTableBase table = getTable.get();
- int start = 111;
- int stop = 888;
- int limit = 300;
- List<Result> results =
- table
- .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
- limit)
- .get();
- assertEquals(limit, results.size());
- IntStream.range(0, limit).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start + i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
- });
- }
-
- @Test
- public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
- AsyncTableBase table = getTable.get();
- int start = 888;
- int stop = 111;
- int limit = 300;
- List<Result> results = table.smallScan(
- new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
- limit).get();
- assertEquals(limit, results.size());
- IntStream.range(0, limit).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start - i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
- });
- }
-
- @Override
- protected Scan createScan() {
- return new Scan().setSmall(true);
- }
-
- @Override
- protected List<Result> doScan(Scan scan) throws Exception {
- return getTable.get().smallScan(scan).get();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 9f3970b..a8ef353 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -57,11 +57,6 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
}
@Override
- public boolean onHeartbeat() {
- return true;
- }
-
- @Override
public synchronized void onError(Throwable error) {
finished = true;
this.error = error;
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
index 3c5fe27..1b3c4e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@@ -369,7 +370,10 @@ public class TestMultiRowRangeFilter {
@Test
public void testMultiRowRangeFilterWithExclusive() throws IOException {
tableName = TableName.valueOf("testMultiRowRangeFilterWithExclusive");
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
Table ht = TEST_UTIL.createTable(tableName, family, Integer.MAX_VALUE);
+ ht.setReadRpcTimeout(600000);
+ ht.setOperationTimeout(6000000);
generateRows(numRows, ht, family, qf, value);
Scan scan = new Scan();
[3/3] hbase git commit: HBASE-17045 Unify the implementation of small
scan and regular scan
Posted by zh...@apache.org.
HBASE-17045 Unify the implementation of small scan and regular scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85d70189
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85d70189
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85d70189
Branch: refs/heads/master
Commit: 85d701892ed969380a8bcca9c9f4e306c74af941
Parents: 616f480
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 24 21:07:25 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Jan 25 09:53:06 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncClientScanner.java | 23 +-
.../hbase/client/AsyncNonMetaRegionLocator.java | 4 +-
.../client/AsyncRpcRetryingCallerFactory.java | 84 +--
.../AsyncScanSingleRegionRpcRetryingCaller.java | 49 +-
.../client/AsyncSmallScanRpcRetryingCaller.java | 194 -------
.../hadoop/hbase/client/AsyncTableBase.java | 40 +-
.../hadoop/hbase/client/AsyncTableImpl.java | 5 +-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 56 +-
.../hbase/client/RawScanResultConsumer.java | 4 +-
.../org/apache/hadoop/hbase/client/Scan.java | 146 +++--
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hadoop/hbase/protobuf/ProtobufUtil.java | 34 ++
.../hbase/shaded/protobuf/ProtobufUtil.java | 35 ++
.../hbase/shaded/protobuf/RequestConverter.java | 24 +-
.../shaded/protobuf/generated/ClientProtos.java | 553 ++++++++++++++----
.../src/main/protobuf/Client.proto | 11 +-
.../hbase/protobuf/generated/ClientProtos.java | 556 +++++++++++++++----
hbase-protocol/src/main/protobuf/Client.proto | 11 +-
.../hbase/regionserver/RSRpcServices.java | 18 +
.../hadoop/hbase/regionserver/StoreScanner.java | 38 +-
.../client/AbstractTestAsyncTableScan.java | 6 +
.../hbase/client/TestAsyncTableScanAll.java | 132 +++++
.../hbase/client/TestAsyncTableSmallScan.java | 109 ----
.../hbase/client/TestRawAsyncTableScan.java | 5 -
.../hbase/filter/TestMultiRowRangeFilter.java | 4 +
25 files changed, 1385 insertions(+), 758 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index f656a6c..b9fd34f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
/**
* The asynchronous client scanner implementation.
@@ -95,12 +96,16 @@ class AsyncClientScanner {
public final ClientService.Interface stub;
- public final long scannerId;
+ public final HBaseRpcController controller;
- public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) {
+ public final ScanResponse resp;
+
+ public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
+ ScanResponse resp) {
this.loc = loc;
this.stub = stub;
- this.scannerId = scannerId;
+ this.controller = controller;
+ this.resp = resp;
}
}
@@ -108,14 +113,14 @@ class AsyncClientScanner {
HRegionLocation loc, ClientService.Interface stub) {
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
- ScanRequest request =
- RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false);
+ ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
+ scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
- future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId()));
+ future.complete(new OpenScannerResponse(loc, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
@@ -124,11 +129,11 @@ class AsyncClientScanner {
}
private void startScan(OpenScannerResponse resp) {
- conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
- .setScan(scan).consumer(consumer).resultCache(resultCache)
+ conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+ .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp)
.whenComplete((hasMore, error) -> {
if (error != null) {
consumer.onError(error);
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index ae79b65..2c8669f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -376,7 +377,8 @@ class AsyncNonMetaRegionLocator {
metaKey = createRegionName(tableName, req.row, NINES, false);
}
conn.getRawTable(META_TABLE_NAME)
- .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+ .scanAll(new Scan().withStartRow(metaKey).setReversed(true).setReadType(ReadType.PREAD)
+ .addFamily(CATALOG_FAMILY).setLimit(1))
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 5df66cc..6bc2cc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -30,7 +30,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
/**
* Factory to create an AsyncRpcRetryCaller.
@@ -138,81 +140,6 @@ class AsyncRpcRetryingCallerFactory {
return new SingleRequestCallerBuilder<>();
}
- public class SmallScanCallerBuilder extends BuilderBase {
-
- private TableName tableName;
-
- private Scan scan;
-
- private int limit;
-
- private long scanTimeoutNs = -1L;
-
- private long rpcTimeoutNs = -1L;
-
- public SmallScanCallerBuilder table(TableName tableName) {
- this.tableName = tableName;
- return this;
- }
-
- public SmallScanCallerBuilder setScan(Scan scan) {
- this.scan = scan;
- return this;
- }
-
- public SmallScanCallerBuilder limit(int limit) {
- this.limit = limit;
- return this;
- }
-
- public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
- this.scanTimeoutNs = unit.toNanos(scanTimeout);
- return this;
- }
-
- public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
- this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
- return this;
- }
-
- public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
- this.pauseNs = unit.toNanos(pause);
- return this;
- }
-
- public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
- this.maxAttempts = maxAttempts;
- return this;
- }
-
- public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
- this.startLogErrorsCnt = startLogErrorsCnt;
- return this;
- }
-
- public AsyncSmallScanRpcRetryingCaller build() {
- TableName tableName = checkNotNull(this.tableName, "tableName is null");
- Scan scan = checkNotNull(this.scan, "scan is null");
- checkArgument(limit > 0, "invalid limit %d", limit);
- return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
- scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
- }
-
- /**
- * Shortcut for {@code build().call()}
- */
- public CompletableFuture<List<Result>> call() {
- return build().call();
- }
- }
-
- /**
- * Create retry caller for small scan.
- */
- public SmallScanCallerBuilder smallScan() {
- return new SmallScanCallerBuilder();
- }
-
public class ScanSingleRegionCallerBuilder extends BuilderBase {
private long scannerId = -1L;
@@ -297,10 +224,11 @@ class AsyncRpcRetryingCallerFactory {
}
/**
- * Short cut for {@code build().start()}.
+ * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}.
*/
- public CompletableFuture<Boolean> start() {
- return build().start();
+ public CompletableFuture<Boolean> start(HBaseRpcController controller,
+ ScanResponse respWhenOpen) {
+ return build().start(controller, respWhenOpen);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 5d3b736..3ef4a6f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,11 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import io.netty.util.HashedWheelTimer;
@@ -135,6 +134,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
}
+ private long remainingTimeNs() {
+ return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+ }
+
private void closeScanner() {
resetController(controller, rpcTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
@@ -199,7 +202,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
long delayNs;
if (scanTimeoutNs > 0) {
- long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+ long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally(!scannerClosed);
return;
@@ -245,7 +248,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
- private void onComplete(ScanResponse resp) {
+ private void onComplete(HBaseRpcController controller, ScanResponse resp) {
if (controller.failed()) {
onError(controller.getFailed());
return;
@@ -288,6 +291,13 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeNoMoreResults();
return;
}
+ if (scan.getLimit() > 0) {
+ // The RS should have set the moreResults field in ScanResponse to false when we have reached
+ // the limit.
+ int limit = scan.getLimit() - results.length;
+ assert limit > 0;
+ scan.setLimit(limit);
+ }
// as in 2.0 this value will always be set
if (!resp.getMoreResultsInRegion()) {
completeWhenNoMoreResultsInRegion.run();
@@ -297,10 +307,26 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
private void call() {
- resetController(controller, rpcTimeoutNs);
+ // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+ // less than the scan timeout. If the server does not respond in time(usually this will not
+ // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+ // resending the next request and the only way to fix this is to close the scanner and open a
+ // new one.
+ long callTimeoutNs;
+ if (scanTimeoutNs > 0) {
+ long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+ if (remainingNs <= 0) {
+ completeExceptionally(true);
+ return;
+ }
+ callTimeoutNs = remainingNs;
+ } else {
+ callTimeoutNs = 0L;
+ }
+ resetController(controller, callTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
- nextCallSeq, false, false);
- stub.scan(controller, req, this::onComplete);
+ nextCallSeq, false, false, scan.getLimit());
+ stub.scan(controller, req, resp -> onComplete(controller, resp));
}
private void next() {
@@ -312,10 +338,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
/**
+ * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
+ * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
+ * open scanner request is also needed because we may have some data in the CellScanner which is
+ * contained in the controller.
* @return {@code true} if we should continue, otherwise {@code false}.
*/
- public CompletableFuture<Boolean> start() {
- next();
+ public CompletableFuture<Boolean> start(HBaseRpcController controller,
+ ScanResponse respWhenOpen) {
+ onComplete(controller, respWhenOpen);
return future;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
deleted file mode 100644
index 98a276f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
-
-/**
- * Retry caller for smaller scan.
- */
-@InterfaceAudience.Private
-class AsyncSmallScanRpcRetryingCaller {
-
- private final AsyncConnectionImpl conn;
-
- private final TableName tableName;
-
- private final Scan scan;
-
- private final int limit;
-
- private final long scanTimeoutNs;
-
- private final long rpcTimeoutNs;
-
- private final long pauseNs;
-
- private final int maxAttempts;
-
- private final int startLogErrosCnt;
-
- private final Function<HRegionInfo, Boolean> nextScan;
-
- private final List<Result> resultList;
-
- private final CompletableFuture<List<Result>> future;
-
- public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
- int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
- int startLogErrosCnt) {
- this.conn = conn;
- this.tableName = tableName;
- this.scan = scan;
- this.limit = limit;
- this.scanTimeoutNs = scanTimeoutNs;
- this.rpcTimeoutNs = rpcTimeoutNs;
- this.pauseNs = pauseNs;
- this.maxAttempts = maxAttempts;
- this.startLogErrosCnt = startLogErrosCnt;
- if (scan.isReversed()) {
- this.nextScan = this::reversedNextScan;
- } else {
- this.nextScan = this::nextScan;
- }
- this.resultList = new ArrayList<>();
- this.future = new CompletableFuture<>();
- }
-
- private static final class SmallScanResponse {
-
- public final Result[] results;
-
- public final HRegionInfo currentRegion;
-
- public final boolean hasMoreResultsInRegion;
-
- public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
- boolean hasMoreResultsInRegion) {
- this.results = results;
- this.currentRegion = currentRegion;
- this.hasMoreResultsInRegion = hasMoreResultsInRegion;
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "Findbugs seems to be confused by lambda expression.")
- private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub) {
- CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
- ScanRequest req;
- try {
- req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
- limit - resultList.size(), true);
- } catch (IOException e) {
- future.completeExceptionally(e);
- return future;
- }
- stub.scan(controller, req, resp -> {
- if (controller.failed()) {
- future.completeExceptionally(controller.getFailed());
- } else {
- try {
- Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
- future.complete(
- new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
- });
- return future;
- }
-
- private void onComplete(SmallScanResponse resp) {
- resultList.addAll(Arrays.asList(resp.results));
- if (resultList.size() == limit) {
- future.complete(resultList);
- return;
- }
- if (resp.hasMoreResultsInRegion) {
- if (resp.results.length > 0) {
- scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false);
- }
- scan();
- return;
- }
- if (!nextScan.apply(resp.currentRegion)) {
- future.complete(resultList);
- }
- }
-
- private void scan() {
- conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
- .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call()
- .whenComplete((resp, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else {
- onComplete(resp);
- }
- });
- }
-
- public CompletableFuture<List<Result>> call() {
- scan();
- return future;
- }
-
- private boolean nextScan(HRegionInfo info) {
- if (noMoreResultsForScan(scan, info)) {
- return false;
- } else {
- scan.withStartRow(info.getEndKey());
- scan();
- return true;
- }
- }
-
- private boolean reversedNextScan(HRegionInfo info) {
- if (noMoreResultsForReverseScan(scan, info)) {
- return false;
- } else {
- scan.withStartRow(info.getStartKey(), false);
- scan();
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index d82fa22..e201ab2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -300,26 +300,34 @@ public interface AsyncTableBase {
CompareOp compareOp, byte[] value, RowMutations mutation);
/**
- * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
- * @see #smallScan(Scan, int)
- */
- default CompletableFuture<List<Result>> smallScan(Scan scan) {
- return smallScan(scan, Integer.MAX_VALUE);
- }
-
- /**
- * Return all the results that match the given scan object. The number of the returned results
- * will not be greater than {@code limit}.
+ * Return all the results that match the given scan object.
+ * <p>
+ * Notice that usually you should use this method with a {@link Scan} object that has limit set.
+ * For example, if you want to get the closest row after a given row, you could do this:
+ * <p>
+ *
+ * <pre>
+ * <code>
+ * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
+ * if (results.isEmpty()) {
+ * System.out.println("No row after " + Bytes.toStringBinary(row));
+ * } else {
+ * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
+ * + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
+ * }
+ * });
+ * </code>
+ * </pre>
* <p>
- * Notice that the scan must be small, and should not use batch or allowPartialResults. The
- * {@code caching} property of the scan object is also ignored as we will use {@code limit}
- * instead.
- * @param scan A configured {@link Scan} object.
- * @param limit the limit of results count
+ * If your result set is very large, you should use other scan method to get a scanner or use
+ * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
+ * fetch all the results and store them in a List and then return the list to you.
+ * @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large
+ * result set, it is likely to cause OOM.
* @return The results of this small scan operation. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
- CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+ CompletableFuture<List<Result>> scanAll(Scan scan);
/**
* Test for the existence of columns in the table, as specified by the Gets.
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 7cd257c..f1625ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -144,8 +144,8 @@ class AsyncTableImpl implements AsyncTable {
}
@Override
- public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
- return wrap(rawTable.smallScan(scan, limit));
+ public CompletableFuture<List<Result>> scanAll(Scan scan) {
+ return wrap(rawTable.scanAll(scan));
}
private long resultSize2CacheSize(long maxResultSize) {
@@ -197,4 +197,5 @@ class AsyncTableImpl implements AsyncTable {
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
}
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d9d2d35..87323ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -21,13 +21,13 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
@InterfaceAudience.Private
class RawAsyncTableImpl implements RawAsyncTable {
- private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class);
-
private final AsyncConnectionImpl conn;
private final TableName tableName;
@@ -332,12 +330,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
.call();
}
- private <T> CompletableFuture<T> failedFuture(Throwable error) {
- CompletableFuture<T> future = new CompletableFuture<>();
- future.completeExceptionally(error);
- return future;
- }
-
private Scan setDefaultScanConfig(Scan scan) {
// always create a new scan object as we may reset the start row later.
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
@@ -351,27 +343,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
@Override
- public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
- if (!scan.isSmall()) {
- return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
- }
- if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
- return failedFuture(
- new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
- }
- return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
- .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+ public CompletableFuture<List<Result>> scanAll(Scan scan) {
+ CompletableFuture<List<Result>> future = new CompletableFuture<>();
+ List<Result> scanResults = new ArrayList<>();
+ scan(scan, new RawScanResultConsumer() {
+
+ @Override
+ public boolean onNext(Result[] results) {
+ scanResults.addAll(Arrays.asList(results));
+ return true;
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ future.completeExceptionally(error);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(scanResults);
+ }
+ });
+ return future;
}
public void scan(Scan scan, RawScanResultConsumer consumer) {
- if (scan.isSmall()) {
+ if (scan.isSmall() || scan.getLimit() > 0) {
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
- consumer.onError(
- new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
- } else {
- LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
+ consumer.onError(new IllegalArgumentException(
+ "Batch and allowPartial is not allowed for small scan or limited scan"));
}
}
scan = setDefaultScanConfig(scan);
@@ -388,6 +388,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
public List<CompletableFuture<Void>> put(List<Put> puts) {
return voidMutate(puts);
}
+
@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
return voidMutate(deletes);
@@ -434,4 +435,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
public long getScanTimeout(TimeUnit unit) {
return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 7f0514c..2e5d422 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -47,7 +47,9 @@ public interface RawScanResultConsumer {
* This method give you a chance to terminate a slow scan operation.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
*/
- boolean onHeartbeat();
+ default boolean onHeartbeat() {
+ return true;
+ }
/**
* Indicate that we hit an unrecoverable error and the scan operation is terminated.
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 8d53b9a..31e76da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -46,38 +46,45 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* Used to perform Scan operations.
* <p>
- * All operations are identical to {@link Get} with the exception of
- * instantiation. Rather than specifying a single row, an optional startRow
- * and stopRow may be defined. If rows are not specified, the Scanner will
- * iterate over all rows.
+ * All operations are identical to {@link Get} with the exception of instantiation. Rather than
+ * specifying a single row, an optional startRow and stopRow may be defined. If rows are not
+ * specified, the Scanner will iterate over all rows.
* <p>
* To get all columns from all rows of a Table, create an instance with no constraints; use the
- * {@link #Scan()} constructor. To constrain the scan to specific column families,
- * call {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
+ * {@link #Scan()} constructor. To constrain the scan to specific column families, call
+ * {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance.
* <p>
- * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn}
- * for each column to retrieve.
+ * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn} for each column to
+ * retrieve.
* <p>
- * To only retrieve columns within a specific range of version timestamps,
- * call {@link #setTimeRange(long, long) setTimeRange}.
+ * To only retrieve columns within a specific range of version timestamps, call
+ * {@link #setTimeRange(long, long) setTimeRange}.
* <p>
- * To only retrieve columns with a specific timestamp, call
- * {@link #setTimeStamp(long) setTimestamp}.
+ * To only retrieve columns with a specific timestamp, call {@link #setTimeStamp(long) setTimestamp}
+ * .
* <p>
- * To limit the number of versions of each column to be returned, call
- * {@link #setMaxVersions(int) setMaxVersions}.
+ * To limit the number of versions of each column to be returned, call {@link #setMaxVersions(int)
+ * setMaxVersions}.
* <p>
- * To limit the maximum number of values returned for each call to next(),
- * call {@link #setBatch(int) setBatch}.
+ * To limit the maximum number of values returned for each call to next(), call
+ * {@link #setBatch(int) setBatch}.
* <p>
* To add a filter, call {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
* <p>
- * Expert: To explicitly disable server-side block caching for this scan,
- * execute {@link #setCacheBlocks(boolean)}.
- * <p><em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan
- * runs and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when
- * you go to clone a Scan instance or if you go to reuse a created Scan instance; safer is create
- * a Scan instance per usage.
+ * For small scan, it is deprecated in 2.0.0. Now we have a {@link #setLimit(int)} method in Scan
+ * object which is used to tell RS how many rows we want. If the rows return reaches the limit, the
+ * RS will close the RegionScanner automatically. And we will also fetch data when openScanner in
+ * the new implementation, this means we can also finish a scan operation in one rpc call. And we
+ * have also introduced a {@link #setReadType(ReadType)} method. You can use this method to tell RS
+ * to use pread explicitly.
+ * <p>
+ * Expert: To explicitly disable server-side block caching for this scan, execute
+ * {@link #setCacheBlocks(boolean)}.
+ * <p>
+ * <em>Note:</em> Usage alters Scan instances. Internally, attributes are updated as the Scan runs
+ * and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when you go to
+ * clone a Scan instance or if you go to reuse a created Scan instance; safer is create a Scan
+ * instance per usage.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -86,9 +93,9 @@ public class Scan extends Query {
private static final String RAW_ATTR = "_raw_";
- private byte [] startRow = HConstants.EMPTY_START_ROW;
+ private byte[] startRow = HConstants.EMPTY_START_ROW;
private boolean includeStartRow = true;
- private byte [] stopRow = HConstants.EMPTY_END_ROW;
+ private byte[] stopRow = HConstants.EMPTY_END_ROW;
private boolean includeStopRow = false;
private int maxVersions = 1;
private int batch = -1;
@@ -172,6 +179,16 @@ public class Scan extends Query {
private long mvccReadPoint = -1L;
/**
+ * The number of rows we want for this scan. We will terminate the scan if the number of return
+ * rows reaches this value.
+ */
+ private int limit = -1;
+
+ /**
+ * Control whether to use pread at server side.
+ */
+ private ReadType readType = ReadType.DEFAULT;
+ /**
* Create a Scan operation across all rows.
*/
public Scan() {}
@@ -257,6 +274,7 @@ public class Scan extends Query {
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
this.mvccReadPoint = scan.getMvccReadPoint();
+ this.limit = scan.getLimit();
}
/**
@@ -969,37 +987,36 @@ public class Scan extends Query {
return attr == null ? false : Bytes.toBoolean(attr);
}
-
-
/**
* Set whether this scan is a small scan
* <p>
- * Small scan should use pread and big scan can use seek + read
- *
- * seek + read is fast but can cause two problem (1) resource contention (2)
- * cause too much network io
- *
- * [89-fb] Using pread for non-compaction read request
- * https://issues.apache.org/jira/browse/HBASE-7266
- *
- * On the other hand, if setting it true, we would do
- * openScanner,next,closeScanner in one RPC call. It means the better
- * performance for small scan. [HBASE-9488].
- *
- * Generally, if the scan range is within one data block(64KB), it could be
- * considered as a small scan.
- *
+ * Small scan should use pread and big scan can use seek + read seek + read is fast but can cause
+ * two problem (1) resource contention (2) cause too much network io [89-fb] Using pread for
+ * non-compaction read request https://issues.apache.org/jira/browse/HBASE-7266 On the other hand,
+ * if setting it true, we would do openScanner,next,closeScanner in one RPC call. It means the
+ * better performance for small scan. [HBASE-9488]. Generally, if the scan range is within one
+ * data block(64KB), it could be considered as a small scan.
* @param small
+ * @deprecated since 2.0.0. Use {@link #setLimit(int)} and {@link #setReadType(ReadType)} instead.
+ * And for the one rpc optimization, now we will also fetch data when openScanner, and
+ * if the number of rows reaches the limit then we will close the scanner
+ * automatically which means we will fall back to one rpc.
+ * @see #setLimit(int)
+ * @see #setReadType(ReadType)
*/
+ @Deprecated
public Scan setSmall(boolean small) {
this.small = small;
+ this.readType = ReadType.PREAD;
return this;
}
/**
* Get whether this scan is a small scan
* @return true if small scan
+ * @deprecated since 2.0.0. See the comment of {@link #setSmall(boolean)}
*/
+ @Deprecated
public boolean isSmall() {
return small;
}
@@ -1081,6 +1098,53 @@ public class Scan extends Query {
}
/**
+ * @return the limit of rows for this scan
+ */
+ public int getLimit() {
+ return limit;
+ }
+
+ /**
+ * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows
+ * reaches this value.
+ * <p>
+ * This condition will be tested at last, after all other conditions such as stopRow, filter, etc.
+ * <p>
+ * Can not be used together with batch and allowPartial.
+ * @param limit the limit of rows for this scan
+ * @return this
+ */
+ public Scan setLimit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public enum ReadType {
+ DEFAULT, STREAM, PREAD
+ }
+
+ /**
+ * @return the read type for this scan
+ */
+ public ReadType getReadType() {
+ return readType;
+ }
+
+ /**
+ * Set the read type for this scan.
+ * <p>
+ * Notice that we may choose to use pread even if you specific {@link ReadType#STREAM} here. For
+ * example, we will always use pread if this is a get scan.
+ * @return this
+ */
+ public Scan setReadType(ReadType readType) {
+ this.readType = readType;
+ return this;
+ }
+
+ /**
* Get the mvcc read point used to open a scanner.
*/
long getMvccReadPoint() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 642fae0..f867acb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -194,7 +194,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
try {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
- this.scanMetrics != null, renew);
+ this.scanMetrics != null, renew, -1);
ScanResponse response = null;
response = getStub().scan(getRpcController(), request);
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index d3898d4..51a94ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -814,6 +814,32 @@ public final class ProtobufUtil {
return get;
}
+ public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+ switch (readType) {
+ case DEFAULT:
+ return ClientProtos.Scan.ReadType.DEFAULT;
+ case STREAM:
+ return ClientProtos.Scan.ReadType.STREAM;
+ case PREAD:
+ return ClientProtos.Scan.ReadType.PREAD;
+ default:
+ throw new IllegalArgumentException("Unknown ReadType: " + readType);
+ }
+ }
+
+ public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+ switch (readType) {
+ case DEFAULT:
+ return Scan.ReadType.DEFAULT;
+ case STREAM:
+ return Scan.ReadType.STREAM;
+ case PREAD:
+ return Scan.ReadType.PREAD;
+ default:
+ throw new IllegalArgumentException("Unknown ReadType: " + readType);
+ }
+ }
+
/**
* Convert a client Scan to a protocol buffer Scan
*
@@ -917,6 +943,9 @@ public final class ProtobufUtil {
if (scan.includeStopRow()) {
scanBuilder.setIncludeStopRow(true);
}
+ if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+ scanBuilder.setReadType(toReadType(scan.getReadType()));
+ }
return scanBuilder.build();
}
@@ -1015,6 +1044,11 @@ public final class ProtobufUtil {
if (proto.hasMvccReadPoint()) {
PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
}
+ if (scan.isSmall()) {
+ scan.setReadType(Scan.ReadType.PREAD);
+ } else if (proto.hasReadType()) {
+ scan.setReadType(toReadType(proto.getReadType()));
+ }
return scan;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 7764f65..13ff92e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -928,6 +929,32 @@ public final class ProtobufUtil {
return get;
}
+ public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
+ switch (readType) {
+ case DEFAULT:
+ return ClientProtos.Scan.ReadType.DEFAULT;
+ case STREAM:
+ return ClientProtos.Scan.ReadType.STREAM;
+ case PREAD:
+ return ClientProtos.Scan.ReadType.PREAD;
+ default:
+ throw new IllegalArgumentException("Unknown ReadType: " + readType);
+ }
+ }
+
+ public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) {
+ switch (readType) {
+ case DEFAULT:
+ return Scan.ReadType.DEFAULT;
+ case STREAM:
+ return Scan.ReadType.STREAM;
+ case PREAD:
+ return Scan.ReadType.PREAD;
+ default:
+ throw new IllegalArgumentException("Unknown ReadType: " + readType);
+ }
+ }
+
/**
* Convert a client Scan to a protocol buffer Scan
*
@@ -1031,6 +1058,9 @@ public final class ProtobufUtil {
if (scan.includeStopRow()) {
scanBuilder.setIncludeStopRow(true);
}
+ if (scan.getReadType() != Scan.ReadType.DEFAULT) {
+ scanBuilder.setReadType(toReadType(scan.getReadType()));
+ }
return scanBuilder.build();
}
@@ -1129,6 +1159,11 @@ public final class ProtobufUtil {
if (proto.hasMvccReadPoint()) {
PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
}
+ if (scan.isSmall()) {
+ scan.setReadType(Scan.ReadType.PREAD);
+ } else if (proto.hasReadType()) {
+ scan.setReadType(toReadType(proto.getReadType()));
+ }
return scan;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index fd08d98..8de9ad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -462,11 +462,10 @@ public final class RequestConverter {
* @return a scan request
* @throws IOException
*/
- public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
- final int numberOfRows, final boolean closeScanner) throws IOException {
+ public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows,
+ boolean closeScanner) throws IOException {
ScanRequest.Builder builder = ScanRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
+ RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
builder.setRegion(region);
@@ -474,19 +473,21 @@ public final class RequestConverter {
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(scan.isScanMetricsEnabled());
+ if (scan.getLimit() > 0) {
+ builder.setLimitOfRows(scan.getLimit());
+ }
return builder.build();
}
/**
* Create a protocol buffer ScanRequest for a scanner id
- *
* @param scannerId
* @param numberOfRows
* @param closeScanner
* @return a scan request
*/
- public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
- final boolean closeScanner, final boolean trackMetrics) {
+ public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+ boolean trackMetrics) {
ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
@@ -499,16 +500,14 @@ public final class RequestConverter {
/**
* Create a protocol buffer ScanRequest for a scanner id
- *
* @param scannerId
* @param numberOfRows
* @param closeScanner
* @param nextCallSeq
* @return a scan request
*/
- public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
- final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics,
- final boolean renew) {
+ public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
+ long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) {
ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
@@ -518,6 +517,9 @@ public final class RequestConverter {
builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
builder.setRenew(renew);
+ if (limitOfRows > 0) {
+ builder.setLimitOfRows(limitOfRows);
+ }
return builder.build();
}
[2/3] hbase git commit: HBASE-17045 Unify the implementation of small
scan and regular scan
Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index ef44295..a6f0f43 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -14487,13 +14487,13 @@ public final class ClientProtos {
boolean getLoadColumnFamiliesOnDemand();
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- boolean hasSmall();
+ @java.lang.Deprecated boolean hasSmall();
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- boolean getSmall();
+ @java.lang.Deprecated boolean getSmall();
/**
* <code>optional bool reversed = 15 [default = false];</code>
@@ -14581,6 +14581,15 @@ public final class ClientProtos {
* <code>optional bool include_stop_row = 22 [default = false];</code>
*/
boolean getIncludeStopRow();
+
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ boolean hasReadType();
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType();
}
/**
* <pre>
@@ -14624,6 +14633,7 @@ public final class ClientProtos {
mvccReadPoint_ = 0L;
includeStartRow_ = true;
includeStopRow_ = false;
+ readType_ = 0;
}
@java.lang.Override
@@ -14798,6 +14808,17 @@ public final class ClientProtos {
includeStopRow_ = input.readBool();
break;
}
+ case 184: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(23, rawValue);
+ } else {
+ bitField0_ |= 0x00080000;
+ readType_ = rawValue;
+ }
+ break;
+ }
}
}
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14831,6 +14852,105 @@ public final class ClientProtos {
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.class, org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.Builder.class);
}
+ /**
+ * Protobuf enum {@code hbase.pb.Scan.ReadType}
+ */
+ public enum ReadType
+ implements org.apache.hadoop.hbase.shaded.com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * <code>DEFAULT = 0;</code>
+ */
+ DEFAULT(0),
+ /**
+ * <code>STREAM = 1;</code>
+ */
+ STREAM(1),
+ /**
+ * <code>PREAD = 2;</code>
+ */
+ PREAD(2),
+ ;
+
+ /**
+ * <code>DEFAULT = 0;</code>
+ */
+ public static final int DEFAULT_VALUE = 0;
+ /**
+ * <code>STREAM = 1;</code>
+ */
+ public static final int STREAM_VALUE = 1;
+ /**
+ * <code>PREAD = 2;</code>
+ */
+ public static final int PREAD_VALUE = 2;
+
+
+ public final int getNumber() {
+ return value;
+ }
+
+ /**
+ * @deprecated Use {@link #forNumber(int)} instead.
+ */
+ @java.lang.Deprecated
+ public static ReadType valueOf(int value) {
+ return forNumber(value);
+ }
+
+ public static ReadType forNumber(int value) {
+ switch (value) {
+ case 0: return DEFAULT;
+ case 1: return STREAM;
+ case 2: return PREAD;
+ default: return null;
+ }
+ }
+
+ public static org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<ReadType>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<
+ ReadType> internalValueMap =
+ new org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<ReadType>() {
+ public ReadType findValueByNumber(int number) {
+ return ReadType.forNumber(number);
+ }
+ };
+
+ public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(ordinal());
+ }
+ public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final ReadType[] VALUES = values();
+
+ public static ReadType valueOf(
+ org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int value;
+
+ private ReadType(int value) {
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType)
+ }
+
private int bitField0_;
public static final int COLUMN_FIELD_NUMBER = 1;
private java.util.List<org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column> column_;
@@ -15090,15 +15210,15 @@ public final class ClientProtos {
public static final int SMALL_FIELD_NUMBER = 14;
private boolean small_;
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean hasSmall() {
+ @java.lang.Deprecated public boolean hasSmall() {
return ((bitField0_ & 0x00000800) == 0x00000800);
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean getSmall() {
+ @java.lang.Deprecated public boolean getSmall() {
return small_;
}
@@ -15243,6 +15363,22 @@ public final class ClientProtos {
return includeStopRow_;
}
+ public static final int READTYPE_FIELD_NUMBER = 23;
+ private int readType_;
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public boolean hasReadType() {
+ return ((bitField0_ & 0x00080000) == 0x00080000);
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+ return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+ }
+
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
@@ -15345,6 +15481,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00040000) == 0x00040000)) {
output.writeBool(22, includeStopRow_);
}
+ if (((bitField0_ & 0x00080000) == 0x00080000)) {
+ output.writeEnum(23, readType_);
+ }
unknownFields.writeTo(output);
}
@@ -15441,6 +15580,10 @@ public final class ClientProtos {
size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
.computeBoolSize(22, includeStopRow_);
}
+ if (((bitField0_ & 0x00080000) == 0x00080000)) {
+ size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+ .computeEnumSize(23, readType_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -15558,6 +15701,10 @@ public final class ClientProtos {
result = result && (getIncludeStopRow()
== other.getIncludeStopRow());
}
+ result = result && (hasReadType() == other.hasReadType());
+ if (hasReadType()) {
+ result = result && readType_ == other.readType_;
+ }
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@@ -15666,6 +15813,10 @@ public final class ClientProtos {
hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean(
getIncludeStopRow());
}
+ if (hasReadType()) {
+ hash = (37 * hash) + READTYPE_FIELD_NUMBER;
+ hash = (53 * hash) + readType_;
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -15863,6 +16014,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00100000);
includeStopRow_ = false;
bitField0_ = (bitField0_ & ~0x00200000);
+ readType_ = 0;
+ bitField0_ = (bitField0_ & ~0x00400000);
return this;
}
@@ -15998,6 +16151,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00040000;
}
result.includeStopRow_ = includeStopRow_;
+ if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+ to_bitField0_ |= 0x00080000;
+ }
+ result.readType_ = readType_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -16175,6 +16332,9 @@ public final class ClientProtos {
if (other.hasIncludeStopRow()) {
setIncludeStopRow(other.getIncludeStopRow());
}
+ if (other.hasReadType()) {
+ setReadType(other.getReadType());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -17251,30 +17411,30 @@ public final class ClientProtos {
private boolean small_ ;
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean hasSmall() {
+ @java.lang.Deprecated public boolean hasSmall() {
return ((bitField0_ & 0x00002000) == 0x00002000);
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean getSmall() {
+ @java.lang.Deprecated public boolean getSmall() {
return small_;
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public Builder setSmall(boolean value) {
+ @java.lang.Deprecated public Builder setSmall(boolean value) {
bitField0_ |= 0x00002000;
small_ = value;
onChanged();
return this;
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public Builder clearSmall() {
+ @java.lang.Deprecated public Builder clearSmall() {
bitField0_ = (bitField0_ & ~0x00002000);
small_ = false;
onChanged();
@@ -17748,6 +17908,42 @@ public final class ClientProtos {
onChanged();
return this;
}
+
+ private int readType_ = 0;
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public boolean hasReadType() {
+ return ((bitField0_ & 0x00400000) == 0x00400000);
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+ org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+ return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public Builder setReadType(org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00400000;
+ readType_ = value.getNumber();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public Builder clearReadType() {
+ bitField0_ = (bitField0_ & ~0x00400000);
+ readType_ = 0;
+ onChanged();
+ return this;
+ }
public final Builder setUnknownFields(
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
@@ -17898,6 +18094,23 @@ public final class ClientProtos {
* <code>optional bool renew = 10 [default = false];</code>
*/
boolean getRenew();
+
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ boolean hasLimitOfRows();
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ int getLimitOfRows();
}
/**
* <pre>
@@ -17930,6 +18143,7 @@ public final class ClientProtos {
clientHandlesHeartbeats_ = false;
trackScanMetrics_ = false;
renew_ = false;
+ limitOfRows_ = 0;
}
@java.lang.Override
@@ -18026,6 +18240,11 @@ public final class ClientProtos {
renew_ = input.readBool();
break;
}
+ case 88: {
+ bitField0_ |= 0x00000400;
+ limitOfRows_ = input.readUInt32();
+ break;
+ }
}
}
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18213,6 +18432,29 @@ public final class ClientProtos {
return renew_;
}
+ public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+ private int limitOfRows_;
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public boolean hasLimitOfRows() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public int getLimitOfRows() {
+ return limitOfRows_;
+ }
+
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
@@ -18267,6 +18509,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeBool(10, renew_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeUInt32(11, limitOfRows_);
+ }
unknownFields.writeTo(output);
}
@@ -18315,6 +18560,10 @@ public final class ClientProtos {
size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
.computeBoolSize(10, renew_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(11, limitOfRows_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -18382,6 +18631,11 @@ public final class ClientProtos {
result = result && (getRenew()
== other.getRenew());
}
+ result = result && (hasLimitOfRows() == other.hasLimitOfRows());
+ if (hasLimitOfRows()) {
+ result = result && (getLimitOfRows()
+ == other.getLimitOfRows());
+ }
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@@ -18440,6 +18694,10 @@ public final class ClientProtos {
hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean(
getRenew());
}
+ if (hasLimitOfRows()) {
+ hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER;
+ hash = (53 * hash) + getLimitOfRows();
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -18599,6 +18857,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000100);
renew_ = false;
bitField0_ = (bitField0_ & ~0x00000200);
+ limitOfRows_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -18671,6 +18931,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000200;
}
result.renew_ = renew_;
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.limitOfRows_ = limitOfRows_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -18743,6 +19007,9 @@ public final class ClientProtos {
if (other.hasRenew()) {
setRenew(other.getRenew());
}
+ if (other.hasLimitOfRows()) {
+ setLimitOfRows(other.getLimitOfRows());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -19272,6 +19539,54 @@ public final class ClientProtos {
onChanged();
return this;
}
+
+ private int limitOfRows_ ;
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public boolean hasLimitOfRows() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public int getLimitOfRows() {
+ return limitOfRows_;
+ }
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public Builder setLimitOfRows(int value) {
+ bitField0_ |= 0x00000400;
+ limitOfRows_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ *
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ */
+ public Builder clearLimitOfRows() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ limitOfRows_ = 0;
+ onChanged();
+ return this;
+ }
public final Builder setUnknownFields(
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
@@ -40834,7 +41149,7 @@ public final class ClientProtos {
"tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" +
"_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" +
"\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " +
- "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
+ "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
"Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" +
"eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" +
"w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" +
@@ -40843,104 +41158,108 @@ public final class ClientProtos {
"cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" +
"ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" +
"\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" +
- "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" +
- "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" +
- " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" +
- "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " +
- "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" +
- "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024",
- " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" +
- "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" +
- "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" +
- "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" +
- "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" +
- "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" +
- "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" +
- "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" +
- "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " +
- "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per",
- "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" +
- "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" +
- "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" +
- "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" +
- "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" +
- "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" +
- ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" +
- ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" +
- " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" +
- "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq",
- "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" +
- "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" +
- "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " +
- "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" +
- "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" +
- "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" +
- "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" +
- "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" +
- "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" +
- ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re",
- "gionSpecifier\"-\n\027PrepareBulkLoadResponse" +
- "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" +
- "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" +
- "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" +
- "BulkLoadResponse\"a\n\026CoprocessorServiceCa" +
- "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" +
- "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" +
- "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" +
- ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" +
- "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.",
- "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" +
- ".CoprocessorServiceCall\"o\n\032CoprocessorSe" +
- "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
- ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." +
- "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" +
- "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" +
- "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" +
- "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" +
- "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" +
- "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002",
- " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" +
- "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" +
- "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" +
- "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" +
- "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" +
- "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" +
- "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" +
- " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" +
- "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" +
- "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co",
- "processorServiceResult\0220\n\tloadStats\030\005 \001(" +
- "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" +
- "onActionResult\0226\n\021resultOrException\030\001 \003(" +
- "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" +
- "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" +
- "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" +
- ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" +
- "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" +
- "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" +
- "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc",
- "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." +
- "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" +
- "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" +
- "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." +
- "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" +
- "b.MutateRequest\032\030.hbase.pb.MutateRespons" +
- "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" +
- ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" +
- "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" +
- "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .",
- "hbase.pb.PrepareBulkLoadRequest\032!.hbase." +
- "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" +
- "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" +
- "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" +
- "ecService\022#.hbase.pb.CoprocessorServiceR" +
- "equest\032$.hbase.pb.CoprocessorServiceResp" +
- "onse\022d\n\027ExecRegionServerService\022#.hbase." +
- "pb.CoprocessorServiceRequest\032$.hbase.pb." +
- "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" +
- "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp",
- "onseBI\n1org.apache.hadoop.hbase.shaded.p" +
- "rotobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+ "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" +
+ "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" +
+ "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" +
+ "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" +
+ "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" +
+ "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi",
+ "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" +
+ "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" +
+ "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" +
+ ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" +
+ "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" +
+ "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" +
+ "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" +
+ "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" +
+ "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" +
+ "client_handles_partials\030\007 \001(\010\022!\n\031client_",
+ "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" +
+ "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" +
+ "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" +
+ "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " +
+ "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" +
+ "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" +
+ "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+ "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" +
+ "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" +
+ "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p",
+ "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" +
+ "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" +
+ "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" +
+ "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" +
+ "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" +
+ "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" +
+ "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" +
+ "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" +
+ "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
+ "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001",
+ "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" +
+ "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
+ "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" +
+ "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" +
+ "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" +
+ "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" +
+ "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" +
+ "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" +
+ "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" +
+ "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030",
+ "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" +
+ "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" +
+ "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" +
+ ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" +
+ " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" +
+ "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" +
+ "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" +
+ "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" +
+ "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" +
+ "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.",
+ "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" +
+ "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" +
+ "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" +
+ "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" +
+ "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" +
+ "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" +
+ "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" +
+ "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" +
+ ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" +
+ "b.RegionLoadStats\"\336\001\n\021ResultOrException\022",
+ "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" +
+ "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" +
+ "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" +
+ "base.pb.CoprocessorServiceResult\0220\n\tload" +
+ "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" +
+ "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" +
+ "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" +
+ "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" +
+ "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" +
+ "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup",
+ "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" +
+ "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" +
+ "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" +
+ "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" +
+ "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" +
+ "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" +
+ "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" +
+ "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" +
+ "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" +
+ "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque",
+ "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" +
+ "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" +
+ "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" +
+ "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" +
+ "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" +
+ "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" +
+ "adRequest\032!.hbase.pb.CleanupBulkLoadResp" +
+ "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" +
+ "orServiceRequest\032$.hbase.pb.CoprocessorS" +
+ "erviceResponse\022d\n\027ExecRegionServerServic",
+ "e\022#.hbase.pb.CoprocessorServiceRequest\032$" +
+ ".hbase.pb.CoprocessorServiceResponse\0228\n\005" +
+ "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" +
+ ".MultiResponseBI\n1org.apache.hadoop.hbas" +
+ "e.shaded.protobuf.generatedB\014ClientProto" +
+ "sH\001\210\001\001\240\001\001"
};
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
@@ -41042,13 +41361,13 @@ public final class ClientProtos {
internal_static_hbase_pb_Scan_fieldAccessorTable = new
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_hbase_pb_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", });
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", });
internal_static_hbase_pb_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_hbase_pb_ScanRequest_descriptor,
- new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", });
+ new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", });
internal_static_hbase_pb_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13);
internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol-shaded/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 2793b89..e5a10b0 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -249,7 +249,7 @@ message Scan {
optional uint32 store_limit = 11;
optional uint32 store_offset = 12;
optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
- optional bool small = 14;
+ optional bool small = 14 [deprecated = true];
optional bool reversed = 15 [default = false];
optional Consistency consistency = 16 [default = STRONG];
optional uint32 caching = 17;
@@ -258,6 +258,13 @@ message Scan {
optional uint64 mvcc_read_point = 20 [default = 0];
optional bool include_start_row = 21 [default = true];
optional bool include_stop_row = 22 [default = false];
+
+ enum ReadType {
+ DEFAULT = 0;
+ STREAM = 1;
+ PREAD = 2;
+ }
+ optional ReadType readType = 23 [default = DEFAULT];
}
/**
@@ -282,6 +289,8 @@ message ScanRequest {
optional bool client_handles_heartbeats = 8;
optional bool track_scan_metrics = 9;
optional bool renew = 10 [default = false];
+ // if we have returned limit_of_rows rows to client, then close the scanner.
+ optional uint32 limit_of_rows = 11 [default = 0];
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 087576c..a550f85 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -14145,15 +14145,15 @@ public final class ClientProtos {
*/
boolean getLoadColumnFamiliesOnDemand();
- // optional bool small = 14;
+ // optional bool small = 14 [deprecated = true];
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- boolean hasSmall();
+ @java.lang.Deprecated boolean hasSmall();
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- boolean getSmall();
+ @java.lang.Deprecated boolean getSmall();
// optional bool reversed = 15 [default = false];
/**
@@ -14249,6 +14249,16 @@ public final class ClientProtos {
* <code>optional bool include_stop_row = 22 [default = false];</code>
*/
boolean getIncludeStopRow();
+
+ // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ boolean hasReadType();
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType();
}
/**
* Protobuf type {@code hbase.pb.Scan}
@@ -14453,6 +14463,17 @@ public final class ClientProtos {
includeStopRow_ = input.readBool();
break;
}
+ case 184: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(23, rawValue);
+ } else {
+ bitField0_ |= 0x00080000;
+ readType_ = value;
+ }
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14501,6 +14522,97 @@ public final class ClientProtos {
return PARSER;
}
+ /**
+ * Protobuf enum {@code hbase.pb.Scan.ReadType}
+ */
+ public enum ReadType
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * <code>DEFAULT = 0;</code>
+ */
+ DEFAULT(0, 0),
+ /**
+ * <code>STREAM = 1;</code>
+ */
+ STREAM(1, 1),
+ /**
+ * <code>PREAD = 2;</code>
+ */
+ PREAD(2, 2),
+ ;
+
+ /**
+ * <code>DEFAULT = 0;</code>
+ */
+ public static final int DEFAULT_VALUE = 0;
+ /**
+ * <code>STREAM = 1;</code>
+ */
+ public static final int STREAM_VALUE = 1;
+ /**
+ * <code>PREAD = 2;</code>
+ */
+ public static final int PREAD_VALUE = 2;
+
+
+ public final int getNumber() { return value; }
+
+ public static ReadType valueOf(int value) {
+ switch (value) {
+ case 0: return DEFAULT;
+ case 1: return STREAM;
+ case 2: return PREAD;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<ReadType>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<ReadType>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<ReadType>() {
+ public ReadType findValueByNumber(int number) {
+ return ReadType.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final ReadType[] VALUES = values();
+
+ public static ReadType valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private ReadType(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType)
+ }
+
private int bitField0_;
// repeated .hbase.pb.Column column = 1;
public static final int COLUMN_FIELD_NUMBER = 1;
@@ -14770,19 +14882,19 @@ public final class ClientProtos {
return loadColumnFamiliesOnDemand_;
}
- // optional bool small = 14;
+ // optional bool small = 14 [deprecated = true];
public static final int SMALL_FIELD_NUMBER = 14;
private boolean small_;
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean hasSmall() {
+ @java.lang.Deprecated public boolean hasSmall() {
return ((bitField0_ & 0x00000800) == 0x00000800);
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean getSmall() {
+ @java.lang.Deprecated public boolean getSmall() {
return small_;
}
@@ -14934,6 +15046,22 @@ public final class ClientProtos {
return includeStopRow_;
}
+ // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+ public static final int READTYPE_FIELD_NUMBER = 23;
+ private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_;
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public boolean hasReadType() {
+ return ((bitField0_ & 0x00080000) == 0x00080000);
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+ return readType_;
+ }
+
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -14957,6 +15085,7 @@ public final class ClientProtos {
mvccReadPoint_ = 0L;
includeStartRow_ = true;
includeStopRow_ = false;
+ readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -15060,6 +15189,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00040000) == 0x00040000)) {
output.writeBool(22, includeStopRow_);
}
+ if (((bitField0_ & 0x00080000) == 0x00080000)) {
+ output.writeEnum(23, readType_.getNumber());
+ }
getUnknownFields().writeTo(output);
}
@@ -15157,6 +15289,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(22, includeStopRow_);
}
+ if (((bitField0_ & 0x00080000) == 0x00080000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(23, readType_.getNumber());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -15281,6 +15417,11 @@ public final class ClientProtos {
result = result && (getIncludeStopRow()
== other.getIncludeStopRow());
}
+ result = result && (hasReadType() == other.hasReadType());
+ if (hasReadType()) {
+ result = result &&
+ (getReadType() == other.getReadType());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -15382,6 +15523,10 @@ public final class ClientProtos {
hash = (37 * hash) + INCLUDE_STOP_ROW_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getIncludeStopRow());
}
+ if (hasReadType()) {
+ hash = (37 * hash) + READTYPE_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getReadType());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -15571,6 +15716,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00100000);
includeStopRow_ = false;
bitField0_ = (bitField0_ & ~0x00200000);
+ readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+ bitField0_ = (bitField0_ & ~0x00400000);
return this;
}
@@ -15710,6 +15857,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00040000;
}
result.includeStopRow_ = includeStopRow_;
+ if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+ to_bitField0_ |= 0x00080000;
+ }
+ result.readType_ = readType_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -15861,6 +16012,9 @@ public final class ClientProtos {
if (other.hasIncludeStopRow()) {
setIncludeStopRow(other.getIncludeStopRow());
}
+ if (other.hasReadType()) {
+ setReadType(other.getReadType());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -16945,33 +17099,33 @@ public final class ClientProtos {
return this;
}
- // optional bool small = 14;
+ // optional bool small = 14 [deprecated = true];
private boolean small_ ;
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean hasSmall() {
+ @java.lang.Deprecated public boolean hasSmall() {
return ((bitField0_ & 0x00002000) == 0x00002000);
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public boolean getSmall() {
+ @java.lang.Deprecated public boolean getSmall() {
return small_;
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public Builder setSmall(boolean value) {
+ @java.lang.Deprecated public Builder setSmall(boolean value) {
bitField0_ |= 0x00002000;
small_ = value;
onChanged();
return this;
}
/**
- * <code>optional bool small = 14;</code>
+ * <code>optional bool small = 14 [deprecated = true];</code>
*/
- public Builder clearSmall() {
+ @java.lang.Deprecated public Builder clearSmall() {
bitField0_ = (bitField0_ & ~0x00002000);
small_ = false;
onChanged();
@@ -17452,6 +17606,42 @@ public final class ClientProtos {
return this;
}
+ // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+ private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public boolean hasReadType() {
+ return ((bitField0_ & 0x00400000) == 0x00400000);
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+ return readType_;
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public Builder setReadType(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00400000;
+ readType_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];</code>
+ */
+ public Builder clearReadType() {
+ bitField0_ = (bitField0_ & ~0x00400000);
+ readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.Scan)
}
@@ -17573,6 +17763,24 @@ public final class ClientProtos {
* <code>optional bool renew = 10 [default = false];</code>
*/
boolean getRenew();
+
+ // optional uint32 limit_of_rows = 11 [default = 0];
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ boolean hasLimitOfRows();
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ int getLimitOfRows();
}
/**
* Protobuf type {@code hbase.pb.ScanRequest}
@@ -17704,6 +17912,11 @@ public final class ClientProtos {
renew_ = input.readBool();
break;
}
+ case 88: {
+ bitField0_ |= 0x00000400;
+ limitOfRows_ = input.readUInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17916,6 +18129,30 @@ public final class ClientProtos {
return renew_;
}
+ // optional uint32 limit_of_rows = 11 [default = 0];
+ public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+ private int limitOfRows_;
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public boolean hasLimitOfRows() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public int getLimitOfRows() {
+ return limitOfRows_;
+ }
+
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@@ -17927,6 +18164,7 @@ public final class ClientProtos {
clientHandlesHeartbeats_ = false;
trackScanMetrics_ = false;
renew_ = false;
+ limitOfRows_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -17982,6 +18220,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeBool(10, renew_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeUInt32(11, limitOfRows_);
+ }
getUnknownFields().writeTo(output);
}
@@ -18031,6 +18272,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(10, renew_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(11, limitOfRows_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -18104,6 +18349,11 @@ public final class ClientProtos {
result = result && (getRenew()
== other.getRenew());
}
+ result = result && (hasLimitOfRows() == other.hasLimitOfRows());
+ if (hasLimitOfRows()) {
+ result = result && (getLimitOfRows()
+ == other.getLimitOfRows());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -18157,6 +18407,10 @@ public final class ClientProtos {
hash = (37 * hash) + RENEW_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getRenew());
}
+ if (hasLimitOfRows()) {
+ hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER;
+ hash = (53 * hash) + getLimitOfRows();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -18309,6 +18563,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000100);
renew_ = false;
bitField0_ = (bitField0_ & ~0x00000200);
+ limitOfRows_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -18385,6 +18641,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000200;
}
result.renew_ = renew_;
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.limitOfRows_ = limitOfRows_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -18431,6 +18691,9 @@ public final class ClientProtos {
if (other.hasRenew()) {
setRenew(other.getRenew());
}
+ if (other.hasLimitOfRows()) {
+ setLimitOfRows(other.getLimitOfRows());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -18968,6 +19231,55 @@ public final class ClientProtos {
return this;
}
+ // optional uint32 limit_of_rows = 11 [default = 0];
+ private int limitOfRows_ ;
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public boolean hasLimitOfRows() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public int getLimitOfRows() {
+ return limitOfRows_;
+ }
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public Builder setLimitOfRows(int value) {
+ bitField0_ |= 0x00000400;
+ limitOfRows_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional uint32 limit_of_rows = 11 [default = 0];</code>
+ *
+ * <pre>
+ * if we have returned limit_of_rows rows to client, then close the scanner.
+ * </pre>
+ */
+ public Builder clearLimitOfRows() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ limitOfRows_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest)
}
@@ -39912,7 +40224,7 @@ public final class ClientProtos {
"tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" +
"_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" +
"\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " +
- "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
+ "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
"Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" +
"eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" +
"w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" +
@@ -39921,104 +40233,108 @@ public final class ClientProtos {
"cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" +
"ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" +
"\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" +
- "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" +
- "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" +
- " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" +
- "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " +
- "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" +
- "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024",
- " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" +
- "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" +
- "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" +
- "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" +
- "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" +
- "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" +
- "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" +
- "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" +
- "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " +
- "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per",
- "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" +
- "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" +
- "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" +
- "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" +
- "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" +
- "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" +
- ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" +
- ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" +
- " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" +
- "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq",
- "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" +
- "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" +
- "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " +
- "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" +
- "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" +
- "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" +
- "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" +
- "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" +
- "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" +
- ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re",
- "gionSpecifier\"-\n\027PrepareBulkLoadResponse" +
- "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" +
- "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" +
- "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" +
- "BulkLoadResponse\"a\n\026CoprocessorServiceCa" +
- "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" +
- "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" +
- "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" +
- ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" +
- "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.",
- "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" +
- ".CoprocessorServiceCall\"o\n\032CoprocessorSe" +
- "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
- ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." +
- "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" +
- "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" +
- "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" +
- "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" +
- "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" +
- "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002",
- " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" +
- "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" +
- "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" +
- "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" +
- "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" +
- "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" +
- "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" +
- " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" +
- "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" +
- "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co",
- "processorServiceResult\0220\n\tloadStats\030\005 \001(" +
- "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" +
- "onActionResult\0226\n\021resultOrException\030\001 \003(" +
- "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" +
- "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" +
- "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" +
- ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" +
- "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" +
- "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" +
- "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc",
- "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." +
- "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" +
- "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" +
- "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." +
- "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" +
- "b.MutateRequest\032\030.hbase.pb.MutateRespons" +
- "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" +
- ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" +
- "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" +
- "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .",
- "hbase.pb.PrepareBulkLoadRequest\032!.hbase." +
- "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" +
- "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" +
- "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" +
- "ecService\022#.hbase.pb.CoprocessorServiceR" +
- "equest\032$.hbase.pb.CoprocessorServiceResp" +
- "onse\022d\n\027ExecRegionServerService\022#.hbase." +
- "pb.CoprocessorServiceRequest\032$.hbase.pb." +
- "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" +
- "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp",
- "onseBB\n*org.apache.hadoop.hbase.protobuf" +
- ".generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+ "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" +
+ "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" +
+ "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" +
+ "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" +
+ "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" +
+ "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi",
+ "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" +
+ "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" +
+ "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" +
+ ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" +
+ "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" +
+ "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" +
+ "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" +
+ "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" +
+ "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" +
+ "client_handles_partials\030\007 \001(\010\022!\n\031client_",
+ "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" +
+ "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" +
+ "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" +
+ "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " +
+ "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" +
+ "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" +
+ "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+ "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" +
+ "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" +
+ "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p",
+ "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" +
+ "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" +
+ "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" +
+ "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" +
+ "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" +
+ "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" +
+ "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" +
+ "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" +
+ "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
+ "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001",
+ "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" +
+ "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
+ "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" +
+ "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" +
+ "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" +
+ "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" +
+ "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" +
+ "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" +
+ "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" +
+ "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030",
+ "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" +
+ "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" +
+ "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" +
+ ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" +
+ " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" +
+ "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" +
+ "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" +
+ "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" +
+ "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" +
+ "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.",
+ "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" +
+ "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" +
+ "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" +
+ "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" +
+ "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" +
+ "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" +
+ "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" +
+ "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" +
+ ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" +
+ "b.RegionLoadStats\"\336\001\n\021ResultOrException\022",
+ "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" +
+ "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" +
+ "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" +
+ "base.pb.CoprocessorServiceResult\0220\n\tload" +
+ "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" +
+ "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" +
+ "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" +
+ "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" +
+ "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" +
+ "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup",
+ "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" +
+ "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" +
+ "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" +
+ "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" +
+ "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" +
+ "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" +
+ "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" +
+ "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" +
+ "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" +
+ "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque",
+ "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" +
+ "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" +
+ "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" +
+ "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" +
+ "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" +
+ "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" +
+ "adRequest\032!.hbase.pb.CleanupBulkLoadResp" +
+ "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" +
+ "orServiceRequest\032$.hbase.pb.CoprocessorS" +
+ "erviceResponse\022d\n\027ExecRegionServerServic",
+ "e\022#.hbase.pb.CoprocessorServiceRequest\032$" +
+ ".hbase.pb.CoprocessorServiceResponse\0228\n\005" +
+ "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" +
+ ".MultiResponseBB\n*org.apache.hadoop.hbas" +
+ "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
+ "\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -40108,13 +40424,13 @@ public final class ClientProtos {
internal_static_hbase_pb_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", });
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", });
internal_static_hbase_pb_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ScanRequest_descriptor,
- new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", });
+ new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", });
internal_static_hbase_pb_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13);
internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index ae932f7..5cf66c2 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -249,7 +249,7 @@ message Scan {
optional uint32 store_limit = 11;
optional uint32 store_offset = 12;
optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
- optional bool small = 14;
+ optional bool small = 14 [deprecated = true];
optional bool reversed = 15 [default = false];
optional Consistency consistency = 16 [default = STRONG];
optional uint32 caching = 17;
@@ -258,6 +258,13 @@ message Scan {
optional uint64 mvcc_read_point = 20 [default = 0];
optional bool include_start_row = 21 [default = true];
optional bool include_stop_row = 22 [default = false];
+
+ enum ReadType {
+ DEFAULT = 0;
+ STREAM = 1;
+ PREAD = 2;
+ }
+ optional ReadType readType = 23 [default = DEFAULT];
}
/**
@@ -282,6 +289,8 @@ message ScanRequest {
optional bool client_handles_heartbeats = 8;
optional bool track_scan_metrics = 9;
optional bool renew = 10 [default = false];
+ // if we have returned limit_of_rows rows to client, then close the scanner.
+ optional uint32 limit_of_rows = 11 [default = 0];
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a072dce..9847dfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1133,6 +1133,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ @VisibleForTesting
+ public int getScannersCount() {
+ return scanners.size();
+ }
+
public
RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
@@ -3014,6 +3019,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
RegionScanner scanner = rsh.s;
boolean moreResults = true;
boolean moreResultsInRegion = true;
+ // this is the limit of rows for this scan, if we the number of rows reach this value, we will
+ // close the scanner.
+ int limitOfRows;
+ if (request.hasLimitOfRows()) {
+ limitOfRows = request.getLimitOfRows();
+ rows = Math.min(rows, limitOfRows);
+ } else {
+ limitOfRows = -1;
+ }
MutableObject lastBlock = new MutableObject();
boolean scannerClosed = false;
try {
@@ -3046,6 +3060,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// with the old scan implementation where we just ignore the returned results if moreResults
// is false. Can remove the isEmpty check after we get rid of the old implementation.
moreResults = false;
+ } else if (limitOfRows > 0 && results.size() >= limitOfRows
+ && !results.get(results.size() - 1).isPartial()) {
+ // if we have reached the limit of rows
+ moreResults = false;
}
addResults(builder, results, (HBaseRpcController) controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 7e08eca..8c48aef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -127,7 +127,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected Cell lastTop = null;
// A flag whether use pread for scan
- private boolean scanUsePread = false;
+ private final boolean scanUsePread;
// Indicates whether there was flush during the course of the scan
protected volatile boolean flushed = false;
// generally we get one file from a flush
@@ -168,7 +168,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
this.maxRowSize = scanInfo.getTableMaxRowSize();
- this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
+ if (get) {
+ this.scanUsePread = true;
+ } else {
+ switch (scan.getReadType()) {
+ case STREAM:
+ this.scanUsePread = false;
+ break;
+ case PREAD:
+ this.scanUsePread = true;
+ break;
+ default:
+ this.scanUsePread = scanInfo.isUsePread();
+ break;
+ }
+ }
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and more there is more than one store file.
if (this.store != null && this.store.getStorefilesCount() > 1) {
@@ -348,10 +362,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @return list of scanners to seek
*/
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
- final boolean isCompaction = false;
- boolean usePread = get || scanUsePread;
- return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
- isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
+ return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher,
+ scan.getStartRow(), scan.getStopRow(), this.readPt));
}
/**
@@ -803,18 +815,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
- /* When we have the scan object, should we not pass it to getScanners()
- * to get a limited set of scanners? We did so in the constructor and we
- * could have done it now by storing the scan object from the constructor
- */
-
- final boolean isCompaction = false;
- boolean usePread = get || scanUsePread;
+ // When we have the scan object, should we not pass it to getScanners() to get a limited set of
+ // scanners? We did so in the constructor and we could have done it now by storing the scan
+ // object from the constructor
List<KeyValueScanner> scanners = null;
try {
flushLock.lock();
- scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
- isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+ scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
+ scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/85d70189/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index a1d926d..b80efae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -95,6 +95,12 @@ public abstract class AbstractTestAsyncTableScan {
@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan());
+ // make sure all scanners are closed at RS side
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+ .forEach(rs -> assertEquals(
+ "The scanner count of " + rs.getServerName() + " is "
+ + rs.getRSRpcServices().getScannersCount(),
+ 0, rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);