You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/10/26 09:27:20 UTC
[1/2] hbase git commit: Revert "Implement small scan" due to miss
issue number
Repository: hbase
Updated Branches:
refs/heads/master 1eae9aeea -> cd3dd6e01
Revert "Implement small scan" due to miss issue number
This reverts commit c7c45f2c85cddd860a293fe9364b2b7ab0ab5bba.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5cee6a39
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5cee6a39
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5cee6a39
Branch: refs/heads/master
Commit: 5cee6a39c21966bd82f5778f55295559cd663a31
Parents: 1eae9ae
Author: zhangduo <zh...@apache.org>
Authored: Wed Oct 26 17:22:50 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Oct 26 17:22:50 2016 +0800
----------------------------------------------------------------------
.../client/AsyncConnectionConfiguration.java | 41 +---
.../hadoop/hbase/client/AsyncRegionLocator.java | 23 --
.../client/AsyncRpcRetryingCallerFactory.java | 83 +------
.../AsyncSingleRequestRpcRetryingCaller.java | 15 +-
.../client/AsyncSmallScanRpcRetryingCaller.java | 211 ------------------
.../apache/hadoop/hbase/client/AsyncTable.java | 23 --
.../hadoop/hbase/client/AsyncTableImpl.java | 47 +---
.../hadoop/hbase/client/ClientScanner.java | 176 ++++++++-------
.../client/ClientSmallReversedScanner.java | 6 +-
.../hadoop/hbase/client/ConnectionUtils.java | 46 +---
.../hbase/client/ReversedClientScanner.java | 2 -
.../client/ScannerCallableWithReplicas.java | 12 +-
.../hbase/client/TestAsyncTableSmallScan.java | 219 -------------------
.../hadoop/hbase/client/TestFromClientSide.java | 3 +-
14 files changed, 126 insertions(+), 781 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index aaac845..ba2e660 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -20,18 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
-import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
@@ -41,7 +34,6 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@@ -67,13 +59,6 @@ class AsyncConnectionConfiguration {
/** How many retries are allowed before we start to log */
private final int startLogErrorsCnt;
- private final long scanTimeoutNs;
-
- private final int scannerCaching;
-
- private final long scannerMaxResultSize;
-
- @SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -83,18 +68,11 @@ class AsyncConnectionConfiguration {
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
- this.pauseNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+ this.pauseNs = TimeUnit.MILLISECONDS
+ .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.startLogErrorsCnt =
- conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
- this.scanTimeoutNs = TimeUnit.MILLISECONDS
- .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
- this.scannerCaching =
- conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
- this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
- DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+ this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
+ DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
}
long getMetaOperationTimeoutNs() {
@@ -125,15 +103,4 @@ class AsyncConnectionConfiguration {
return startLogErrorsCnt;
}
- public long getScanTimeoutNs() {
- return scanTimeoutNs;
- }
-
- public int getScannerCaching() {
- return scannerCaching;
- }
-
- public long getScannerMaxResultSize() {
- return scannerMaxResultSize;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 321fd71..dc75ba6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -29,7 +27,6 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
/**
* TODO: reimplement using aync connection when the scan logic is ready. The current implementation
@@ -55,26 +52,6 @@ class AsyncRegionLocator implements Closeable {
return future;
}
- CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
- byte[] startRowOfCurrentRegion, boolean reload) {
- CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
- byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
- try {
- for (;;) {
- HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
- byte[] endKey = loc.getRegionInfo().getEndKey();
- if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
- future.complete(loc);
- break;
- }
- toLocateRow = endKey;
- }
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
ServerName source) {
conn.updateCachedLocations(tableName, regionName, row, exception, source);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 9020ce5..c5ac9a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,12 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Preconditions;
import io.netty.util.HashedWheelTimer;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -56,8 +54,6 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
- private boolean locateToPreviousRegion;
-
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@@ -68,8 +64,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public SingleRequestCallerBuilder<T>
- action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+ public SingleRequestCallerBuilder<T> action(
+ AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@@ -84,18 +80,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) {
- this.locateToPreviousRegion = locateToPreviousRegion;
- return this;
- }
-
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
- checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
- locateToPreviousRegion
- ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
- : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
- checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
+ Preconditions.checkNotNull(tableName, "tableName is null"),
+ Preconditions.checkNotNull(row, "row is null"),
+ Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
conn.connConf.getStartLogErrorsCnt());
}
@@ -114,64 +103,4 @@ class AsyncRpcRetryingCallerFactory {
public <T> SingleRequestCallerBuilder<T> single() {
return new SingleRequestCallerBuilder<>();
}
-
- public class SmallScanCallerBuilder {
-
- private TableName tableName;
-
- private Scan scan;
-
- private int limit;
-
- private long scanTimeoutNs = -1L;
-
- private long rpcTimeoutNs = -1L;
-
- public SmallScanCallerBuilder table(TableName tableName) {
- this.tableName = tableName;
- return this;
- }
-
- public SmallScanCallerBuilder setScan(Scan scan) {
- this.scan = scan;
- return this;
- }
-
- public SmallScanCallerBuilder limit(int limit) {
- this.limit = limit;
- return this;
- }
-
- public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
- this.scanTimeoutNs = unit.toNanos(scanTimeout);
- return this;
- }
-
- public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
- this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
- return this;
- }
-
- public AsyncSmallScanRpcRetryingCaller build() {
- TableName tableName = checkNotNull(this.tableName, "tableName is null");
- Scan scan = checkNotNull(this.scan, "scan is null");
- checkArgument(limit > 0, "invalid limit %d", limit);
- return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
- rpcTimeoutNs);
- }
-
- /**
- * Shortcut for {@code build().call()}
- */
- public CompletableFuture<List<Result>> call() {
- return build().call();
- }
- }
-
- /**
- * Create retry caller for small scan.
- */
- public SmallScanCallerBuilder smallScan() {
- return new SmallScanCallerBuilder();
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 1d0357d..8acde94 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
- @FunctionalInterface
- public interface RegionLocator {
- CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
- byte[] row, boolean reload);
- }
-
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
@@ -74,8 +68,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final byte[] row;
- private final RegionLocator locator;
-
private final Callable<T> callable;
private final long pauseNs;
@@ -97,13 +89,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
- int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
+ long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.row = row;
- this.locator = locator;
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = retries2Attempts(maxRetries);
@@ -216,7 +207,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}
private void locateThenCall() {
- locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
+ conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
deleted file mode 100644
index af639c0..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Retry caller for smaller scan.
- */
-@InterfaceAudience.Private
-class AsyncSmallScanRpcRetryingCaller {
-
- private final AsyncConnectionImpl conn;
-
- private final TableName tableName;
-
- private final Scan scan;
-
- private final int limit;
-
- private final long scanTimeoutNs;
-
- private final long rpcTimeoutNs;
-
- private final Function<byte[], byte[]> createClosestNextRow;
-
- private final Runnable firstScan;
-
- private final Function<HRegionInfo, Boolean> nextScan;
-
- private final List<Result> resultList;
-
- private final CompletableFuture<List<Result>> future;
-
- public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
- int limit, long scanTimeoutNs, long rpcTimeoutNs) {
- this.conn = conn;
- this.tableName = tableName;
- this.scan = scan;
- this.limit = limit;
- this.scanTimeoutNs = scanTimeoutNs;
- this.rpcTimeoutNs = rpcTimeoutNs;
- if (scan.isReversed()) {
- this.createClosestNextRow = ConnectionUtils::createClosestRowBefore;
- this.firstScan = this::reversedFirstScan;
- this.nextScan = this::reversedNextScan;
- } else {
- this.createClosestNextRow = ConnectionUtils::createClosestRowAfter;
- this.firstScan = this::firstScan;
- this.nextScan = this::nextScan;
- }
- this.resultList = new ArrayList<>();
- this.future = new CompletableFuture<>();
- }
-
- private static final class SmallScanResponse {
-
- public final Result[] results;
-
- public final HRegionInfo currentRegion;
-
- public final boolean hasMoreResultsInRegion;
-
- public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
- boolean hasMoreResultsInRegion) {
- this.results = results;
- this.currentRegion = currentRegion;
- this.hasMoreResultsInRegion = hasMoreResultsInRegion;
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "Findbugs seems to be confused by lambda expression.")
- private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
- HRegionLocation loc, ClientService.Interface stub) {
- CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
- ScanRequest req;
- try {
- req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
- limit - resultList.size(), true);
- } catch (IOException e) {
- future.completeExceptionally(e);
- return future;
- }
- stub.scan(controller, req, resp -> {
- if (controller.failed()) {
- future.completeExceptionally(controller.getFailed());
- } else {
- try {
- Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
- future.complete(
- new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
- });
- return future;
- }
-
- private void onComplete(SmallScanResponse resp) {
- resultList.addAll(Arrays.asList(resp.results));
- if (resultList.size() == limit) {
- future.complete(resultList);
- return;
- }
- if (resp.hasMoreResultsInRegion) {
- if (resp.results.length > 0) {
- scan.setStartRow(
- createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
- }
- scan(false);
- return;
- }
- if (!nextScan.apply(resp.currentRegion)) {
- future.complete(resultList);
- }
- }
-
- private void scan(boolean locateToPreviousRegion) {
- conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
- .locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call()
- .whenComplete((resp, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else {
- onComplete(resp);
- }
- });
- }
-
- public CompletableFuture<List<Result>> call() {
- firstScan.run();
- return future;
- }
-
- private void firstScan() {
- scan(false);
- }
-
- private void reversedFirstScan() {
- scan(isEmptyStartRow(scan.getStartRow()));
- }
-
- private boolean nextScan(HRegionInfo region) {
- if (isEmptyStopRow(scan.getStopRow())) {
- if (isEmptyStopRow(region.getEndKey())) {
- return false;
- }
- } else {
- if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) {
- return false;
- }
- }
- scan.setStartRow(region.getEndKey());
- scan(false);
- return true;
- }
-
- private boolean reversedNextScan(HRegionInfo region) {
- if (isEmptyStopRow(scan.getStopRow())) {
- if (isEmptyStartRow(region.getStartKey())) {
- return false;
- }
- } else {
- if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) {
- return false;
- }
- }
- scan.setStartRow(region.getStartKey());
- scan(true);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 94747b9..4642746 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -313,26 +312,4 @@ public interface AsyncTable {
*/
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation);
-
- /**
- * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
- * @see #smallScan(Scan, int)
- */
- default CompletableFuture<List<Result>> smallScan(Scan scan) {
- return smallScan(scan, Integer.MAX_VALUE);
- }
-
- /**
- * Return all the results that match the given scan object. The number of the returned results
- * will not be greater than {@code limit}.
- * <p>
- * Notice that the scan must be small, and should not use batch or allowPartialResults. The
- * {@code caching} property of the scan object is also ignored as we will use {@code limit}
- * instead.
- * @param scan A configured {@link Scan} object.
- * @param limit the limit of results count
- * @return The results of this small scan operation. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ce53775..77a5bbe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -47,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The implementation of AsyncTable.
@@ -59,18 +57,12 @@ class AsyncTableImpl implements AsyncTable {
private final TableName tableName;
- private final int defaultScannerCaching;
-
- private final long defaultScannerMaxResultSize;
-
private long readRpcTimeoutNs;
private long writeRpcTimeoutNs;
private long operationTimeoutNs;
- private long scanTimeoutNs;
-
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
@@ -78,9 +70,6 @@ class AsyncTableImpl implements AsyncTable {
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
: conn.connConf.getOperationTimeoutNs();
- this.defaultScannerCaching = conn.connConf.getScannerCaching();
- this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
- this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
}
@Override
@@ -267,8 +256,8 @@ class AsyncTableImpl implements AsyncTable {
future.completeExceptionally(controller.getFailed());
} else {
try {
- org.apache.hadoop.hbase.client.MultiResponse multiResp =
- ResponseConverter.getResults(req, resp, controller.cellScanner());
+ org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
+ .getResults(req, resp, controller.cellScanner());
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future
@@ -316,38 +305,6 @@ class AsyncTableImpl implements AsyncTable {
.call();
}
- private <T> CompletableFuture<T> failedFuture(Throwable error) {
- CompletableFuture<T> future = new CompletableFuture<>();
- future.completeExceptionally(error);
- return future;
- }
-
- private Scan setDefaultScanConfig(Scan scan) {
- // always create a new scan object as we may reset the start row later.
- Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
- if (newScan.getCaching() <= 0) {
- newScan.setCaching(defaultScannerCaching);
- }
- if (newScan.getMaxResultSize() <= 0) {
- newScan.setMaxResultSize(defaultScannerMaxResultSize);
- }
- return newScan;
- }
-
- @Override
- public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
- if (!scan.isSmall()) {
- return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
- }
- if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
- return failedFuture(
- new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
- }
- return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
- .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
- }
-
@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 00ff350..371a68a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,20 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
-
import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,11 +35,20 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@@ -60,51 +56,53 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {
- private static final Log LOG = LogFactory.getLog(ClientScanner.class);
-
- protected Scan scan;
- protected boolean closed = false;
- // Current region scanner is against. Gets cleared if current region goes
- // wonky: e.g. if it splits on us.
- protected HRegionInfo currentRegion = null;
- protected ScannerCallableWithReplicas callable = null;
- protected Queue<Result> cache;
- /**
- * A list of partial results that have been returned from the server. This list should only
- * contain results if this scanner does not have enough partial results to form the complete
- * result.
- */
- protected final LinkedList<Result> partialResults = new LinkedList<Result>();
- /**
- * The row for which we are accumulating partial Results (i.e. the row of the Results stored
- * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
- * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
- */
- protected byte[] partialResultsRow = null;
- /**
- * The last cell from a not full Row which is added to cache
- */
- protected Cell lastCellLoadedToCache = null;
- protected final int caching;
- protected long lastNext;
- // Keep lastResult returned successfully in case we have to reset scanner.
- protected Result lastResult = null;
- protected final long maxScannerResultSize;
- private final ClusterConnection connection;
- private final TableName tableName;
- protected final int scannerTimeout;
- protected boolean scanMetricsPublished = false;
- protected RpcRetryingCaller<Result[]> caller;
- protected RpcControllerFactory rpcControllerFactory;
- protected Configuration conf;
- // The timeout on the primary. Applicable if there are multiple replicas for a region
- // In that case, we will only wait for this much timeout on the primary before going
- // to the replicas and trying the same scan. Note that the retries will still happen
- // on each replica and the first successful results will be taken. A timeout of 0 is
- // disallowed.
- protected final int primaryOperationTimeout;
- private int retries;
- protected final ExecutorService pool;
+ private static final Log LOG = LogFactory.getLog(ClientScanner.class);
+ // A byte array in which all elements are the max byte, and it is used to
+ // construct closest front row
+ static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+ protected Scan scan;
+ protected boolean closed = false;
+ // Current region scanner is against. Gets cleared if current region goes
+ // wonky: e.g. if it splits on us.
+ protected HRegionInfo currentRegion = null;
+ protected ScannerCallableWithReplicas callable = null;
+ protected Queue<Result> cache;
+ /**
+ * A list of partial results that have been returned from the server. This list should only
+ * contain results if this scanner does not have enough partial results to form the complete
+ * result.
+ */
+ protected final LinkedList<Result> partialResults = new LinkedList<Result>();
+ /**
+ * The row for which we are accumulating partial Results (i.e. the row of the Results stored
+ * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
+ * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
+ */
+ protected byte[] partialResultsRow = null;
+ /**
+ * The last cell from a not full Row which is added to cache
+ */
+ protected Cell lastCellLoadedToCache = null;
+ protected final int caching;
+ protected long lastNext;
+ // Keep lastResult returned successfully in case we have to reset scanner.
+ protected Result lastResult = null;
+ protected final long maxScannerResultSize;
+ private final ClusterConnection connection;
+ private final TableName tableName;
+ protected final int scannerTimeout;
+ protected boolean scanMetricsPublished = false;
+ protected RpcRetryingCaller<Result []> caller;
+ protected RpcControllerFactory rpcControllerFactory;
+ protected Configuration conf;
+ //The timeout on the primary. Applicable if there are multiple replicas for a region
+ //In that case, we will only wait for this much timeout on the primary before going
+ //to the replicas and trying the same scan. Note that the retries will still happen
+ //on each replica and the first successful results will be taken. A timeout of 0 is
+ //disallowed.
+ protected final int primaryOperationTimeout;
+ private int retries;
+ protected final ExecutorService pool;
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -449,7 +447,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
- scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
+ scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
}
} else {
// we need rescan this row because we only loaded partial row before
@@ -739,27 +737,49 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
}
- @Override
- public void close() {
- if (!scanMetricsPublished) writeScanMetrics();
- if (callable != null) {
- callable.setClose();
- try {
- call(callable, caller, scannerTimeout);
- } catch (UnknownScannerException e) {
- // We used to catch this error, interpret, and rethrow. However, we
- // have since decided that it's not nice for a scanner's close to
- // throw exceptions. Chances are it was just due to lease time out.
- if (LOG.isDebugEnabled()) {
- LOG.debug("scanner failed to close", e);
+ @Override
+ public void close() {
+ if (!scanMetricsPublished) writeScanMetrics();
+ if (callable != null) {
+ callable.setClose();
+ try {
+ call(callable, caller, scannerTimeout);
+ } catch (UnknownScannerException e) {
+ // We used to catch this error, interpret, and rethrow. However, we
+ // have since decided that it's not nice for a scanner's close to
+ // throw exceptions. Chances are it was just due to lease time out.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("scanner failed to close", e);
+ }
+ } catch (IOException e) {
+ /* An exception other than UnknownScanner is unexpected. */
+ LOG.warn("scanner failed to close.", e);
}
- } catch (IOException e) {
- /* An exception other than UnknownScanner is unexpected. */
- LOG.warn("scanner failed to close.", e);
+ callable = null;
}
- callable = null;
+ closed = true;
+ }
+
+ /**
+ * Create the closest row before the specified row
+ * @param row
+ * @return a new byte array which is the closest front row of the specified one
+ */
+ protected static byte[] createClosestRowBefore(byte[] row) {
+ if (row == null) {
+ throw new IllegalArgumentException("The passed row is empty");
+ }
+ if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
+ return MAX_BYTE_ARRAY;
+ }
+ if (row[row.length - 1] == 0) {
+ return Arrays.copyOf(row, row.length - 1);
+ } else {
+ byte[] closestFrontRow = Arrays.copyOf(row, row.length);
+ closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
+ closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
+ return closestFrontRow;
}
- closed = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 971a2ca..5fac93a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -16,11 +16,9 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+package org.apache.hadoop.hbase.client;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@@ -38,6 +36,8 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* <p>
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index e0030e8..eca9ad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,16 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -37,11 +33,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
/**
* Utility used by client connections.
@@ -227,41 +222,4 @@ public final class ConnectionUtils {
return HConstants.NO_NONCE;
}
};
-
- // A byte array in which all elements are the max byte, and it is used to
- // construct closest front row
- static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
-
- /**
- * Create the closest row after the specified row
- */
- static byte[] createClosestRowAfter(byte[] row) {
- return Arrays.copyOf(row, row.length + 1);
- }
-
- /**
- * Create the closest row before the specified row
- */
- static byte[] createClosestRowBefore(byte[] row) {
- if (row.length == 0) {
- return MAX_BYTE_ARRAY;
- }
- if (row[row.length - 1] == 0) {
- return Arrays.copyOf(row, row.length - 1);
- } else {
- byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
- System.arraycopy(row, 0, nextRow, 0, row.length - 1);
- nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
- System.arraycopy(nextRow, row.length, MAX_BYTE_ARRAY, 0, MAX_BYTE_ARRAY.length);
- return nextRow;
- }
- }
-
- static boolean isEmptyStartRow(byte[] row) {
- return Bytes.equals(row, EMPTY_START_ROW);
- }
-
- static boolean isEmptyStopRow(byte[] row) {
- return Bytes.equals(row, EMPTY_END_ROW);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 390e236..dde82ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
-
import java.io.IOException;
import java.util.concurrent.ExecutorService;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index e04fd6e..5174598 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
-
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -33,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -42,8 +40,12 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@@ -339,7 +341,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (callable.getScan().isReversed()) {
callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
} else {
- callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
+ callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
deleted file mode 100644
index 972780e..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableSmallScan {
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
- private static int COUNT = 1000;
-
- private static AsyncConnection ASYNC_CONN;
-
- @BeforeClass
- public static void setUp() throws Exception {
- TEST_UTIL.startMiniCluster(3);
- byte[][] splitKeys = new byte[8][];
- for (int i = 111; i < 999; i += 111) {
- splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
- }
- TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
- TEST_UTIL.waitTableAvailable(TABLE_NAME);
- ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- List<CompletableFuture<?>> futures = new ArrayList<>();
- IntStream.range(0, COUNT)
- .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
- .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- ASYNC_CONN.close();
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Test
- public void testScanAll() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
- assertEquals(COUNT, results.size());
- IntStream.range(0, COUNT).forEach(i -> {
- Result result = results.get(i);
- assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
- assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- @Test
- public void testReversedScanAll() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
- assertEquals(COUNT, results.size());
- IntStream.range(0, COUNT).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = COUNT - i - 1;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- @Test
- public void testScanNoStopKey() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- int start = 345;
- List<Result> results = table
- .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
- assertEquals(COUNT - start, results.size());
- IntStream.range(0, COUNT - start).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start + i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- @Test
- public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- int start = 765;
- List<Result> results = table
- .smallScan(
- new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
- .get();
- assertEquals(start + 1, results.size());
- IntStream.range(0, start + 1).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start - i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
- assertEquals(stop - start, results.size());
- IntStream.range(0, stop - start).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start + i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- private void testReversedScan(int start, int stop)
- throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
- .get();
- assertEquals(start - stop, results.size());
- IntStream.range(0, start - stop).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start - i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- @Test
- public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
- testScan(345, 567);
- }
-
- @Test
- public void testReversedScanWithStartKeyAndStopKey()
- throws InterruptedException, ExecutionException {
- testReversedScan(765, 543);
- }
-
- @Test
- public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
- testScan(222, 333);
- }
-
- @Test
- public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
- testScan(222, 333);
- }
-
- @Test
- public void testScanWithLimit() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- int start = 111;
- int stop = 888;
- int limit = 300;
- List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
- limit).get();
- assertEquals(limit, results.size());
- IntStream.range(0, limit).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start + i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-
- @Test
- public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
- AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
- int start = 888;
- int stop = 111;
- int limit = 300;
- List<Result> results = table.smallScan(
- new Scan(Bytes.toBytes(String.format("%03d", start)))
- .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
- limit).get();
- assertEquals(limit, results.size());
- IntStream.range(0, limit).forEach(i -> {
- Result result = results.get(i);
- int actualIndex = start - i;
- assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
- assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cee6a39/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index ae93e67..89841a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -5934,7 +5933,7 @@ public class TestFromClientSide {
public void testReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions");
- byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
+ byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
Bytes.toBytes("006"),
[2/2] hbase git commit: HBASE-16932 Implement small scan
Posted by zh...@apache.org.
HBASE-16932 Implement small scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd3dd6e0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd3dd6e0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd3dd6e0
Branch: refs/heads/master
Commit: cd3dd6e018513357d2cf0b5bba073f5a6551f7a4
Parents: 5cee6a3
Author: zhangduo <zh...@apache.org>
Authored: Wed Oct 26 17:21:35 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Oct 26 17:26:58 2016 +0800
----------------------------------------------------------------------
.../client/AsyncConnectionConfiguration.java | 41 +++-
.../hadoop/hbase/client/AsyncRegionLocator.java | 23 ++
.../client/AsyncRpcRetryingCallerFactory.java | 83 ++++++-
.../AsyncSingleRequestRpcRetryingCaller.java | 15 +-
.../client/AsyncSmallScanRpcRetryingCaller.java | 211 ++++++++++++++++++
.../apache/hadoop/hbase/client/AsyncTable.java | 23 ++
.../hadoop/hbase/client/AsyncTableImpl.java | 47 +++-
.../hadoop/hbase/client/ClientScanner.java | 176 +++++++--------
.../client/ClientSmallReversedScanner.java | 6 +-
.../hadoop/hbase/client/ConnectionUtils.java | 46 +++-
.../hbase/client/ReversedClientScanner.java | 2 +
.../client/ScannerCallableWithReplicas.java | 12 +-
.../hbase/client/TestAsyncTableSmallScan.java | 219 +++++++++++++++++++
.../hadoop/hbase/client/TestFromClientSide.java | 3 +-
14 files changed, 781 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index ba2e660..aaac845 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -20,11 +20,18 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
@@ -34,6 +41,7 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@@ -59,6 +67,13 @@ class AsyncConnectionConfiguration {
/** How many retries are allowed before we start to log */
private final int startLogErrorsCnt;
+ private final long scanTimeoutNs;
+
+ private final int scannerCaching;
+
+ private final long scannerMaxResultSize;
+
+ @SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -68,11 +83,18 @@ class AsyncConnectionConfiguration {
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
- this.pauseNs = TimeUnit.MILLISECONDS
- .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+ this.pauseNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
- DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+ this.startLogErrorsCnt =
+ conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+ this.scanTimeoutNs = TimeUnit.MILLISECONDS
+ .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+ this.scannerCaching =
+ conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+ DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
long getMetaOperationTimeoutNs() {
@@ -103,4 +125,15 @@ class AsyncConnectionConfiguration {
return startLogErrorsCnt;
}
+ public long getScanTimeoutNs() {
+ return scanTimeoutNs;
+ }
+
+ public int getScannerCaching() {
+ return scannerCaching;
+ }
+
+ public long getScannerMaxResultSize() {
+ return scannerMaxResultSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index dc75ba6..321fd71 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* TODO: reimplement using aync connection when the scan logic is ready. The current implementation
@@ -52,6 +55,26 @@ class AsyncRegionLocator implements Closeable {
return future;
}
+ CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+ byte[] startRowOfCurrentRegion, boolean reload) {
+ CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+ byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
+ try {
+ for (;;) {
+ HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
+ byte[] endKey = loc.getRegionInfo().getEndKey();
+ if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
+ future.complete(loc);
+ break;
+ }
+ toLocateRow = endKey;
+ }
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
ServerName source) {
conn.updateCachedLocations(tableName, regionName, row, exception, source);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c5ac9a5..9020ce5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.util.HashedWheelTimer;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
+ private boolean locateToPreviousRegion;
+
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@@ -64,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public SingleRequestCallerBuilder<T> action(
- AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+ public SingleRequestCallerBuilder<T>
+ action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@@ -80,11 +84,18 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) {
+ this.locateToPreviousRegion = locateToPreviousRegion;
+ return this;
+ }
+
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
- Preconditions.checkNotNull(tableName, "tableName is null"),
- Preconditions.checkNotNull(row, "row is null"),
- Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
+ checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
+ locateToPreviousRegion
+ ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
+ : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
+ checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
conn.connConf.getStartLogErrorsCnt());
}
@@ -103,4 +114,64 @@ class AsyncRpcRetryingCallerFactory {
public <T> SingleRequestCallerBuilder<T> single() {
return new SingleRequestCallerBuilder<>();
}
+
+ public class SmallScanCallerBuilder {
+
+ private TableName tableName;
+
+ private Scan scan;
+
+ private int limit;
+
+ private long scanTimeoutNs = -1L;
+
+ private long rpcTimeoutNs = -1L;
+
+ public SmallScanCallerBuilder table(TableName tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public SmallScanCallerBuilder setScan(Scan scan) {
+ this.scan = scan;
+ return this;
+ }
+
+ public SmallScanCallerBuilder limit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
+ this.scanTimeoutNs = unit.toNanos(scanTimeout);
+ return this;
+ }
+
+ public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+ return this;
+ }
+
+ public AsyncSmallScanRpcRetryingCaller build() {
+ TableName tableName = checkNotNull(this.tableName, "tableName is null");
+ Scan scan = checkNotNull(this.scan, "scan is null");
+ checkArgument(limit > 0, "invalid limit %d", limit);
+ return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
+ rpcTimeoutNs);
+ }
+
+ /**
+ * Shortcut for {@code build().call()}
+ */
+ public CompletableFuture<List<Result>> call() {
+ return build().call();
+ }
+ }
+
+ /**
+ * Create retry caller for small scan.
+ */
+ public SmallScanCallerBuilder smallScan() {
+ return new SmallScanCallerBuilder();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 8acde94..1d0357d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -60,6 +60,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
+ @FunctionalInterface
+ public interface RegionLocator {
+ CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
+ byte[] row, boolean reload);
+ }
+
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
@@ -68,6 +74,8 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final byte[] row;
+ private final RegionLocator locator;
+
private final Callable<T> callable;
private final long pauseNs;
@@ -89,12 +97,13 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
- long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
+ int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.row = row;
+ this.locator = locator;
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = retries2Attempts(maxRetries);
@@ -207,7 +216,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}
private void locateThenCall() {
- conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
+ locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
new file mode 100644
index 0000000..af639c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Retry caller for smaller scan.
+ */
+@InterfaceAudience.Private
+class AsyncSmallScanRpcRetryingCaller {
+
+ private final AsyncConnectionImpl conn;
+
+ private final TableName tableName;
+
+ private final Scan scan;
+
+ private final int limit;
+
+ private final long scanTimeoutNs;
+
+ private final long rpcTimeoutNs;
+
+ private final Function<byte[], byte[]> createClosestNextRow;
+
+ private final Runnable firstScan;
+
+ private final Function<HRegionInfo, Boolean> nextScan;
+
+ private final List<Result> resultList;
+
+ private final CompletableFuture<List<Result>> future;
+
+ public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
+ int limit, long scanTimeoutNs, long rpcTimeoutNs) {
+ this.conn = conn;
+ this.tableName = tableName;
+ this.scan = scan;
+ this.limit = limit;
+ this.scanTimeoutNs = scanTimeoutNs;
+ this.rpcTimeoutNs = rpcTimeoutNs;
+ if (scan.isReversed()) {
+ this.createClosestNextRow = ConnectionUtils::createClosestRowBefore;
+ this.firstScan = this::reversedFirstScan;
+ this.nextScan = this::reversedNextScan;
+ } else {
+ this.createClosestNextRow = ConnectionUtils::createClosestRowAfter;
+ this.firstScan = this::firstScan;
+ this.nextScan = this::nextScan;
+ }
+ this.resultList = new ArrayList<>();
+ this.future = new CompletableFuture<>();
+ }
+
+ private static final class SmallScanResponse {
+
+ public final Result[] results;
+
+ public final HRegionInfo currentRegion;
+
+ public final boolean hasMoreResultsInRegion;
+
+ public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
+ boolean hasMoreResultsInRegion) {
+ this.results = results;
+ this.currentRegion = currentRegion;
+ this.hasMoreResultsInRegion = hasMoreResultsInRegion;
+ }
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "Findbugs seems to be confused by lambda expression.")
+ private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
+ HRegionLocation loc, ClientService.Interface stub) {
+ CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
+ ScanRequest req;
+ try {
+ req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
+ limit - resultList.size(), true);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ stub.scan(controller, req, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
+ future.complete(
+ new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ });
+ return future;
+ }
+
+ private void onComplete(SmallScanResponse resp) {
+ resultList.addAll(Arrays.asList(resp.results));
+ if (resultList.size() == limit) {
+ future.complete(resultList);
+ return;
+ }
+ if (resp.hasMoreResultsInRegion) {
+ if (resp.results.length > 0) {
+ scan.setStartRow(
+ createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
+ }
+ scan(false);
+ return;
+ }
+ if (!nextScan.apply(resp.currentRegion)) {
+ future.complete(resultList);
+ }
+ }
+
+ private void scan(boolean locateToPreviousRegion) {
+ conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
+ .locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call()
+ .whenComplete((resp, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ onComplete(resp);
+ }
+ });
+ }
+
+ public CompletableFuture<List<Result>> call() {
+ firstScan.run();
+ return future;
+ }
+
+ private void firstScan() {
+ scan(false);
+ }
+
+ private void reversedFirstScan() {
+ scan(isEmptyStartRow(scan.getStartRow()));
+ }
+
+ private boolean nextScan(HRegionInfo region) {
+ if (isEmptyStopRow(scan.getStopRow())) {
+ if (isEmptyStopRow(region.getEndKey())) {
+ return false;
+ }
+ } else {
+ if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) {
+ return false;
+ }
+ }
+ scan.setStartRow(region.getEndKey());
+ scan(false);
+ return true;
+ }
+
+ private boolean reversedNextScan(HRegionInfo region) {
+ if (isEmptyStopRow(scan.getStopRow())) {
+ if (isEmptyStartRow(region.getStartKey())) {
+ return false;
+ }
+ } else {
+ if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) {
+ return false;
+ }
+ }
+ scan.setStartRow(region.getStartKey());
+ scan(true);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 4642746..94747b9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -312,4 +313,26 @@ public interface AsyncTable {
*/
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation);
+
+ /**
+ * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
+ * @see #smallScan(Scan, int)
+ */
+ default CompletableFuture<List<Result>> smallScan(Scan scan) {
+ return smallScan(scan, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Return all the results that match the given scan object. The number of the returned results
+ * will not be greater than {@code limit}.
+ * <p>
+ * Notice that the scan must be small, and should not use batch or allowPartialResults. The
+ * {@code caching} property of the scan object is also ignored as we will use {@code limit}
+ * instead.
+ * @param scan A configured {@link Scan} object.
+ * @param limit the limit of results count
+ * @return The results of this small scan operation. The return value will be wrapped by a
+ * {@link CompletableFuture}.
+ */
+ CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 77a5bbe..ce53775 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The implementation of AsyncTable.
@@ -57,12 +59,18 @@ class AsyncTableImpl implements AsyncTable {
private final TableName tableName;
+ private final int defaultScannerCaching;
+
+ private final long defaultScannerMaxResultSize;
+
private long readRpcTimeoutNs;
private long writeRpcTimeoutNs;
private long operationTimeoutNs;
+ private long scanTimeoutNs;
+
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
@@ -70,6 +78,9 @@ class AsyncTableImpl implements AsyncTable {
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
: conn.connConf.getOperationTimeoutNs();
+ this.defaultScannerCaching = conn.connConf.getScannerCaching();
+ this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
+ this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
}
@Override
@@ -256,8 +267,8 @@ class AsyncTableImpl implements AsyncTable {
future.completeExceptionally(controller.getFailed());
} else {
try {
- org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
- .getResults(req, resp, controller.cellScanner());
+ org.apache.hadoop.hbase.client.MultiResponse multiResp =
+ ResponseConverter.getResults(req, resp, controller.cellScanner());
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future
@@ -305,6 +316,38 @@ class AsyncTableImpl implements AsyncTable {
.call();
}
+ private <T> CompletableFuture<T> failedFuture(Throwable error) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(error);
+ return future;
+ }
+
+ private Scan setDefaultScanConfig(Scan scan) {
+ // always create a new scan object as we may reset the start row later.
+ Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
+ if (newScan.getCaching() <= 0) {
+ newScan.setCaching(defaultScannerCaching);
+ }
+ if (newScan.getMaxResultSize() <= 0) {
+ newScan.setMaxResultSize(defaultScannerMaxResultSize);
+ }
+ return newScan;
+ }
+
+ @Override
+ public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
+ if (!scan.isSmall()) {
+ return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
+ }
+ if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+ return failedFuture(
+ new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
+ }
+ return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
+ .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
+ .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+ }
+
@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 371a68a..00ff350 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,7 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,20 +48,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@@ -56,53 +60,51 @@ import java.util.concurrent.ExecutorService;
*/
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {
- private static final Log LOG = LogFactory.getLog(ClientScanner.class);
- // A byte array in which all elements are the max byte, and it is used to
- // construct closest front row
- static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
- protected Scan scan;
- protected boolean closed = false;
- // Current region scanner is against. Gets cleared if current region goes
- // wonky: e.g. if it splits on us.
- protected HRegionInfo currentRegion = null;
- protected ScannerCallableWithReplicas callable = null;
- protected Queue<Result> cache;
- /**
- * A list of partial results that have been returned from the server. This list should only
- * contain results if this scanner does not have enough partial results to form the complete
- * result.
- */
- protected final LinkedList<Result> partialResults = new LinkedList<Result>();
- /**
- * The row for which we are accumulating partial Results (i.e. the row of the Results stored
- * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
- * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
- */
- protected byte[] partialResultsRow = null;
- /**
- * The last cell from a not full Row which is added to cache
- */
- protected Cell lastCellLoadedToCache = null;
- protected final int caching;
- protected long lastNext;
- // Keep lastResult returned successfully in case we have to reset scanner.
- protected Result lastResult = null;
- protected final long maxScannerResultSize;
- private final ClusterConnection connection;
- private final TableName tableName;
- protected final int scannerTimeout;
- protected boolean scanMetricsPublished = false;
- protected RpcRetryingCaller<Result []> caller;
- protected RpcControllerFactory rpcControllerFactory;
- protected Configuration conf;
- //The timeout on the primary. Applicable if there are multiple replicas for a region
- //In that case, we will only wait for this much timeout on the primary before going
- //to the replicas and trying the same scan. Note that the retries will still happen
- //on each replica and the first successful results will be taken. A timeout of 0 is
- //disallowed.
- protected final int primaryOperationTimeout;
- private int retries;
- protected final ExecutorService pool;
+ private static final Log LOG = LogFactory.getLog(ClientScanner.class);
+
+ protected Scan scan;
+ protected boolean closed = false;
+ // Current region scanner is against. Gets cleared if current region goes
+ // wonky: e.g. if it splits on us.
+ protected HRegionInfo currentRegion = null;
+ protected ScannerCallableWithReplicas callable = null;
+ protected Queue<Result> cache;
+ /**
+ * A list of partial results that have been returned from the server. This list should only
+ * contain results if this scanner does not have enough partial results to form the complete
+ * result.
+ */
+ protected final LinkedList<Result> partialResults = new LinkedList<Result>();
+ /**
+ * The row for which we are accumulating partial Results (i.e. the row of the Results stored
+ * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
+ * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
+ */
+ protected byte[] partialResultsRow = null;
+ /**
+ * The last cell from a not full Row which is added to cache
+ */
+ protected Cell lastCellLoadedToCache = null;
+ protected final int caching;
+ protected long lastNext;
+ // Keep lastResult returned successfully in case we have to reset scanner.
+ protected Result lastResult = null;
+ protected final long maxScannerResultSize;
+ private final ClusterConnection connection;
+ private final TableName tableName;
+ protected final int scannerTimeout;
+ protected boolean scanMetricsPublished = false;
+ protected RpcRetryingCaller<Result[]> caller;
+ protected RpcControllerFactory rpcControllerFactory;
+ protected Configuration conf;
+ // The timeout on the primary. Applicable if there are multiple replicas for a region
+ // In that case, we will only wait for this much timeout on the primary before going
+ // to the replicas and trying the same scan. Note that the retries will still happen
+ // on each replica and the first successful results will be taken. A timeout of 0 is
+ // disallowed.
+ protected final int primaryOperationTimeout;
+ private int retries;
+ protected final ExecutorService pool;
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -447,7 +449,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
- scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+ scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
}
} else {
// we need rescan this row because we only loaded partial row before
@@ -737,49 +739,27 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
}
- @Override
- public void close() {
- if (!scanMetricsPublished) writeScanMetrics();
- if (callable != null) {
- callable.setClose();
- try {
- call(callable, caller, scannerTimeout);
- } catch (UnknownScannerException e) {
- // We used to catch this error, interpret, and rethrow. However, we
- // have since decided that it's not nice for a scanner's close to
- // throw exceptions. Chances are it was just due to lease time out.
- if (LOG.isDebugEnabled()) {
- LOG.debug("scanner failed to close", e);
- }
- } catch (IOException e) {
- /* An exception other than UnknownScanner is unexpected. */
- LOG.warn("scanner failed to close.", e);
+ @Override
+ public void close() {
+ if (!scanMetricsPublished) writeScanMetrics();
+ if (callable != null) {
+ callable.setClose();
+ try {
+ call(callable, caller, scannerTimeout);
+ } catch (UnknownScannerException e) {
+ // We used to catch this error, interpret, and rethrow. However, we
+ // have since decided that it's not nice for a scanner's close to
+ // throw exceptions. Chances are it was just due to lease time out.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("scanner failed to close", e);
}
- callable = null;
+ } catch (IOException e) {
+ /* An exception other than UnknownScanner is unexpected. */
+ LOG.warn("scanner failed to close.", e);
}
- closed = true;
- }
-
- /**
- * Create the closest row before the specified row
- * @param row
- * @return a new byte array which is the closest front row of the specified one
- */
- protected static byte[] createClosestRowBefore(byte[] row) {
- if (row == null) {
- throw new IllegalArgumentException("The passed row is empty");
- }
- if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
- return MAX_BYTE_ARRAY;
- }
- if (row[row.length - 1] == 0) {
- return Arrays.copyOf(row, row.length - 1);
- } else {
- byte[] closestFrontRow = Arrays.copyOf(row, row.length);
- closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
- closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
- return closestFrontRow;
+ callable = null;
}
+ closed = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 5fac93a..971a2ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -16,9 +16,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* <p>
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index eca9ad8..e0030e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -33,10 +37,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* Utility used by client connections.
@@ -222,4 +227,41 @@ public final class ConnectionUtils {
return HConstants.NO_NONCE;
}
};
+
+ // A byte array in which all elements are the max byte, and it is used to
+ // construct closest front row
+ static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
+ /**
+ * Create the closest row after the specified row
+ */
+ static byte[] createClosestRowAfter(byte[] row) {
+ return Arrays.copyOf(row, row.length + 1);
+ }
+
+ /**
+ * Create the closest row before the specified row
+ */
+ static byte[] createClosestRowBefore(byte[] row) {
+ if (row.length == 0) {
+ return MAX_BYTE_ARRAY;
+ }
+ if (row[row.length - 1] == 0) {
+ return Arrays.copyOf(row, row.length - 1);
+ } else {
+ byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
+ System.arraycopy(row, 0, nextRow, 0, row.length - 1);
+ nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
+ System.arraycopy(nextRow, row.length, MAX_BYTE_ARRAY, 0, MAX_BYTE_ARRAY.length);
+ return nextRow;
+ }
+ }
+
+ static boolean isEmptyStartRow(byte[] row) {
+ return Bytes.equals(row, EMPTY_START_ROW);
+ }
+
+ static boolean isEmptyStopRow(byte[] row) {
+ return Bytes.equals(row, EMPTY_END_ROW);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index dde82ba..390e236 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
import java.io.IOException;
import java.util.concurrent.ExecutorService;
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 5174598..e04fd6e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -30,7 +33,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -40,12 +42,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@@ -341,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (callable.getScan().isReversed()) {
callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
} else {
- callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+ callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
new file mode 100644
index 0000000..972780e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableSmallScan {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static int COUNT = 1000;
+
+ private static AsyncConnection ASYNC_CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ byte[][] splitKeys = new byte[8][];
+ for (int i = 111; i < 999; i += 111) {
+ splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+ }
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ IntStream.range(0, COUNT)
+ .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+ .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ ASYNC_CONN.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testScanAll() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
+ assertEquals(COUNT, results.size());
+ IntStream.range(0, COUNT).forEach(i -> {
+ Result result = results.get(i);
+ assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+ assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ @Test
+ public void testReversedScanAll() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
+ assertEquals(COUNT, results.size());
+ IntStream.range(0, COUNT).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = COUNT - i - 1;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ @Test
+ public void testScanNoStopKey() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ int start = 345;
+ List<Result> results = table
+ .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
+ assertEquals(COUNT - start, results.size());
+ IntStream.range(0, COUNT - start).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start + i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ @Test
+ public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ int start = 765;
+ List<Result> results = table
+ .smallScan(
+ new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
+ .get();
+ assertEquals(start + 1, results.size());
+ IntStream.range(0, start + 1).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start - i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+ .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
+ assertEquals(stop - start, results.size());
+ IntStream.range(0, stop - start).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start + i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ private void testReversedScan(int start, int stop)
+ throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+ .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
+ .get();
+ assertEquals(start - stop, results.size());
+ IntStream.range(0, start - stop).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start - i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ @Test
+ public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
+ testScan(345, 567);
+ }
+
+ @Test
+ public void testReversedScanWithStartKeyAndStopKey()
+ throws InterruptedException, ExecutionException {
+ testReversedScan(765, 543);
+ }
+
+ @Test
+ public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
+ testScan(222, 333);
+ }
+
+ @Test
+ public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
+ testScan(222, 333);
+ }
+
+ @Test
+ public void testScanWithLimit() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ int start = 111;
+ int stop = 888;
+ int limit = 300;
+ List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+ .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
+ limit).get();
+ assertEquals(limit, results.size());
+ IntStream.range(0, limit).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start + i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+
+ @Test
+ public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
+ AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+ int start = 888;
+ int stop = 111;
+ int limit = 300;
+ List<Result> results = table.smallScan(
+ new Scan(Bytes.toBytes(String.format("%03d", start)))
+ .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
+ limit).get();
+ assertEquals(limit, results.size());
+ IntStream.range(0, limit).forEach(i -> {
+ Result result = results.get(i);
+ int actualIndex = start - i;
+ assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
+ assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd3dd6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 89841a9..ae93e67 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -5933,7 +5934,7 @@ public class TestFromClientSide {
public void testReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions");
- byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
+ byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
Bytes.toBytes("006"),