You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/30 02:33:05 UTC
[2/2] hbase git commit: HBASE-17167 Pass mvcc to client when scan
HBASE-17167 Pass mvcc to client when scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/890fcbd0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/890fcbd0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/890fcbd0
Branch: refs/heads/master
Commit: 890fcbd0e6f916cc94b45b881b0cc060cc1e835c
Parents: 7c43a23
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 29 17:13:49 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Nov 30 10:11:04 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 406 +++++++++---------
.../org/apache/hadoop/hbase/client/HTable.java | 7 +-
.../client/PackagePrivateFieldAccessor.java | 41 ++
.../org/apache/hadoop/hbase/client/Scan.java | 55 ++-
.../hadoop/hbase/client/ScannerCallable.java | 3 +
.../hadoop/hbase/protobuf/ProtobufUtil.java | 8 +
.../hbase/shaded/protobuf/ProtobufUtil.java | 8 +
.../shaded/protobuf/generated/ClientProtos.java | 412 +++++++++++++-----
.../src/main/protobuf/Client.proto | 12 +-
.../hbase/protobuf/generated/ClientProtos.java | 416 ++++++++++++++-----
hbase-protocol/src/main/protobuf/Client.proto | 12 +-
.../hadoop/hbase/regionserver/HRegion.java | 9 +-
.../hbase/regionserver/RSRpcServices.java | 4 +-
.../hbase/TestPartialResultsFromClientSide.java | 13 +-
.../hbase/client/TestMvccConsistentScanner.java | 134 ++++++
.../hadoop/hbase/regionserver/TestTags.java | 14 +-
.../regionserver/TestReplicationSink.java | 22 +-
17 files changed, 1120 insertions(+), 456 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 20ed183..c4c86a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -120,198 +120,192 @@ public abstract class ClientScanner extends AbstractClientScanner {
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Scan table=" + tableName
- + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
- }
- this.scan = scan;
- this.tableName = tableName;
- this.lastNext = System.currentTimeMillis();
- this.connection = connection;
- this.pool = pool;
- this.primaryOperationTimeout = primaryOperationTimeout;
- this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- if (scan.getMaxResultSize() > 0) {
- this.maxScannerResultSize = scan.getMaxResultSize();
- } else {
- this.maxScannerResultSize = conf.getLong(
- HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
- }
- this.scannerTimeout = HBaseConfiguration.getInt(conf,
- HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
-
- // check if application wants to collect scan metrics
- initScanMetrics(scan);
-
- // Use the caching from the Scan. If not set, use the default cache setting for this table.
- if (this.scan.getCaching() > 0) {
- this.caching = this.scan.getCaching();
- } else {
- this.caching = conf.getInt(
- HConstants.HBASE_CLIENT_SCANNER_CACHING,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
- }
-
- this.caller = rpcFactory.<Result[]> newCaller();
- this.rpcControllerFactory = controllerFactory;
-
- this.conf = conf;
- initCache();
- initializeScannerInConstruction();
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
+ }
+ this.scan = scan;
+ this.tableName = tableName;
+ this.lastNext = System.currentTimeMillis();
+ this.connection = connection;
+ this.pool = pool;
+ this.primaryOperationTimeout = primaryOperationTimeout;
+ this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ if (scan.getMaxResultSize() > 0) {
+ this.maxScannerResultSize = scan.getMaxResultSize();
+ } else {
+ this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+ }
+ this.scannerTimeout =
+ HBaseConfiguration.getInt(conf, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+ // check if application wants to collect scan metrics
+ initScanMetrics(scan);
+
+ // Use the caching from the Scan. If not set, use the default cache setting for this table.
+ if (this.scan.getCaching() > 0) {
+ this.caching = this.scan.getCaching();
+ } else {
+ this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ }
+
+ this.caller = rpcFactory.<Result[]> newCaller();
+ this.rpcControllerFactory = controllerFactory;
+
+ this.conf = conf;
+ initCache();
+ initializeScannerInConstruction();
+ }
- protected abstract void initCache();
+ protected abstract void initCache();
- protected void initializeScannerInConstruction() throws IOException{
- // initialize the scanner
- nextScanner(this.caching, false);
- }
+ protected void initializeScannerInConstruction() throws IOException {
+ // initialize the scanner
+ nextScanner(this.caching, false);
+ }
- protected ClusterConnection getConnection() {
- return this.connection;
- }
+ protected ClusterConnection getConnection() {
+ return this.connection;
+ }
- protected TableName getTable() {
- return this.tableName;
- }
+ protected TableName getTable() {
+ return this.tableName;
+ }
- protected int getRetries() {
- return this.retries;
- }
+ protected int getRetries() {
+ return this.retries;
+ }
- protected int getScannerTimeout() {
- return this.scannerTimeout;
- }
+ protected int getScannerTimeout() {
+ return this.scannerTimeout;
+ }
- protected Configuration getConf() {
- return this.conf;
- }
+ protected Configuration getConf() {
+ return this.conf;
+ }
- protected Scan getScan() {
- return scan;
- }
+ protected Scan getScan() {
+ return scan;
+ }
- protected ExecutorService getPool() {
- return pool;
- }
+ protected ExecutorService getPool() {
+ return pool;
+ }
- protected int getPrimaryOperationTimeout() {
- return primaryOperationTimeout;
- }
+ protected int getPrimaryOperationTimeout() {
+ return primaryOperationTimeout;
+ }
- protected int getCaching() {
- return caching;
- }
+ protected int getCaching() {
+ return caching;
+ }
- protected long getTimestamp() {
- return lastNext;
- }
+ protected long getTimestamp() {
+ return lastNext;
+ }
- @VisibleForTesting
- protected long getMaxResultSize() {
- return maxScannerResultSize;
- }
+ @VisibleForTesting
+ protected long getMaxResultSize() {
+ return maxScannerResultSize;
+ }
- // returns true if the passed region endKey
- protected boolean checkScanStopRow(final byte [] endKey) {
- if (this.scan.getStopRow().length > 0) {
- // there is a stop row, check to see if we are past it.
- byte [] stopRow = scan.getStopRow();
- int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
- endKey, 0, endKey.length);
- if (cmp <= 0) {
- // stopRow <= endKey (endKey is equals to or larger than stopRow)
- // This is a stop.
- return true;
- }
+ // returns true if the passed region endKey
+ protected boolean checkScanStopRow(final byte[] endKey) {
+ if (this.scan.getStopRow().length > 0) {
+ // there is a stop row, check to see if we are past it.
+ byte[] stopRow = scan.getStopRow();
+ int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length);
+ if (cmp <= 0) {
+ // stopRow <= endKey (endKey is equals to or larger than stopRow)
+ // This is a stop.
+ return true;
}
- return false; //unlikely.
- }
-
- private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
- // If we have just switched replica, don't go to the next scanner yet. Rather, try
- // the scanner operations on the new replica, from the right point in the scan
- // Note that when we switched to a different replica we left it at a point
- // where we just did the "openScanner" with the appropriate startrow
- if (callable != null && callable.switchedToADifferentReplica()) return true;
- return nextScanner(nbRows, done);
}
+ return false; // unlikely.
+ }
- /*
- * Gets a scanner for the next region. If this.currentRegion != null, then
- * we will move to the endrow of this.currentRegion. Else we will get
- * scanner at the scan.getStartRow(). We will go no further, just tidy
- * up outstanding scanners, if <code>currentRegion != null</code> and
- * <code>done</code> is true.
- * @param nbRows
- * @param done Server-side says we're done scanning.
- */
- protected boolean nextScanner(int nbRows, final boolean done)
- throws IOException {
- // Close the previous scanner if it's open
- if (this.callable != null) {
- this.callable.setClose();
- call(callable, caller, scannerTimeout);
- this.callable = null;
- }
+ private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
+ // If we have just switched replica, don't go to the next scanner yet. Rather, try
+ // the scanner operations on the new replica, from the right point in the scan
+ // Note that when we switched to a different replica we left it at a point
+ // where we just did the "openScanner" with the appropriate startrow
+ if (callable != null && callable.switchedToADifferentReplica()) return true;
+ return nextScanner(nbRows, done);
+ }
- // Where to start the next scanner
- byte [] localStartKey;
-
- // if we're at end of table, close and return false to stop iterating
- if (this.currentRegion != null) {
- byte [] endKey = this.currentRegion.getEndKey();
- if (endKey == null ||
- Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
- checkScanStopRow(endKey) ||
- done) {
- close();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Finished " + this.currentRegion);
- }
- return false;
- }
- localStartKey = endKey;
+ /*
+ * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
+ * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
+ * further, just tidy up outstanding scanners, if <code>currentRegion != null</code> and
+ * <code>done</code> is true.
+ * @param nbRows
+ * @param done Server-side says we're done scanning.
+ */
+ protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
+ // Close the previous scanner if it's open
+ if (this.callable != null) {
+ this.callable.setClose();
+ call(callable, caller, scannerTimeout);
+ this.callable = null;
+ }
+
+ // Where to start the next scanner
+ byte[] localStartKey;
+
+ // if we're at end of table, close and return false to stop iterating
+ if (this.currentRegion != null) {
+ byte[] endKey = this.currentRegion.getEndKey();
+ if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
+ || checkScanStopRow(endKey) || done) {
+ close();
if (LOG.isTraceEnabled()) {
LOG.trace("Finished " + this.currentRegion);
}
- } else {
- localStartKey = this.scan.getStartRow();
+ return false;
}
-
- if (LOG.isDebugEnabled() && this.currentRegion != null) {
- // Only worth logging if NOT first region in scan.
- LOG.debug("Advancing internal scanner to startKey at '" +
- Bytes.toStringBinary(localStartKey) + "'");
+ localStartKey = endKey;
+ // clear mvcc read point if we are going to switch regions
+ scan.resetMvccReadPoint();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Finished " + this.currentRegion);
}
- try {
- callable = getScannerCallable(localStartKey, nbRows);
- // Open a scanner on the region server starting at the
- // beginning of the region
- call(callable, caller, scannerTimeout);
- this.currentRegion = callable.getHRegionInfo();
- if (this.scanMetrics != null) {
- this.scanMetrics.countOfRegions.incrementAndGet();
- }
- } catch (IOException e) {
- close();
- throw e;
+ } else {
+ localStartKey = this.scan.getStartRow();
+ }
+
+ if (LOG.isDebugEnabled() && this.currentRegion != null) {
+ // Only worth logging if NOT first region in scan.
+ LOG.debug(
+ "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'");
+ }
+ try {
+ callable = getScannerCallable(localStartKey, nbRows);
+ // Open a scanner on the region server starting at the
+ // beginning of the region
+ call(callable, caller, scannerTimeout);
+ this.currentRegion = callable.getHRegionInfo();
+ if (this.scanMetrics != null) {
+ this.scanMetrics.countOfRegions.incrementAndGet();
}
- return true;
+ } catch (IOException e) {
+ close();
+ throw e;
}
+ return true;
+ }
@VisibleForTesting
boolean isAnyRPCcancelled() {
return callable.isAnyRPCcancelled();
}
- Result[] call(ScannerCallableWithReplicas callable,
- RpcRetryingCaller<Result[]> caller, int scannerTimeout)
- throws IOException, RuntimeException {
+ Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
+ int scannerTimeout) throws IOException, RuntimeException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
@@ -320,61 +314,57 @@ public abstract class ClientScanner extends AbstractClientScanner {
return caller.callWithoutRetries(callable, scannerTimeout);
}
- @InterfaceAudience.Private
- protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
- int nbRows) {
- scan.setStartRow(localStartKey);
- ScannerCallable s =
- new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
- this.rpcControllerFactory);
- s.setCaching(nbRows);
- ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
- s, pool, primaryOperationTimeout, scan,
- retries, scannerTimeout, caching, conf, caller);
- return sr;
- }
+ @InterfaceAudience.Private
+ protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) {
+ scan.setStartRow(localStartKey);
+ ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
+ this.rpcControllerFactory);
+ s.setCaching(nbRows);
+ ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s,
+ pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller);
+ return sr;
+ }
- /**
- * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
- * application or TableInputFormat.Later, we could push it to other systems. We don't use
- * metrics framework because it doesn't support multi-instances of the same metrics on the same
- * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
- *
- * By default, scan metrics are disabled; if the application wants to collect them, this
- * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
- *
- * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
- */
- protected void writeScanMetrics() {
- if (this.scanMetrics == null || scanMetricsPublished) {
- return;
- }
- MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
- scanMetricsPublished = true;
+ /**
+ * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
+ * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
+ * framework because it doesn't support multi-instances of the same metrics on the same machine;
+ * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
+ * default, scan metrics are disabled; if the application wants to collect them, this behavior can
+ * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
+ * <p>
+ * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
+ */
+ protected void writeScanMetrics() {
+ if (this.scanMetrics == null || scanMetricsPublished) {
+ return;
}
+ MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+ scanMetricsPublished = true;
+ }
- protected void initSyncCache() {
+ protected void initSyncCache() {
cache = new LinkedList<Result>();
}
- protected Result nextWithSyncCache() throws IOException {
- // If the scanner is closed and there's nothing left in the cache, next is a no-op.
- if (cache.size() == 0 && this.closed) {
- return null;
- }
- if (cache.size() == 0) {
- loadCache();
- }
-
- if (cache.size() > 0) {
- return cache.poll();
- }
-
- // if we exhausted this scanner before calling close, write out the scan metrics
- writeScanMetrics();
+ protected Result nextWithSyncCache() throws IOException {
+ // If the scanner is closed and there's nothing left in the cache, next is a no-op.
+ if (cache.size() == 0 && this.closed) {
return null;
}
+ if (cache.size() == 0) {
+ loadCache();
+ }
+
+ if (cache.size() > 0) {
+ return cache.poll();
+ }
+
+ // if we exhausted this scanner before calling close, write out the scan metrics
+ writeScanMetrics();
+ return null;
+ }
@VisibleForTesting
public int getCacheSize() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index b2c012d..c56132c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -334,7 +334,7 @@ public class HTable implements Table {
* {@link Table#getScanner(Scan)} has other usage details.
*/
@Override
- public ResultScanner getScanner(final Scan scan) throws IOException {
+ public ResultScanner getScanner(Scan scan) throws IOException {
if (scan.getBatch() > 0 && scan.isSmall()) {
throw new IllegalArgumentException("Small scan should not be used with batching");
}
@@ -345,7 +345,10 @@ public class HTable implements Table {
if (scan.getMaxResultSize() <= 0) {
scan.setMaxResultSize(scannerMaxResultSize);
}
-
+ if (scan.getMvccReadPoint() > 0) {
+ // it is not supposed to be set by user, clear
+ scan.resetMvccReadPoint();
+ }
Boolean async = scan.isAsyncPrefetch();
if (async == null) {
async = connConfiguration.isClientScannerAsyncPrefetch();
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java
new file mode 100644
index 0000000..6a3ac18
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A helper class used to access the package private field in o.a.h.h.client package.
+ * <p>
+ * This is because we share some data structures between client and server and the data structures
+ * are marked as {@code InterfaceAudience.Public}, but we do not want to expose some of the fields
+ * to end user.
+ * <p>
+ * TODO: A better solution is to separate the data structures used in client and server.
+ */
+@InterfaceAudience.Private
+public class PackagePrivateFieldAccessor {
+
+ public static void setMvccReadPoint(Scan scan, long mvccReadPoint) {
+ scan.setMvccReadPoint(mvccReadPoint);
+ }
+
+ public static long getMvccReadPoint(Scan scan) {
+ return scan.getMvccReadPoint();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index b0d361c..9d659b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -154,27 +154,24 @@ public class Scan extends Query {
*/
public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false;
- /**
- * Set it true for small scan to get better performance
- *
- * Small scan should use pread and big scan can use seek + read
- *
- * seek + read is fast but can cause two problem (1) resource contention (2)
- * cause too much network io
- *
- * [89-fb] Using pread for non-compaction read request
- * https://issues.apache.org/jira/browse/HBASE-7266
- *
- * On the other hand, if setting it true, we would do
- * openScanner,next,closeScanner in one RPC call. It means the better
- * performance for small scan. [HBASE-9488].
- *
- * Generally, if the scan range is within one data block(64KB), it could be
- * considered as a small scan.
+ /**
+ * Set it true for small scan to get better performance Small scan should use pread and big scan
+ * can use seek + read seek + read is fast but can cause two problem (1) resource contention (2)
+ * cause too much network io [89-fb] Using pread for non-compaction read request
+ * https://issues.apache.org/jira/browse/HBASE-7266 On the other hand, if setting it true, we
+ * would do openScanner,next,closeScanner in one RPC call. It means the better performance for
+ * small scan. [HBASE-9488]. Generally, if the scan range is within one data block(64KB), it could
+ * be considered as a small scan.
*/
private boolean small = false;
/**
+ * The mvcc read point to use when open a scanner. Remember to clear it after switching regions as
+ * the mvcc is only valid within region scope.
+ */
+ private long mvccReadPoint = -1L;
+
+ /**
* Create a Scan operation across all rows.
*/
public Scan() {}
@@ -253,6 +250,7 @@ public class Scan extends Query {
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ this.mvccReadPoint = scan.getMvccReadPoint();
}
/**
@@ -281,6 +279,7 @@ public class Scan extends Query {
TimeRange tr = entry.getValue();
setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
}
+ this.mvccReadPoint = -1L;
}
public boolean isGetScan() {
@@ -976,4 +975,26 @@ public class Scan extends Query {
this.asyncPrefetch = asyncPrefetch;
return this;
}
+
+ /**
+ * Get the mvcc read point used to open a scanner.
+ */
+ long getMvccReadPoint() {
+ return mvccReadPoint;
+ }
+
+ /**
+ * Set the mvcc read point used to open a scanner.
+ */
+ Scan setMvccReadPoint(long mvccReadPoint) {
+ this.mvccReadPoint = mvccReadPoint;
+ return this;
+ }
+
+ /**
+ * Set the mvcc read point to -1 which means do not use it.
+ */
+ Scan resetMvccReadPoint() {
+ return setMvccReadPoint(-1L);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 0351e54..7a22648 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -375,6 +375,9 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
+ " on region " + getLocation().toString());
}
+ if (response.hasMvccReadPoint()) {
+ this.scan.setMvccReadPoint(response.getMvccReadPoint());
+ }
return id;
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 330348d..c52d413 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -907,6 +908,10 @@ public final class ProtobufUtil {
if (scan.getCaching() > 0) {
scanBuilder.setCaching(scan.getCaching());
}
+ long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
+ if (mvccReadPoint > 0) {
+ scanBuilder.setMvccReadPoint(mvccReadPoint);
+ }
return scanBuilder.build();
}
@@ -994,6 +999,9 @@ public final class ProtobufUtil {
if (proto.hasCaching()) {
scan.setCaching(proto.getCaching());
}
+ if (proto.hasMvccReadPoint()) {
+ PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
+ }
return scan;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 7d1770e..5876fae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.Result;
@@ -1019,6 +1020,10 @@ public final class ProtobufUtil {
if (scan.getCaching() > 0) {
scanBuilder.setCaching(scan.getCaching());
}
+ long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
+ if (mvccReadPoint > 0) {
+ scanBuilder.setMvccReadPoint(mvccReadPoint);
+ }
return scanBuilder.build();
}
@@ -1106,6 +1111,9 @@ public final class ProtobufUtil {
if (proto.hasCaching()) {
scan.setCaching(proto.getCaching());
}
+ if (proto.hasMvccReadPoint()) {
+ PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
+ }
return scan;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
index bfd196e..e9458df 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java
@@ -14554,6 +14554,15 @@ public final class ClientProtos {
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRangeOrBuilder getCfTimeRangeOrBuilder(
int index);
+
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ boolean hasMvccReadPoint();
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ long getMvccReadPoint();
}
/**
* <pre>
@@ -14594,6 +14603,7 @@ public final class ClientProtos {
caching_ = 0;
allowPartialResults_ = false;
cfTimeRange_ = java.util.Collections.emptyList();
+ mvccReadPoint_ = 0L;
}
@java.lang.Override
@@ -14753,6 +14763,11 @@ public final class ClientProtos {
input.readMessage(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRange.PARSER, extensionRegistry));
break;
}
+ case 160: {
+ bitField0_ |= 0x00010000;
+ mvccReadPoint_ = input.readUInt64();
+ break;
+ }
}
}
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -15153,6 +15168,21 @@ public final class ClientProtos {
return cfTimeRange_.get(index);
}
+ public static final int MVCC_READ_POINT_FIELD_NUMBER = 20;
+ private long mvccReadPoint_;
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public boolean hasMvccReadPoint() {
+ return ((bitField0_ & 0x00010000) == 0x00010000);
+ }
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public long getMvccReadPoint() {
+ return mvccReadPoint_;
+ }
+
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
@@ -15246,6 +15276,9 @@ public final class ClientProtos {
for (int i = 0; i < cfTimeRange_.size(); i++) {
output.writeMessage(19, cfTimeRange_.get(i));
}
+ if (((bitField0_ & 0x00010000) == 0x00010000)) {
+ output.writeUInt64(20, mvccReadPoint_);
+ }
unknownFields.writeTo(output);
}
@@ -15330,6 +15363,10 @@ public final class ClientProtos {
size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
.computeMessageSize(19, cfTimeRange_.get(i));
}
+ if (((bitField0_ & 0x00010000) == 0x00010000)) {
+ size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(20, mvccReadPoint_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -15432,6 +15469,11 @@ public final class ClientProtos {
}
result = result && getCfTimeRangeList()
.equals(other.getCfTimeRangeList());
+ result = result && (hasMvccReadPoint() == other.hasMvccReadPoint());
+ if (hasMvccReadPoint()) {
+ result = result && (getMvccReadPoint()
+ == other.getMvccReadPoint());
+ }
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@@ -15525,6 +15567,11 @@ public final class ClientProtos {
hash = (37 * hash) + CF_TIME_RANGE_FIELD_NUMBER;
hash = (53 * hash) + getCfTimeRangeList().hashCode();
}
+ if (hasMvccReadPoint()) {
+ hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER;
+ hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+ getMvccReadPoint());
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -15716,6 +15763,8 @@ public final class ClientProtos {
} else {
cfTimeRangeBuilder_.clear();
}
+ mvccReadPoint_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00080000);
return this;
}
@@ -15839,6 +15888,10 @@ public final class ClientProtos {
} else {
result.cfTimeRange_ = cfTimeRangeBuilder_.build();
}
+ if (((from_bitField0_ & 0x00080000) == 0x00080000)) {
+ to_bitField0_ |= 0x00010000;
+ }
+ result.mvccReadPoint_ = mvccReadPoint_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -16007,6 +16060,9 @@ public final class ClientProtos {
}
}
}
+ if (other.hasMvccReadPoint()) {
+ setMvccReadPoint(other.getMvccReadPoint());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -17484,6 +17540,38 @@ public final class ClientProtos {
}
return cfTimeRangeBuilder_;
}
+
+ private long mvccReadPoint_ ;
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public boolean hasMvccReadPoint() {
+ return ((bitField0_ & 0x00080000) == 0x00080000);
+ }
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public long getMvccReadPoint() {
+ return mvccReadPoint_;
+ }
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public Builder setMvccReadPoint(long value) {
+ bitField0_ |= 0x00080000;
+ mvccReadPoint_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code>
+ */
+ public Builder clearMvccReadPoint() {
+ bitField0_ = (bitField0_ & ~0x00080000);
+ mvccReadPoint_ = 0L;
+ onChanged();
+ return this;
+ }
public final Builder setUnknownFields(
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
@@ -19311,6 +19399,27 @@ public final class ClientProtos {
* <code>optional .hbase.pb.ScanMetrics scan_metrics = 10;</code>
*/
org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder();
+
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ boolean hasMvccReadPoint();
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ long getMvccReadPoint();
}
/**
* <pre>
@@ -19339,6 +19448,7 @@ public final class ClientProtos {
partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false;
heartbeatMessage_ = false;
+ mvccReadPoint_ = 0L;
}
@java.lang.Override
@@ -19463,6 +19573,11 @@ public final class ClientProtos {
bitField0_ |= 0x00000040;
break;
}
+ case 88: {
+ bitField0_ |= 0x00000080;
+ mvccReadPoint_ = input.readUInt64();
+ break;
+ }
}
}
} catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -19821,6 +19936,33 @@ public final class ClientProtos {
return scanMetrics_ == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance() : scanMetrics_;
}
+ public static final int MVCC_READ_POINT_FIELD_NUMBER = 11;
+ private long mvccReadPoint_;
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public boolean hasMvccReadPoint() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public long getMvccReadPoint() {
+ return mvccReadPoint_;
+ }
+
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
@@ -19863,6 +20005,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeMessage(10, getScanMetrics());
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeUInt64(11, mvccReadPoint_);
+ }
unknownFields.writeTo(output);
}
@@ -19918,6 +20063,10 @@ public final class ClientProtos {
size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
.computeMessageSize(10, getScanMetrics());
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(11, mvccReadPoint_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -19976,6 +20125,11 @@ public final class ClientProtos {
result = result && getScanMetrics()
.equals(other.getScanMetrics());
}
+ result = result && (hasMvccReadPoint() == other.hasMvccReadPoint());
+ if (hasMvccReadPoint()) {
+ result = result && (getMvccReadPoint()
+ == other.getMvccReadPoint());
+ }
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@@ -20032,6 +20186,11 @@ public final class ClientProtos {
hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER;
hash = (53 * hash) + getScanMetrics().hashCode();
}
+ if (hasMvccReadPoint()) {
+ hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER;
+ hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong(
+ getMvccReadPoint());
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -20186,6 +20345,8 @@ public final class ClientProtos {
scanMetricsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
+ mvccReadPoint_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -20261,6 +20422,10 @@ public final class ClientProtos {
} else {
result.scanMetrics_ = scanMetricsBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.mvccReadPoint_ = mvccReadPoint_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -20370,6 +20535,9 @@ public final class ClientProtos {
if (other.hasScanMetrics()) {
mergeScanMetrics(other.getScanMetrics());
}
+ if (other.hasMvccReadPoint()) {
+ setMvccReadPoint(other.getMvccReadPoint());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -21433,6 +21601,62 @@ public final class ClientProtos {
}
return scanMetricsBuilder_;
}
+
+ private long mvccReadPoint_ ;
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public boolean hasMvccReadPoint() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public long getMvccReadPoint() {
+ return mvccReadPoint_;
+ }
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public Builder setMvccReadPoint(long value) {
+ bitField0_ |= 0x00000400;
+ mvccReadPoint_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <pre>
+ * The mvcc read point which is used to open the scanner at server side. Client can
+ * make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ * of a row.
+ * </pre>
+ *
+ * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code>
+ */
+ public Builder clearMvccReadPoint() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ mvccReadPoint_ = 0L;
+ onChanged();
+ return this;
+ }
public final Builder setUnknownFields(
final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
@@ -40434,7 +40658,7 @@ public final class ClientProtos {
"tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" +
"_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" +
"\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " +
- "\001(\010\"\275\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
+ "\001(\010\"\331\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." +
"Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" +
"eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" +
"w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" +
@@ -40448,96 +40672,98 @@ public final class ClientProtos {
" \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" +
"aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " +
"\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" +
- "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r",
- "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034" +
- "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" +
- "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" +
- "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" +
- "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" +
- "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" +
- "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" +
- "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" +
- "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" +
- "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.",
- "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" +
- "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" +
- "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" +
- "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" +
- "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
- "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" +
- "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" +
- "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" +
- "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" +
- "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(",
- "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014" +
- "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" +
- "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" +
- "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" +
- " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" +
- "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" +
- "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" +
- "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" +
- "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" +
- "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013",
- "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu" +
- "lkLoadResponse\"a\n\026CoprocessorServiceCall" +
- "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" +
- "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" +
- "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" +
- "base.pb.NameBytesPair\"v\n\031CoprocessorServ" +
- "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" +
- "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" +
- "oprocessorServiceCall\"o\n\032CoprocessorServ" +
- "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R",
- "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb" +
- ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" +
- "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" +
- "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" +
- "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" +
- "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" +
- "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" +
- "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" +
- "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" +
- "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction",
- "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat" +
- "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" +
- "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" +
- "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
- "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" +
- "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" +
- "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" +
- "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" +
- "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" +
- "ActionResult\0226\n\021resultOrException\030\001 \003(\0132",
- "\033.hbase.pb.ResultOrException\022*\n\texceptio" +
- "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
- "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
- "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
- "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" +
- "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" +
- "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" +
- "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" +
- "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" +
- "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS",
- "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb" +
- "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." +
- "MutateRequest\032\030.hbase.pb.MutateResponse\022" +
- "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" +
- "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." +
- "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" +
- "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" +
- "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" +
- ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" +
- "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!.",
- "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec" +
- "Service\022#.hbase.pb.CoprocessorServiceReq" +
- "uest\032$.hbase.pb.CoprocessorServiceRespon" +
- "se\022d\n\027ExecRegionServerService\022#.hbase.pb" +
- ".CoprocessorServiceRequest\032$.hbase.pb.Co" +
- "processorServiceResponse\0228\n\005Multi\022\026.hbas" +
- "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" +
- "seBI\n1org.apache.hadoop.hbase.shaded.pro" +
- "tobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+ "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024",
+ " \001(\004:\0010\"\246\002\n\013ScanRequest\022)\n\006region\030\001 \001(\0132" +
+ "\031.hbase.pb.RegionSpecifier\022\034\n\004scan\030\002 \001(\013" +
+ "2\016.hbase.pb.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016" +
+ "number_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 " +
+ "\001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_han" +
+ "dles_partials\030\007 \001(\010\022!\n\031client_handles_he" +
+ "artbeats\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001" +
+ "(\010\022\024\n\005renew\030\n \001(\010:\005false\"\266\002\n\014ScanRespons" +
+ "e\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_i" +
+ "d\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001",
+ "(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n" +
+ "\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result\030" +
+ "\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n" +
+ "\021heartbeat_message\030\t \001(\010\022+\n\014scan_metrics" +
+ "\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_re" +
+ "ad_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileReque" +
+ "st\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpec" +
+ "ifier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bu" +
+ "lkLoadHFileRequest.FamilyPath\022\026\n\016assign_" +
+ "seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.",
+ "pb.DelegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030" +
+ "\n\tcopy_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016" +
+ "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" +
+ "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegat" +
+ "ionToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password" +
+ "\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n" +
+ "\026PrepareBulkLoadRequest\022\'\n\ntable_name\030\001 " +
+ "\002(\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\013" +
+ "2\031.hbase.pb.RegionSpecifier\"-\n\027PrepareBu" +
+ "lkLoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cl",
+ "eanupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t" +
+ "\022)\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecif" +
+ "ier\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproc" +
+ "essorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service" +
+ "_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007requ" +
+ "est\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n" +
+ "\005value\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n" +
+ "\031CoprocessorServiceRequest\022)\n\006region\030\001 \002" +
+ "(\0132\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 " +
+ "\002(\0132 .hbase.pb.CoprocessorServiceCall\"o\n",
+ "\032CoprocessorServiceResponse\022)\n\006region\030\001 " +
+ "\002(\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030" +
+ "\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Actio" +
+ "n\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hba" +
+ "se.pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase" +
+ ".pb.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb" +
+ ".CoprocessorServiceCall\"k\n\014RegionAction\022" +
+ ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" +
+ "er\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hba" +
+ "se.pb.Action\"c\n\017RegionLoadStats\022\027\n\014memst",
+ "oreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:" +
+ "\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Mul" +
+ "tiRegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbas" +
+ "e.pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hba" +
+ "se.pb.RegionLoadStats\"\336\001\n\021ResultOrExcept" +
+ "ion\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hba" +
+ "se.pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase." +
+ "pb.NameBytesPair\022:\n\016service_result\030\004 \001(\013" +
+ "2\".hbase.pb.CoprocessorServiceResult\0220\n\t" +
+ "loadStats\030\005 \001(\0132\031.hbase.pb.RegionLoadSta",
+ "tsB\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOr" +
+ "Exception\030\001 \003(\0132\033.hbase.pb.ResultOrExcep" +
+ "tion\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameB" +
+ "ytesPair\"x\n\014MultiRequest\022,\n\014regionAction" +
+ "\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceG" +
+ "roup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb" +
+ ".Condition\"\226\001\n\rMultiResponse\0228\n\022regionAc" +
+ "tionResult\030\001 \003(\0132\034.hbase.pb.RegionAction" +
+ "Result\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStati" +
+ "stics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadSt",
+ "ats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" +
+ "NE\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb" +
+ ".GetRequest\032\025.hbase.pb.GetResponse\022;\n\006Mu" +
+ "tate\022\027.hbase.pb.MutateRequest\032\030.hbase.pb" +
+ ".MutateResponse\0225\n\004Scan\022\025.hbase.pb.ScanR" +
+ "equest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLo" +
+ "adHFile\022\036.hbase.pb.BulkLoadHFileRequest\032" +
+ "\037.hbase.pb.BulkLoadHFileResponse\022V\n\017Prep" +
+ "areBulkLoad\022 .hbase.pb.PrepareBulkLoadRe" +
+ "quest\032!.hbase.pb.PrepareBulkLoadResponse",
+ "\022V\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBu" +
+ "lkLoadRequest\032!.hbase.pb.CleanupBulkLoad" +
+ "Response\022X\n\013ExecService\022#.hbase.pb.Copro" +
+ "cessorServiceRequest\032$.hbase.pb.Coproces" +
+ "sorServiceResponse\022d\n\027ExecRegionServerSe" +
+ "rvice\022#.hbase.pb.CoprocessorServiceReque" +
+ "st\032$.hbase.pb.CoprocessorServiceResponse" +
+ "\0228\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbas" +
+ "e.pb.MultiResponseBI\n1org.apache.hadoop." +
+ "hbase.shaded.protobuf.generatedB\014ClientP",
+ "rotosH\001\210\001\001\240\001\001"
};
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
@@ -40639,7 +40865,7 @@ public final class ClientProtos {
internal_static_hbase_pb_Scan_fieldAccessorTable = new
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_hbase_pb_Scan_descriptor,
- new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", });
+ new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", });
internal_static_hbase_pb_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new
@@ -40651,7 +40877,7 @@ public final class ClientProtos {
internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new
org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_hbase_pb_ScanResponse_descriptor,
- new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", });
+ new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", "MvccReadPoint", });
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-protocol-shaded/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 2feaa26..9a7fea2 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -255,6 +255,7 @@ message Scan {
optional uint32 caching = 17;
optional bool allow_partial_results = 18;
repeated ColumnFamilyTimeRange cf_time_range = 19;
+ optional uint64 mvcc_read_point = 20 [default = 0];
}
/**
@@ -317,17 +318,22 @@ message ScanResponse {
// reasons such as the size in bytes or quantity of results accumulated. This field
// will true when more results exist in the current region.
optional bool more_results_in_region = 8;
-
+
// This field is filled in if the server is sending back a heartbeat message.
// Heartbeat messages are sent back to the client to prevent the scanner from
// timing out. Seeing a heartbeat message communicates to the Client that the
// server would have continued to scan had the time limit not been reached.
optional bool heartbeat_message = 9;
-
+
// This field is filled in if the client has requested that scan metrics be tracked.
- // The metrics tracked here are sent back to the client to be tracked together with
+ // The metrics tracked here are sent back to the client to be tracked together with
// the existing client side metrics.
optional ScanMetrics scan_metrics = 10;
+
+ // The mvcc read point which is used to open the scanner at server side. Client can
+ // make use of this mvcc_read_point when restarting a scanner to get a consistent view
+ // of a row.
+ optional uint64 mvcc_read_point = 11 [default = 0];
}
/**