You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/04/18 00:42:56 UTC
[2/2] hbase git commit: HBASE-13090 Progress heartbeats for long
running scanners
HBASE-13090 Progress heartbeats for long running scanners
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/abe3796a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/abe3796a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/abe3796a
Branch: refs/heads/master
Commit: abe3796a9907485c875932caa5f1c82071495c0f
Parents: 2c5dc53
Author: Jonathan Lawlor <jo...@cloudera.com>
Authored: Tue Mar 10 14:24:07 2015 -0700
Committer: stack <st...@apache.org>
Committed: Fri Apr 17 15:42:46 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 49 +-
.../hadoop/hbase/client/ScannerCallable.java | 23 +
.../client/ScannerCallableWithReplicas.java | 10 +
.../hadoop/hbase/protobuf/RequestConverter.java | 3 +
.../src/main/resources/hbase-default.xml | 12 +
.../hbase/protobuf/generated/ClientProtos.java | 349 ++++++++++--
hbase-protocol/src/main/protobuf/Client.proto | 7 +
.../hbase/client/ClientSideRegionScanner.java | 4 +-
.../hadoop/hbase/regionserver/HRegion.java | 59 +-
.../hadoop/hbase/regionserver/KeyValueHeap.java | 2 +
.../regionserver/NoLimitScannerContext.java | 28 +-
.../hbase/regionserver/RSRpcServices.java | 94 +++-
.../hbase/regionserver/ScannerContext.java | 124 ++++-
.../hadoop/hbase/regionserver/StoreScanner.java | 31 ++
.../hbase/TestPartialResultsFromClientSide.java | 3 +-
.../coprocessor/TestCoprocessorInterface.java | 1 -
.../TestScannerHeartbeatMessages.java | 538 +++++++++++++++++++
.../compactions/TestStripeCompactionPolicy.java | 1 -
18 files changed, 1234 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ccd8c2d..6a129c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -393,7 +393,6 @@ public class ClientScanner extends AbstractClientScanner {
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = call(callable, caller, scannerTimeout);
-
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
@@ -482,7 +481,8 @@ public class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
- List<Result> resultsToAddToCache = getResultsToAddToCache(values);
+ List<Result> resultsToAddToCache =
+ getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);
@@ -494,6 +494,19 @@ public class ClientScanner extends AbstractClientScanner {
this.lastResult = rs;
}
}
+
+ // Caller of this method just wants a Result. If we see a heartbeat message, it means
+ // processing of the scan is taking a long time server side. Rather than continue to
+ // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
+ // unnecessary delays to the caller
+ if (callable.isHeartbeatMessage() && cache.size() > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Heartbeat message received and cache contains Results."
+ + " Breaking out of scan loop");
+ }
+ break;
+ }
+
// We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
@@ -507,20 +520,38 @@ public class ClientScanner extends AbstractClientScanner {
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
- } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
+ } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
}
/**
+ * @param remainingResultSize
+ * @param remainingRows
+ * @param regionHasMoreResults
+ * @return true when the region has no more results and the size and row limits have not been
+ * reached, i.e. the region must be changed before scanning can continue
+ */
+ private boolean doneWithRegion(long remainingResultSize, int remainingRows,
+ boolean regionHasMoreResults) {
+ return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
+ }
+
+ /**
* This method ensures all of our book keeping regarding partial results is kept up to date. This
* method should be called once we know that the results we received back from the RPC request do
* not contain errors. We return a list of results that should be added to the cache. In general,
* this list will contain all NON-partial results from the input array (unless the client has
* specified that they are okay with receiving partial results)
+ * @param resultsFromServer The array of {@link Result}s returned from the server
+ * @param heartbeatMessage Flag indicating whether or not the response received from the server
+ * represented a complete response, or a heartbeat message that was sent to keep the
+ * client-server connection alive
* @return the list of results that should be added to the cache.
* @throws IOException
*/
- protected List<Result> getResultsToAddToCache(Result[] resultsFromServer) throws IOException {
+ protected List<Result>
+ getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
+ throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
@@ -538,10 +569,14 @@ public class ClientScanner extends AbstractClientScanner {
return resultsToAddToCache;
}
- // If no results were returned it indicates that we have the all the partial results necessary
- // to construct the complete result.
+ // If no results were returned it indicates that either we have all the partial results
+ // necessary to construct the complete result or the server had to send a heartbeat message
+ // to the client to keep the client-server connection alive
if (resultsFromServer == null || resultsFromServer.length == 0) {
- if (!partialResults.isEmpty()) {
+ // If this response was an empty heartbeat message, then we have not exhausted the region
+ // and thus there may be more partials server side that still need to be added to the partial
+ // list before we form the complete Result
+ if (!partialResults.isEmpty() && !heartbeatMessage) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 714c9fe..b53c9d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -78,6 +78,12 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected final int id;
protected boolean serverHasMoreResultsContext;
protected boolean serverHasMoreResults;
+
+ /**
+ * Saves whether or not the most recent response from the server was a heartbeat message.
+ * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
+ */
+ protected boolean heartbeatMessage = false;
static {
try {
myAddress = DNS.getDefaultHost("default", "default");
@@ -191,6 +197,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} else {
Result [] rrs = null;
ScanRequest request = null;
+ // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
+ setHeartbeatMessage(false);
try {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
@@ -211,6 +219,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// See HBASE-5974
nextCallSeq++;
long timestamp = System.currentTimeMillis();
+ setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
// Results are returned via controller
CellScanner cellScanner = controller.cellScanner();
rrs = ResponseConverter.getResults(cellScanner, response);
@@ -291,6 +300,20 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return null;
}
+ /**
+ * @return true when the most recent RPC response indicated that the response was a heartbeat
+ * message. Heartbeat messages are sent back from the server when the processing of the
+ * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
+ * timeouts during long running scan operations.
+ */
+ protected boolean isHeartbeatMessage() {
+ return heartbeatMessage;
+ }
+
+ protected void setHeartbeatMessage(boolean heartbeatMessage) {
+ this.heartbeatMessage = heartbeatMessage;
+ }
+
private void incRPCcallsMetrics() {
if (this.scanMetrics == null) {
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 7ba152b..d672852 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -273,6 +273,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return replicaSwitched.get();
}
+ /**
+ * @return true when the most recent RPC response indicated that the response was a heartbeat
+ * message. Heartbeat messages are sent back from the server when the processing of the
+ * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
+ * timeouts during long running scan operations.
+ */
+ public boolean isHeartbeatMessage() {
+ return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
+ }
+
private int addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 16c3dbf..b0dac2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -489,6 +489,7 @@ public final class RequestConverter {
builder.setRegion(region);
builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true);
+ builder.setClientHandlesHeartbeats(true);
return builder.build();
}
@@ -507,6 +508,7 @@ public final class RequestConverter {
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true);
+ builder.setClientHandlesHeartbeats(true);
return builder.build();
}
@@ -527,6 +529,7 @@ public final class RequestConverter {
builder.setScannerId(scannerId);
builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true);
+ builder.setClientHandlesHeartbeats(true);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index bf14a53..f107fb7 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -905,6 +905,18 @@ possible configurations would overwhelm and obscure the important.
but will eventually throw a TimeoutException.</description>
</property>
<property>
+ <name>hbase.cells.scanned.per.heartbeat.check</name>
+ <value>10000</value>
+ <description>The number of cells scanned in between heartbeat checks. Heartbeat
+ checks occur during the processing of scans to determine whether or not the
+ server should stop scanning in order to send back a heartbeat message to the
+ client. Heartbeat messages are used to keep the client-server connection alive
+ during long running scans. Small values mean that the heartbeat checks will
+ occur more often and thus will provide a tighter bound on the execution time of
+ the scan. Larger values mean that the heartbeat checks occur less frequently.
+ </description>
+ </property>
+ <property>
<name>hbase.rpc.shortoperation.timeout</name>
<value>10000</value>
<description>This is another version of "hbase.rpc.timeout". For those RPC operation
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 60ab651..2991ece 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -16433,6 +16433,16 @@ public final class ClientProtos {
* <code>optional bool client_handles_partials = 7;</code>
*/
boolean getClientHandlesPartials();
+
+ // optional bool client_handles_heartbeats = 8;
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ boolean hasClientHandlesHeartbeats();
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ boolean getClientHandlesHeartbeats();
}
/**
* Protobuf type {@code ScanRequest}
@@ -16549,6 +16559,11 @@ public final class ClientProtos {
clientHandlesPartials_ = input.readBool();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ clientHandlesHeartbeats_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -16713,6 +16728,22 @@ public final class ClientProtos {
return clientHandlesPartials_;
}
+ // optional bool client_handles_heartbeats = 8;
+ public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8;
+ private boolean clientHandlesHeartbeats_;
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public boolean hasClientHandlesHeartbeats() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public boolean getClientHandlesHeartbeats() {
+ return clientHandlesHeartbeats_;
+ }
+
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@@ -16721,6 +16752,7 @@ public final class ClientProtos {
closeScanner_ = false;
nextCallSeq_ = 0L;
clientHandlesPartials_ = false;
+ clientHandlesHeartbeats_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -16767,6 +16799,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeBool(7, clientHandlesPartials_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeBool(8, clientHandlesHeartbeats_);
+ }
getUnknownFields().writeTo(output);
}
@@ -16804,6 +16839,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(7, clientHandlesPartials_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(8, clientHandlesHeartbeats_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -16862,6 +16901,11 @@ public final class ClientProtos {
result = result && (getClientHandlesPartials()
== other.getClientHandlesPartials());
}
+ result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats());
+ if (hasClientHandlesHeartbeats()) {
+ result = result && (getClientHandlesHeartbeats()
+ == other.getClientHandlesHeartbeats());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -16903,6 +16947,10 @@ public final class ClientProtos {
hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getClientHandlesPartials());
}
+ if (hasClientHandlesHeartbeats()) {
+ hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -17049,6 +17097,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000020);
clientHandlesPartials_ = false;
bitField0_ = (bitField0_ & ~0x00000040);
+ clientHandlesHeartbeats_ = false;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@@ -17113,6 +17163,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000040;
}
result.clientHandlesPartials_ = clientHandlesPartials_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -17150,6 +17204,9 @@ public final class ClientProtos {
if (other.hasClientHandlesPartials()) {
setClientHandlesPartials(other.getClientHandlesPartials());
}
+ if (other.hasClientHandlesHeartbeats()) {
+ setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -17588,6 +17645,39 @@ public final class ClientProtos {
return this;
}
+ // optional bool client_handles_heartbeats = 8;
+ private boolean clientHandlesHeartbeats_ ;
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public boolean hasClientHandlesHeartbeats() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public boolean getClientHandlesHeartbeats() {
+ return clientHandlesHeartbeats_;
+ }
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public Builder setClientHandlesHeartbeats(boolean value) {
+ bitField0_ |= 0x00000080;
+ clientHandlesHeartbeats_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool client_handles_heartbeats = 8;</code>
+ */
+ public Builder clearClientHandlesHeartbeats() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ clientHandlesHeartbeats_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:ScanRequest)
}
@@ -17806,6 +17896,30 @@ public final class ClientProtos {
* </pre>
*/
boolean getMoreResultsInRegion();
+
+ // optional bool heartbeat_message = 9;
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ boolean hasHeartbeatMessage();
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ boolean getHeartbeatMessage();
}
/**
* Protobuf type {@code ScanResponse}
@@ -17939,6 +18053,11 @@ public final class ClientProtos {
moreResultsInRegion_ = input.readBool();
break;
}
+ case 72: {
+ bitField0_ |= 0x00000020;
+ heartbeatMessage_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18252,6 +18371,36 @@ public final class ClientProtos {
return moreResultsInRegion_;
}
+ // optional bool heartbeat_message = 9;
+ public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 9;
+ private boolean heartbeatMessage_;
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public boolean hasHeartbeatMessage() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public boolean getHeartbeatMessage() {
+ return heartbeatMessage_;
+ }
+
private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L;
@@ -18261,6 +18410,7 @@ public final class ClientProtos {
stale_ = false;
partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false;
+ heartbeatMessage_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -18298,6 +18448,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBool(8, moreResultsInRegion_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBool(9, heartbeatMessage_);
+ }
getUnknownFields().writeTo(output);
}
@@ -18346,6 +18499,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, moreResultsInRegion_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(9, heartbeatMessage_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -18400,6 +18557,11 @@ public final class ClientProtos {
result = result && (getMoreResultsInRegion()
== other.getMoreResultsInRegion());
}
+ result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage());
+ if (hasHeartbeatMessage()) {
+ result = result && (getHeartbeatMessage()
+ == other.getHeartbeatMessage());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -18445,6 +18607,10 @@ public final class ClientProtos {
hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getMoreResultsInRegion());
}
+ if (hasHeartbeatMessage()) {
+ hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -18581,6 +18747,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000040);
moreResultsInRegion_ = false;
bitField0_ = (bitField0_ & ~0x00000080);
+ heartbeatMessage_ = false;
+ bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
@@ -18648,6 +18816,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000010;
}
result.moreResultsInRegion_ = moreResultsInRegion_;
+ if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.heartbeatMessage_ = heartbeatMessage_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -18725,6 +18897,9 @@ public final class ClientProtos {
if (other.hasMoreResultsInRegion()) {
setMoreResultsInRegion(other.getMoreResultsInRegion());
}
+ if (other.hasHeartbeatMessage()) {
+ setHeartbeatMessage(other.getHeartbeatMessage());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -19561,6 +19736,67 @@ public final class ClientProtos {
return this;
}
+ // optional bool heartbeat_message = 9;
+ private boolean heartbeatMessage_ ;
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public boolean hasHeartbeatMessage() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public boolean getHeartbeatMessage() {
+ return heartbeatMessage_;
+ }
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public Builder setHeartbeatMessage(boolean value) {
+ bitField0_ |= 0x00000100;
+ heartbeatMessage_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool heartbeat_message = 9;</code>
+ *
+ * <pre>
+ * This field is filled in if the server is sending back a heartbeat message.
+ * Heartbeat messages are sent back to the client to prevent the scanner from
+ * timing out. Seeing a heartbeat message communicates to the Client that the
+ * server would have continued to scan had the time limit not been reached.
+ * </pre>
+ */
+ public Builder clearHeartbeatMessage() {
+ bitField0_ = (bitField0_ & ~0x00000100);
+ heartbeatMessage_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:ScanResponse)
}
@@ -32690,63 +32926,64 @@ public final class ClientProtos {
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
- "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
+ "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
"nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
"er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
"lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
- "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" +
- "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
- "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
- "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
- "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
- "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" +
- "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132",
- "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
- ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
- "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
- "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
- "ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
- "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" +
- "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
- "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " +
- "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" +
- "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
- "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
- "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
- "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
- "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" +
- "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" +
- "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" +
- "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" +
- "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" +
- "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" +
- "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r",
- "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" +
- "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" +
- "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" +
- "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" +
- "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" +
- "adStats\"f\n\022RegionActionResult\022-\n\021resultO" +
- "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" +
- "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" +
- "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" +
- "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ",
- "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" +
- "onActionResult\030\001 \003(\0132\023.RegionActionResul" +
- "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" +
- "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" +
- " \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" +
- "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" +
- "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" +
- "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" +
- "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" +
- "cessorServiceRequest\032\033.CoprocessorServic",
- "eResponse\022R\n\027ExecRegionServerService\022\032.C" +
- "oprocessorServiceRequest\032\033.CoprocessorSe" +
- "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." +
- "MultiResponseBB\n*org.apache.hadoop.hbase" +
- ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" +
- "\001"
+ "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
+ "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
+ "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
+ "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
+ "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
+ "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+ "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
+ "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
+ "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
+ "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" +
+ "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" +
+ "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" +
+ "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " +
+ "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" +
+ "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" +
+ "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" +
+ "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai",
+ "r\"d\n\031CoprocessorServiceRequest\022 \n\006region" +
+ "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" +
+ ".CoprocessorServiceCall\"]\n\032CoprocessorSe" +
+ "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" +
+ "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" +
+ "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" +
+ "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" +
+ "service_call\030\004 \001(\0132\027.CoprocessorServiceC" +
+ "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" +
+ "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030",
+ "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" +
+ "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" +
+ "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" +
+ "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" +
+ "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" +
+ "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" +
+ "Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" +
+ "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" +
+ ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." +
+ "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA",
+ "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" +
+ "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" +
+ "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" +
+ "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" +
+ "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" +
+ "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" +
+ "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" +
+ "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" +
+ "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" +
+ "oadHFileRequest\032\026.BulkLoadHFileResponse\022",
+ "F\n\013ExecService\022\032.CoprocessorServiceReque" +
+ "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" +
+ "egionServerService\022\032.CoprocessorServiceR" +
+ "equest\032\033.CoprocessorServiceResponse\022&\n\005M" +
+ "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
+ "rg.apache.hadoop.hbase.protobuf.generate" +
+ "dB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -32842,13 +33079,13 @@ public final class ClientProtos {
internal_static_ScanRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanRequest_descriptor,
- new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", });
+ new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", });
internal_static_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13);
internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor,
- new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", });
+ new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", });
internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index e0c370b..3a48cc8 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -275,6 +275,7 @@ message ScanRequest {
optional bool close_scanner = 5;
optional uint64 next_call_seq = 6;
optional bool client_handles_partials = 7;
+ optional bool client_handles_heartbeats = 8;
}
/**
@@ -313,6 +314,12 @@ message ScanResponse {
// reasons such as the size in bytes or quantity of results accumulated. This field
// will true when more results exist in the current region.
optional bool more_results_in_region = 8;
+
+ // This field is filled in if the server is sending back a heartbeat message.
+ // Heartbeat messages are sent back to the client to prevent the scanner from
+ // timing out. Seeing a heartbeat message communicates to the Client that the
+ // server would have continued to scan had the time limit not been reached.
+ optional bool heartbeat_message = 9;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 5809983..9d7bcc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.mortbay.log.Log;
@@ -72,8 +71,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
@Override
public Result next() throws IOException {
values.clear();
-
- scanner.nextRaw(values, NoLimitScannerContext.getInstance());
+ scanner.nextRaw(values);
if (values.isEmpty()) {
//we are done
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 96d5c77..c731923 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5288,8 +5288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
- throws IOException {
+ public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@@ -5327,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
-
+
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@@ -5395,6 +5394,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
+ } else if (scannerContext.checkTimeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
@@ -5443,6 +5446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress.
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getSizeProgress();
+ long initialTimeProgress = scannerContext.getTimeProgress();
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
@@ -5454,7 +5458,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress should be kept.
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
- scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+ scannerContext
+ .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
} else {
scannerContext.clearProgress();
}
@@ -5502,6 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ " formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
+ scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
}
// Check if we were getting data from the joinedHeap and hit the limit.
@@ -5537,6 +5543,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
return true;
}
+
Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
@@ -5550,12 +5557,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress
- // according to contents of results now.
+ // according to contents of results now. However, a change in the results should not
+ // affect the time progress. Thus preserve whatever time progress has been made
+ long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
- scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+ scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
+ initialTimeProgress);
} else {
scannerContext.clearProgress();
}
+ scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
@@ -5580,14 +5591,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
- Cell nextJoinedKv = joinedHeap.peek();
- // If joinedHeap is pointing to some other row, try to seek to a correct one.
- boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
- currentRow, offset, length))
- || (this.joinedHeap.requestSeek(
- KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
- && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
- currentRow, offset, length));
+ boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
@@ -5631,6 +5635,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
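+ * @param currentRow byte array that contains the row key of the current row
+ * @param offset offset into currentRow at which the row key starts
+ * @param length length of the row key
+ * @return true when the joined heap may have data for the current row
+ * @throws IOException if an underlying joined heap operation fails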
+ */
+ private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
+ throws IOException {
+ Cell nextJoinedKv = joinedHeap.peek();
+ boolean matchCurrentRow =
+ nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
+ boolean matchAfterSeek = false;
+
+ // If the next value in the joined heap does not match the current row, try to seek to the
+ // correct row
+ if (!matchCurrentRow) {
+ Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
+ boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
+ matchAfterSeek =
+ seekSuccessful && joinedHeap.peek() != null
+ && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
+ }
+
+ return matchCurrentRow || matchAfterSeek;
+ }
+
+ /**
* This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
* both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
* not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 761267f..6433453 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -144,6 +144,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
InternalScanner currentAsInternal = (InternalScanner)this.current;
boolean moreCells = currentAsInternal.next(result, scannerContext);
Cell pee = this.current.peek();
+
/*
* By definition, any InternalScanner must return false only when it has no
* further rows to be fetched. So, we can close a scanner if it returns
@@ -151,6 +152,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
* more efficient to close scanners which are not needed than keep them in
* the heap. This is also required for certain optimizations.
*/
+
if (pee == null || !moreCells) {
this.current.close();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index 1484e80..66ed6c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -68,7 +68,22 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
- void setProgress(int batchProgress, long sizeProgress) {
+ void setTimeProgress(long timeProgress) {
+ // Do nothing. NoLimitScannerContext instances are immutable post-construction
+ }
+
+ @Override
+ void updateTimeProgress() {
+ // Do nothing. NoLimitScannerContext instances are immutable post-construction
+ }
+
+ @Override
+ void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+ // Do nothing. NoLimitScannerContext instances are immutable post-construction
+ }
+
+ @Override
+ void clearProgress() {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@@ -78,6 +93,11 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
+ void setTimeLimitScope(LimitScope scope) {
+ // Do nothing. NoLimitScannerContext instances are immutable post-construction
+ }
+
+ @Override
NextState setScannerState(NextState state) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
return state;
@@ -96,6 +116,12 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
+ boolean checkTimeLimit(LimitScope checkerScope) {
+ // No limits can be specified, thus return false to indicate no limit has been reached.
+ return false;
+ }
+
+ @Override
boolean checkAnyLimitReached(LimitScope checkerScope) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 38956cc..f322862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -191,6 +191,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.region.server.rpc.scheduler.factory.class";
+ /**
+ * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
+ * configuration exists to prevent the scenario where a time limit is specified to be so
+ * restrictive that the time limit is reached immediately (before any cells are scanned).
+ */
+ private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
+ "hbase.region.server.rpc.minimum.scan.time.limit.delta";
+ /**
+ * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
+ */
+ private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
+
// Request counter. (Includes requests that are not serviced by regions.)
final Counter requestCount = new Counter();
// Server to handle client requests.
@@ -213,6 +225,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final int scannerLeaseTimeoutPeriod;
/**
+ * The RPC timeout period (milliseconds)
+ */
+ private final int rpcTimeout;
+
+ /**
+ * The minimum allowable delta to use for the scan limit
+ */
+ private final long minimumScanTimeLimitDelta;
+
+ /**
* Holder class which holds the RegionScanner and nextCallSeq together.
*/
private static class RegionScannerHolder {
@@ -829,6 +851,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
maxScannerResultSize = rs.conf.getLong(
HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
+ rpcTimeout = rs.conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ minimumScanTimeLimitDelta = rs.conf.getLong(
+ REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
+ DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
// Set our address, however we need the final port that was given to rpcServer
isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
@@ -2241,6 +2269,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean stale = (region.getRegionInfo().getReplicaId() != 0);
boolean clientHandlesPartials =
request.hasClientHandlesPartials() && request.getClientHandlesPartials();
+ boolean clientHandlesHeartbeats =
+ request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
// On the server side we must ensure that the correct ordering of partial results is
// returned to the client to allow them to properly reconstruct the partial results.
@@ -2252,23 +2282,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
boolean moreRows = false;
+ // Heartbeat messages occur when the processing of the ScanRequest exceeds a
+ // certain time threshold on the server. When the time threshold is exceeded, the
+ // server stops the scan and sends back whatever Results it has accumulated within
+ // that time period (may be empty). Since heartbeat messages have the potential to
+ // create partial Results (in the event that the timeout occurs in the middle of a
+ // row), we must only generate heartbeat messages when the client can handle both
+ // heartbeats AND partials
+ boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
+
+ // Default value of timeLimit is negative to indicate no timeLimit should be
+ // enforced.
+ long timeLimit = -1;
+
+ // Set the time limit to be half of the more restrictive timeout value (one of the
+ // timeout values must be positive). In the event that both values are positive, the
+ // more restrictive of the two is used to calculate the limit.
+ if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
+ long timeLimitDelta;
+ if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
+ timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
+ } else {
+ timeLimitDelta =
+ scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
+ }
+ // Use half of whichever timeout value was more restrictive... But don't allow
+ // the time limit to be less than the allowable minimum (could cause an
+ // immediate timeout before scanning any data).
+ timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
+ timeLimit = System.currentTimeMillis() + timeLimitDelta;
+ }
+
final LimitScope sizeScope =
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+ final LimitScope timeScope =
+ allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
// Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
contextBuilder.setSizeLimit(sizeScope, maxResultSize);
contextBuilder.setBatchLimit(scanner.getBatch());
+ contextBuilder.setTimeLimit(timeScope, timeLimit);
ScannerContext scannerContext = contextBuilder.build();
+ boolean limitReached = false;
while (i < rows) {
- // Stop collecting results if we have exceeded maxResultSize
- if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
- builder.setMoreResultsInRegion(true);
- break;
- }
-
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
// batch limit is a limit on the number of cells per Result. Thus, if progress is
// being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@@ -2287,14 +2346,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
results.add(Result.create(values, null, stale, partial));
i++;
}
- if (!moreRows) {
+
+ boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
+ boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
+ boolean rowLimitReached = i >= rows;
+ limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
+
+ if (limitReached || !moreRows) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
+ + moreRows + " scannerContext: " + scannerContext);
+ }
+ // We only want to mark a ScanResponse as a heartbeat message in the event that
+ // there are more values to be read server side. If there aren't more values,
+ // marking it as a heartbeat is wasteful because the client will need to issue
+ // another ScanRequest only to realize that they already have all the values
+ if (moreRows) {
+ // Heartbeat messages occur when the time limit has been reached.
+ builder.setHeartbeatMessage(timeLimitReached);
+ }
break;
}
values.clear();
}
- if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows ||
- moreRows) {
+ if (limitReached || moreRows) {
// We stopped prematurely
builder.setMoreResultsInRegion(true);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6e487ca..7c8ff7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -101,7 +101,7 @@ public class ScannerContext {
if (limitsToCopy != null) this.limits.copy(limitsToCopy);
// Progress fields are initialized to 0
- progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0);
+ progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@@ -137,6 +137,13 @@ public class ScannerContext {
progress.setSize(currentSize + size);
}
+ /**
+ * Update the time progress with {@link System#currentTimeMillis()}
+ */
+ void updateTimeProgress() {
+ progress.setTime(System.currentTimeMillis());
+ }
+
int getBatchProgress() {
return progress.getBatch();
}
@@ -145,9 +152,14 @@ public class ScannerContext {
return progress.getSize();
}
- void setProgress(int batchProgress, long sizeProgress) {
+ long getTimeProgress() {
+ return progress.getTime();
+ }
+
+ void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress);
+ setTimeProgress(timeProgress);
}
void setSizeProgress(long sizeProgress) {
@@ -158,12 +170,16 @@ public class ScannerContext {
progress.setBatch(batchProgress);
}
+ void setTimeProgress(long timeProgress) {
+ progress.setTime(timeProgress);
+ }
+
/**
* Clear away any progress that has been made so far. All progress fields are reset to initial
* values
*/
void clearProgress() {
- progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0);
+ progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
}
/**
@@ -172,7 +188,7 @@ public class ScannerContext {
* allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
* new state, thus preserving the immutability of {@link NoLimitScannerContext}
* @param state
- * @return The state that
+ * @return The state that was passed in.
*/
NextState setScannerState(NextState state) {
if (!NextState.isValidState(state)) {
@@ -188,7 +204,8 @@ public class ScannerContext {
* reached in the middle of a row.
*/
boolean partialResultFormed() {
- return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW;
+ return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
+ || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
}
/**
@@ -209,10 +226,18 @@ public class ScannerContext {
/**
* @param checkerScope
+ * @return true if a time limit greater than 0 is set and is enforceable from the checker's scope
+ */
+ boolean hasTimeLimit(LimitScope checkerScope) {
+ return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
+ }
+
+ /**
+ * @param checkerScope
* @return true if any limit can be enforced within the checker's scope
*/
boolean hasAnyLimit(LimitScope checkerScope) {
- return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope);
+ return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
}
/**
@@ -222,6 +247,13 @@ public class ScannerContext {
limits.setSizeScope(scope);
}
+ /**
+ * @param scope The scope in which the time limit will be enforced
+ */
+ void setTimeLimitScope(LimitScope scope) {
+ limits.setTimeScope(scope);
+ }
+
int getBatchLimit() {
return limits.getBatch();
}
@@ -230,6 +262,10 @@ public class ScannerContext {
return limits.getSize();
}
+ long getTimeLimit() {
+ return limits.getTime();
+ }
+
/**
* @param checkerScope The scope that the limit is being checked from
* @return true when the limit is enforceable from the checker's scope and it has been reached
@@ -247,11 +283,21 @@ public class ScannerContext {
}
/**
+ * @param checkerScope The scope that the limit is being checked from. The recorded time progress
+ * (see {@link #updateTimeProgress()}) is compared against the limit; the clock is not read here
+ * @return true when the limit is enforceable from the checker's scope and it has been reached
+ */
+ boolean checkTimeLimit(LimitScope checkerScope) {
+ return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
+ }
+
+ /**
* @param checkerScope The scope that the limits are being checked from
* @return true when some limit is enforceable from the checker's scope and it has been reached
*/
boolean checkAnyLimitReached(LimitScope checkerScope) {
- return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope);
+ return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
+ || checkTimeLimit(checkerScope);
}
@Override
@@ -305,6 +351,12 @@ public class ScannerContext {
return this;
}
+ public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
+ limits.setTime(timeLimit);
+ limits.setTimeScope(timeScope);
+ return this;
+ }
+
public Builder setBatchLimit(int batchLimit) {
limits.setBatch(batchLimit);
return this;
@@ -328,6 +380,13 @@ public class ScannerContext {
* of a row and thus a partial results was formed
*/
SIZE_LIMIT_REACHED_MID_ROW(true, true),
+ TIME_LIMIT_REACHED(true, true),
+
+ /**
+ * Special case of time limit reached to indicate that the time limit was reached in the middle
+ * of a row and thus a partial result was formed
+ */
+ TIME_LIMIT_REACHED_MID_ROW(true, true),
BATCH_LIMIT_REACHED(true, true);
private boolean moreValues;
@@ -419,6 +478,7 @@ public class ScannerContext {
*/
private static int DEFAULT_BATCH = -1;
private static long DEFAULT_SIZE = -1L;
+ private static long DEFAULT_TIME = -1L;
/**
* Default scope that is assigned to a limit if a scope is not specified.
@@ -432,19 +492,23 @@ public class ScannerContext {
LimitScope sizeScope = DEFAULT_SCOPE;
long size = DEFAULT_SIZE;
+ LimitScope timeScope = DEFAULT_SCOPE;
+ long time = DEFAULT_TIME;
+
/**
* Fields keep their default values.
*/
LimitFields() {
}
- LimitFields(int batch, LimitScope sizeScope, long size) {
- setFields(batch, sizeScope, size);
+ LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
+ setFields(batch, sizeScope, size, timeScope, time);
}
void copy(LimitFields limitsToCopy) {
if (limitsToCopy != null) {
- setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize());
+ setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
+ limitsToCopy.getTimeScope(), limitsToCopy.getTime());
}
}
@@ -454,10 +518,12 @@ public class ScannerContext {
* @param sizeScope
* @param size
*/
- void setFields(int batch, LimitScope sizeScope, long size) {
+ void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
setBatch(batch);
setSizeScope(sizeScope);
setSize(size);
+ setTimeScope(timeScope);
+ setTime(time);
}
int getBatch() {
@@ -506,6 +572,36 @@ public class ScannerContext {
return this.sizeScope.canEnforceLimitFromScope(checkerScope);
}
+ long getTime() {
+ return this.time;
+ }
+
+ void setTime(long time) {
+ this.time = time;
+ }
+
+ /**
+ * @return {@link LimitScope} indicating scope in which the time limit is enforced
+ */
+ LimitScope getTimeScope() {
+ return this.timeScope;
+ }
+
+ /**
+ * Change the scope in which the time limit is enforced
+ */
+ void setTimeScope(LimitScope scope) {
+ this.timeScope = scope;
+ }
+
+ /**
+ * @param checkerScope The scope that the time limit is being checked from
+ * @return true when the time limit can be enforced from the scope of the checker
+ */
+ boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
+ return this.timeScope.canEnforceLimitFromScope(checkerScope);
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -520,6 +616,12 @@ public class ScannerContext {
sb.append(", sizeScope:");
sb.append(sizeScope);
+ sb.append(", time:");
+ sb.append(time);
+
+ sb.append(", timeScope:");
+ sb.append(timeScope);
+
sb.append("}");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 665ed46..f7e06ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -83,6 +83,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long now;
protected final int minVersions;
protected final long maxRowSize;
+ protected final long cellsPerHeartbeatCheck;
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -100,6 +101,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected static boolean lazySeekEnabledGlobally =
LAZY_SEEK_ENABLED_BY_DEFAULT;
+ /**
+ * The number of cells scanned in between timeout checks. Specifying a larger value means that
+ * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
+ * timeout checks.
+ */
+ public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
+ "hbase.cells.scanned.per.heartbeat.check";
+
+ /**
+ * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
+ */
+ public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
+
// if heap == null and lastTop != null, you need to reseek given the key below
protected Cell lastTop = null;
@@ -137,9 +151,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.maxRowSize =
conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+
+ long tmpCellsPerTimeoutCheck =
+ conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
+ DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
+ this.cellsPerHeartbeatCheck =
+ tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
+ : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
} else {
this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
this.scanUsePread = scan.isSmall();
+ this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
}
// We look up row-column Bloom filters for multi-column queries as part of
@@ -458,6 +480,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
lock.lock();
+
try {
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
@@ -507,6 +530,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
long totalBytesRead = 0;
LOOP: while((cell = this.heap.peek()) != null) {
+ // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck
+ if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+ scannerContext.updateTimeProgress();
+ if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+ return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
+ }
+ }
+
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevCell, cell, comparator);
prevCell = cell;
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index eef955e..3794e59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide {
count++;
}
- assertTrue(scanner2.next() == null);
+ r2 = scanner2.next();
+ assertTrue("r2: " + r2 + " Should be null", r2 == null);
scanner1.close();
scanner2.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index a8b5456..82fa3c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -144,7 +144,6 @@ public class TestCoprocessorInterface {
public int getBatch() {
return delegate.getBatch();
}
-
}
public static class CoprocessorImpl extends BaseRegionObserver {