You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/30 23:47:13 UTC
hbase git commit: HBASE-12761 On region jump ClientScanners should
get next row start key instead of a skip.
Repository: hbase
Updated Branches:
refs/heads/master 305267b8e -> e5ec14af0
HBASE-12761 On region jump ClientScanners should get next row start key instead of a skip.
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5ec14af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5ec14af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5ec14af
Branch: refs/heads/master
Commit: e5ec14af04069faf1cc4c671e3c721333f7415b2
Parents: 305267b
Author: Jurriaan Mous <ju...@jurmo.us>
Authored: Sun Dec 28 13:53:03 2014 +0100
Committer: stack <st...@apache.org>
Committed: Tue Dec 30 14:47:01 2014 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 66 ++++++--------------
.../hbase/client/ReversedClientScanner.java | 5 +-
.../client/ScannerCallableWithReplicas.java | 11 +++-
3 files changed, 33 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec14af/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3f1ba84..afc9bc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
+
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@@ -91,7 +93,8 @@ public class ClientScanner extends AbstractClientScanner {
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
- RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException {
+ RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
+ throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Scan table=" + tableName
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -228,7 +231,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
- call(scan, callable, caller, scannerTimeout);
+ call(callable, caller, scannerTimeout);
this.callable = null;
}
@@ -265,7 +268,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
- call(scan, callable, caller, scannerTimeout);
+ call(callable, caller, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@@ -282,7 +285,7 @@ public class ClientScanner extends AbstractClientScanner {
return callable.isAnyRPCcancelled();
}
- static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
+ static Result[] call(ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {
if (Thread.interrupted()) {
@@ -309,12 +312,12 @@ public class ClientScanner extends AbstractClientScanner {
/**
* Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
- * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
- * framework because it doesn't support multi-instances of the same metrics on the same machine;
- * for scan/map reduce scenarios, we will have multiple scans running at the same time.
+ * application or TableInputFormat.Later, we could push it to other systems. We don't use
+ * metrics framework because it doesn't support multi-instances of the same metrics on the same
+ * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
*
- * By default, scan metrics are disabled; if the application wants to collect them, this behavior
- * can be turned on by calling calling:
+ * By default, scan metrics are disabled; if the application wants to collect them, this
+ * behavior can be turned on by calling calling:
*
* scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
*/
@@ -342,39 +345,13 @@ public class ClientScanner extends AbstractClientScanner {
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
- boolean skipFirst = false;
boolean retryAfterOutOfOrderException = true;
do {
try {
- if (skipFirst) {
- // Skip only the first row (which was the last row of the last
- // already-processed batch).
- callable.setCaching(1);
- values = call(scan, callable, caller, scannerTimeout);
- // When the replica switch happens, we need to do certain operations
- // again. The scannercallable will openScanner with the right startkey
- // but we need to pick up from there. Bypass the rest of the loop
- // and let the catch-up happen in the beginning of the loop as it
- // happens for the cases where we see exceptions. Since only openScanner
- // would have happened, values would be null
- if (values == null && callable.switchedToADifferentReplica()) {
- if (this.lastResult != null) { //only skip if there was something read earlier
- skipFirst = true;
- }
- this.currentRegion = callable.getHRegionInfo();
- continue;
- }
- callable.setCaching(this.caching);
- skipFirst = false;
- }
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
- values = call(scan, callable, caller, scannerTimeout);
- if (skipFirst && values != null && values.length == 1) {
- skipFirst = false; // Already skipped, unset it before scanning again
- values = call(scan, callable, caller, scannerTimeout);
- }
+ values = call(callable, caller, scannerTimeout);
// When the replica switch happens, we need to do certain operations
// again. The callable will openScanner with the right startkey
// but we need to pick up from there. Bypass the rest of the loop
@@ -382,9 +359,6 @@ public class ClientScanner extends AbstractClientScanner {
// happens for the cases where we see exceptions. Since only openScanner
// would have happened, values would be null
if (values == null && callable.switchedToADifferentReplica()) {
- if (this.lastResult != null) { //only skip if there was something read earlier
- skipFirst = true;
- }
this.currentRegion = callable.getHRegionInfo();
continue;
}
@@ -427,11 +401,11 @@ public class ClientScanner extends AbstractClientScanner {
// scanner starts at the correct row. Otherwise we may see previously
// returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
- this.scan.setStartRow(this.lastResult.getRow());
-
- // Skip first row returned. We already let it out on previous
- // invocation.
- skipFirst = true;
+ if(scan.isReversed()){
+ scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+ }else {
+ scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+ }
}
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException) {
@@ -451,7 +425,7 @@ public class ClientScanner extends AbstractClientScanner {
continue;
}
long currentTime = System.currentTimeMillis();
- if (this.scanMetrics != null ) {
+ if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
}
lastNext = currentTime;
@@ -486,7 +460,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
- call(scan, callable, caller, scannerTimeout);
+ call(callable, caller, scannerTimeout);
} catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec14af/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index b36cf7f..0f244e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -57,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner {
TableName tableName, ClusterConnection connection,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException {
- super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
+ super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
+ primaryOperationTimeout);
}
@Override
@@ -166,7 +167,7 @@ public class ReversedClientScanner extends ClientScanner {
* @param row
* @return a new byte array which is the closest front row of the specified one
*/
- protected byte[] createClosestRowBefore(byte[] row) {
+ protected static byte[] createClosestRowBefore(byte[] row) {
if (row == null) {
throw new IllegalArgumentException("The passed row is empty");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec14af/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4209987..92293f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -39,9 +39,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
+
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@@ -272,8 +276,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
continue; //this was already scheduled earlier
}
ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
+
if (this.lastResult != null) {
- s.getScan().setStartRow(this.lastResult.getRow());
+ if(s.getScan().isReversed()){
+ s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
+ }else {
+ s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+ }
}
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);