You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ya...@apache.org on 2016/12/02 08:07:50 UTC
hbase git commit: HBASE-16886: hbase-client: scanner with
reversed=true and small=true gets no result
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 5a4972f9c -> c9f388bec
HBASE-16886: hbase-client: scanner with reversed=true and small=true gets no result
Signed-off-by: Phil Yang <ya...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9f388be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9f388be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9f388be
Branch: refs/heads/branch-1.2
Commit: c9f388bece8a89ff52baf358df63c8e5fbd7dc36
Parents: 5a4972f
Author: huzheng <op...@gmail.com>
Authored: Wed Oct 26 11:28:27 2016 +0800
Committer: Phil Yang <ya...@apache.org>
Committed: Fri Dec 2 16:02:31 2016 +0800
----------------------------------------------------------------------
.../client/ClientSmallReversedScanner.java | 115 ++++++++++++++--
.../client/TestClientSmallReversedScanner.java | 16 +--
.../hbase/client/TestSmallReversedScanner.java | 138 +++++++++++++++++++
3 files changed, 248 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index d4de6a0..bd5575a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -29,13 +30,18 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
/**
@@ -49,8 +55,8 @@ import java.util.concurrent.ExecutorService;
@InterfaceAudience.Private
public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
- private ScannerCallableWithReplicas smallScanCallable = null;
- private SmallScannerCallableFactory callableFactory;
+ private ScannerCallableWithReplicas smallReversedScanCallable = null;
+ private SmallReversedScannerCallableFactory callableFactory;
/**
* Create a new ReversibleClientScanner for the specified table. Take note that the passed
@@ -80,7 +86,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
- primaryOperationTimeout, new SmallScannerCallableFactory());
+ primaryOperationTimeout, new SmallReversedScannerCallableFactory());
}
/**
@@ -112,7 +118,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
- SmallScannerCallableFactory callableFactory) throws IOException {
+ SmallReversedScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
this.callableFactory = callableFactory;
@@ -136,6 +142,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
byte[] localStartKey;
int cacheNum = nbRows;
boolean regionChanged = true;
+ boolean isFirstRegionToLocate = false;
// if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) {
byte[] startKey = this.currentRegion.getStartKey();
@@ -158,6 +165,14 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
localStartKey = createClosestRowBefore(lastResult.getRow());
} else {
localStartKey = this.scan.getStartRow();
+ isFirstRegionToLocate = true;
+ }
+
+ if (!isFirstRegionToLocate
+ && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
+ // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
+ // otherwise, maybe infinity results with RowKey=0x00 will return.
+ return false;
}
if (LOG.isTraceEnabled()) {
@@ -165,9 +180,10 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
+ Bytes.toStringBinary(localStartKey) + "'");
}
- smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
- getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
- getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
+ smallReversedScanCallable =
+ callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
+ localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
+ getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
if (this.scanMetrics != null && regionChanged) {
this.scanMetrics.countOfRegions.incrementAndGet();
@@ -209,8 +225,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
- values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
- this.currentRegion = smallScanCallable.getHRegionInfo();
+ values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
+ this.currentRegion = smallReversedScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
@@ -229,8 +245,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
this.lastResult = rs;
}
}
- if (smallScanCallable.hasMoreResultsContext()) {
- currentRegionDone = !smallScanCallable.getServerHasMoreResults();
+ if (smallReversedScanCallable.hasMoreResultsContext()) {
+ currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
} else {
currentRegionDone = countdown > 0;
}
@@ -250,7 +266,80 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
}
@VisibleForTesting
- protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
+ protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory;
}
+
+ /**
+ * A reversed ScannerCallable which supports backward small scanning.
+ */
+ static class SmallReversedScannerCallable extends ReversedScannerCallable {
+
+ public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
+ ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
+ int caching, int replicaId) {
+ super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
+ this.setCaching(caching);
+ }
+
+ @Override
+ public Result[] call(int timeout) throws IOException {
+ if (this.closed) return null;
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+ ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
+ getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
+ ClientProtos.ScanResponse response = null;
+ controller = controllerFactory.newController();
+ try {
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(timeout);
+ response = getStub().scan(controller, request);
+ Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
+ if (response.hasMoreResultsInRegion()) {
+ setHasMoreResultsContext(true);
+ setServerHasMoreResults(response.getMoreResultsInRegion());
+ } else {
+ setHasMoreResultsContext(false);
+ }
+ // We need to update result metrics since we are overriding call()
+ updateResultsMetrics(results);
+ return results;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+ @Override
+ public ScannerCallable getScannerCallableForReplica(int id) {
+ return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
+ scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
+ }
+ }
+
+ protected static class SmallReversedScannerCallableFactory {
+
+ public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
+ Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
+ RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+ int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
+ boolean isFirstRegionToLocate) {
+ byte[] locateStartRow = null;
+ if (isFirstRegionToLocate
+ && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
+ // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
+ // locate a region list, and the last one in region list is the region where our scan start.
+ locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
+ }
+
+ scan.setStartRow(localStartKey);
+ SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
+ scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
+ ScannerCallableWithReplicas scannerCallableWithReplicas =
+ new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
+ retries, scannerTimeout, cacheNum, conf, caller);
+ return scannerCallableWithReplicas;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
index 4611d08..57b52e6 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -102,15 +102,15 @@ public class TestClientSmallReversedScanner {
};
}
- private SmallScannerCallableFactory getFactory(
+ private SmallReversedScannerCallableFactory getFactory(
final ScannerCallableWithReplicas callableWithReplicas) {
- return new SmallScannerCallableFactory() {
+ return new SmallReversedScannerCallableFactory() {
@Override
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
- RpcRetryingCaller<Result[]> caller) {
+ RpcRetryingCaller<Result[]> caller, boolean isFirstRegionToLocate) {
return callableWithReplicas;
}
};
@@ -135,7 +135,7 @@ public class TestClientSmallReversedScanner {
// Intentionally leave a "default" caching size in the Scan. No matter the value, we
// should continue based on the server context
- SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+ SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -204,7 +204,7 @@ public class TestClientSmallReversedScanner {
// While the server returns 2 records per batch, we expect more records.
scan.setCaching(2);
- SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+ SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -280,7 +280,7 @@ public class TestClientSmallReversedScanner {
// While the server return 2 records per RPC, we expect there to be more records.
scan.setCaching(2);
- SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+ SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -316,7 +316,7 @@ public class TestClientSmallReversedScanner {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
- SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+ SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
new file mode 100644
index 0000000..3a4e92b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+
+@Category(MediumTests.class)
+public class TestSmallReversedScanner {
+ public static final Log LOG = LogFactory.getLog(TestSmallReversedScanner.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("testReversedSmall");
+ private static final byte[] COLUMN_FAMILY = Bytes.toBytes("columnFamily");
+
+ private static Table htable = null;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+
+ // create a table with 4 region: (-oo, b),[b,c),[c,d),[d,+oo)
+ byte[] bytes = Bytes.toBytes("bcd");
+ byte[][] splitKeys = new byte[bytes.length][];
+
+ for (int i = 0; i < bytes.length; i++) {
+ splitKeys[i] = new byte[] { bytes[i] };
+ }
+ htable = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY, splitKeys);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ TEST_UTIL.deleteTableData(TABLE_NAME);
+ }
+
+ /**
+ * all rowKeys are fit in the last region.
+ * @throws IOException
+ */
+ @Test
+ public void testSmallReversedScan01() throws IOException {
+ String[][] keysCases = new String[][] {
+ { "d0", "d1", "d2", "d3" }, // all rowKeys fit in the last region.
+ { "a0", "a1", "a2", "a3" }, // all rowKeys fit in the first region.
+ { "a0", "b1", "c2", "d3" }, // each region with a rowKey
+ };
+
+ for (int caseIndex = 0; caseIndex < keysCases.length; caseIndex++) {
+ testSmallReversedScanInternal(keysCases[caseIndex]);
+ TEST_UTIL.deleteTableData(TABLE_NAME);
+ }
+ }
+
+ private void testSmallReversedScanInternal(String[] inputRowKeys) throws IOException {
+ int rowCount = inputRowKeys.length;
+
+ for (int i = 0; i < rowCount; i++) {
+ Put put = new Put(Bytes.toBytes(inputRowKeys[i]));
+ put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(i));
+ htable.put(put);
+ }
+
+ Scan scan = new Scan();
+ scan.setReversed(true);
+ scan.setSmall(true);
+
+ ResultScanner scanner = htable.getScanner(scan);
+ Result r;
+ int value = rowCount;
+ while ((r = scanner.next()) != null) {
+ Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(--value));
+ Assert.assertArrayEquals(r.getRow(), Bytes.toBytes(inputRowKeys[value]));
+ }
+
+ Assert.assertEquals(value, 0);
+ }
+
+ /**
+ * Corner case:
+ * HBase has 4 regions, (-oo,b),[b,c),[c,d),[d,+oo), and only rowKey with byte[]={0x00} locate in region (-oo,b) .
+ * test whether reversed small scanner will return infinity results with RowKey={0x00}.
+ * @throws IOException
+ */
+ @Test
+ public void testSmallReversedScan02() throws IOException {
+ Put put = new Put(new byte[] { (char) 0x00 });
+ put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(0));
+ htable.put(put);
+
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setReversed(true);
+ scan.setSmall(true);
+
+ ResultScanner scanner = htable.getScanner(scan);
+ Result r;
+ int count = 1;
+ while ((r = scanner.next()) != null) {
+ Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(0));
+ Assert.assertArrayEquals(r.getRow(), new byte[] { (char) 0x00 });
+ Assert.assertTrue(--count >= 0);
+ }
+ Assert.assertEquals(count, 0);
+ }
+}