You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/03/12 23:49:53 UTC
svn commit: r1576976 - in /hbase/branches/0.98:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-server/src/test/java/org/apache/hadoop/hbase/client/
Author: apurtell
Date: Wed Mar 12 22:49:52 2014
New Revision: 1576976
URL: http://svn.apache.org/r1576976
Log:
HBASE-9999 Add support for small reverse scan (Nicolas Liochon)
Added:
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
Modified:
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Wed Mar 12 22:49:52 2014
@@ -63,7 +63,7 @@ public class ClientScanner extends Abstr
protected final long maxScannerResultSize;
private final HConnection connection;
private final TableName tableName;
- private final int scannerTimeout;
+ protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
Added: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java?rev=1576976&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java (added)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java Wed Mar 12 22:49:52 2014
@@ -0,0 +1,188 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Client scanner for small reversed scans. Generally only one RPC is needed to fetch the
+ * scan results, unless the results cross multiple regions or the number of result rows
+ * exceeds the scan's caching. Each RPC uses the callable built by {@link ClientSmallScanner}.
+ * <p>
+ * For small scans this performs better than {@link ReversedClientScanner}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ClientSmallReversedScanner extends ReversedClientScanner {
+ private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
+ private RegionServerCallable<Result[]> smallScanCallable = null; // built by nextScanner(), run by next()
+ private byte[] skipRowOfFirstResult = null; // row already returned; dropped if it heads the next batch
+
+ /**
+ * Create a new ClientSmallReversedScanner for the specified table. Region lookups are
+ * deferred to next(). Note that the passed {@link org.apache.hadoop.hbase.client.Scan}'s start row may be changed.
+ *
+ * @param conf The {@link org.apache.hadoop.conf.Configuration} to use.
+ * @param scan {@link org.apache.hadoop.hbase.client.Scan} to use in this scanner
+ * @param tableName The table that we wish to scan
+ * @param connection Connection identifying the cluster
+ * @throws java.io.IOException if the {@link ReversedClientScanner} constructor fails
+ */
+ public ClientSmallReversedScanner(Configuration conf, Scan scan, TableName tableName,
+ HConnection connection) throws IOException {
+ super(conf, scan, tableName, connection);
+ }
+
+ /**
+ * Builds {@link #smallScanCallable} for the next RPC: moves to the previous region, resumes
+ * from the last returned row, or starts from the scan's start row.
+ *
+ * @param nbRows number of rows still wanted; one extra row is requested when resuming
+ * @param done true if the server returned null, meaning scanning must stop
+ * @param currentRegionDone true if the current region has been fully scanned
+ * @return true if a callable was built; false if the scan is over and has been closed
+ * @throws IOException if building the small-scan callable fails
+ */
+ private boolean nextScanner(int nbRows, final boolean done,
+ boolean currentRegionDone) throws IOException {
+ // Row the next RPC starts from; getSmallScanCallable() writes it into the Scan.
+ byte[] localStartKey;
+ int cacheNum = nbRows;
+ skipRowOfFirstResult = null;
+ // A reversed scan ends after the table's first region (empty start key), or on stop row / done.
+ if (this.currentRegion != null && currentRegionDone) {
+ byte[] startKey = this.currentRegion.getStartKey();
+ if (startKey == null
+ || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
+ || checkScanStopRow(startKey) || done) {
+ close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished with small scan at " + this.currentRegion);
+ }
+ return false;
+ }
+ // Start just before this region's start key so the next RPC lands in the previous region.
+ localStartKey = createClosestRowBefore(startKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished with region " + this.currentRegion);
+ }
+ } else if (this.lastResult != null) {
+ localStartKey = this.lastResult.getRow();
+ skipRowOfFirstResult = this.lastResult.getRow();
+ cacheNum++; // the first returned row may be lastResult's row, which next() skips
+ } else {
+ localStartKey = this.scan.getStartRow(); // nothing returned yet: start from the scan's start row
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Advancing internal small scanner to startKey at '"
+ + Bytes.toStringBinary(localStartKey) + "'");
+ }
+ // NOTE(review): localStartKey is also the callable's locate row; for an empty start row confirm the LAST region is found.
+ smallScanCallable = ClientSmallScanner.getSmallScanCallable(
+ scan, getConnection(), getTable(), localStartKey, cacheNum);
+ // Resumed RPCs (skip row set) are not counted as a new region.
+ if (this.scanMetrics != null && skipRowOfFirstResult == null) {
+ this.scanMetrics.countOfRegions.incrementAndGet();
+ }
+ return true;
+ }
+ /** Returns the next row, refilling the cache with small-scan RPCs when it is empty; null when exhausted. */
+ @Override
+ public Result next() throws IOException {
+ // If the scanner is closed and there's nothing left in the cache, next is a
+ // no-op.
+ if (cache.size() == 0 && this.closed) {
+ return null;
+ }
+ if (cache.size() == 0) {
+ Result[] values = null;
+ long remainingResultSize = maxScannerResultSize;
+ int countdown = this.caching;
+ boolean currentRegionDone = false;
+ // Values == null means server-side filter has determined we must STOP (also null before the first RPC; nextScanner ignores done until a region is finished)
+ while (remainingResultSize > 0 && countdown > 0
+ && nextScanner(countdown, values == null, currentRegionDone)) {
+ // Server returns a null values if scanning is to stop. Else,
+ // returns an empty array if scanning is to go on and we've just
+ // exhausted current region.
+ values = this.caller.callWithRetries(smallScanCallable, scannerTimeout);
+ this.currentRegion = smallScanCallable.getHRegionInfo();
+ long currentTime = System.currentTimeMillis();
+ if (this.scanMetrics != null) {
+ this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
+ - lastNext);
+ }
+ lastNext = currentTime;
+ if (values != null && values.length > 0) {
+ for (int i = 0; i < values.length; i++) {
+ Result rs = values[i];
+ if (i == 0 && this.skipRowOfFirstResult != null
+ && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
+ // Drop the row already returned before this resumed RPC
+ continue;
+ }
+ cache.add(rs);
+ for (Cell kv : rs.rawCells()) {
+ remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
+ }
+ countdown--;
+ this.lastResult = rs;
+ }
+ }
+ currentRegionDone = countdown > 0; // fewer rows than wanted: treat the current region as finished
+ }
+ }
+
+ if (cache.size() > 0) {
+ return cache.poll();
+ }
+ // if we exhausted this scanner before calling close, write out the scan
+ // metrics
+ writeScanMetrics();
+ return null;
+ }
+
+ /** Defers region lookups to the first {@link #next()} call. */
+ @Override
+ protected void initializeScannerInConstruction() throws IOException {
+ // No need to initialize the scanner when constructing instance, do it when
+ // calling next(). Do nothing here.
+ }
+ /** Publishes scan metrics if not yet published and marks the scanner closed; cached rows stay readable. */
+ @Override
+ public void close() {
+ if (!scanMetricsPublished) writeScanMetrics();
+ closed = true;
+ }
+
+}
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Wed Mar 12 22:49:52 2014
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,7 +27,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@@ -153,21 +150,23 @@ public class ClientSmallScanner extends
LOG.trace("Advancing internal small scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'");
}
- smallScanCallable = getSmallScanCallable(localStartKey, cacheNum);
+ smallScanCallable = getSmallScanCallable(
+ scan, getConnection(), getTable(), localStartKey, cacheNum);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
return true;
}
- private RegionServerCallable<Result[]> getSmallScanCallable(
- byte[] localStartKey, final int cacheNum) {
- this.scan.setStartRow(localStartKey);
+ static RegionServerCallable<Result[]> getSmallScanCallable(
+ final Scan sc, HConnection connection, TableName table, byte[] localStartKey,
+ final int cacheNum) throws IOException {
+ sc.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
- getConnection(), getTable(), scan.getStartRow()) {
+ connection, table, sc.getStartRow()) {
public Result[] call() throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
- .getRegionInfo().getRegionName(), scan, cacheNum, true);
+ .getRegionInfo().getRegionName(), sc, cacheNum, true);
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 12 22:49:52 2014
@@ -718,15 +718,24 @@ public class HTable implements HTableInt
if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching());
}
- if (scan.isSmall() && !scan.isReversed()) {
+
+ if (scan.isReversed()) {
+ if (scan.isSmall()) {
+ return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
+ this.connection);
+ } else {
+ return new ReversedClientScanner(getConfiguration(), scan, getName(),
+ this.connection);
+ }
+ }
+
+ if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
- this.connection);
- } else if (scan.isReversed()) {
- return new ReversedClientScanner(getConfiguration(), scan, getName(),
- this.connection);
+ this.connection, this.rpcCallerFactory);
+ } else {
+ return new ClientScanner(getConfiguration(), scan,
+ getName(), this.connection);
}
- return new ClientScanner(getConfiguration(), scan,
- getName(), this.connection);
}
/**
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java Wed Mar 12 22:49:52 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
/**
* A reversed client scanner which support backward scanning
@@ -114,6 +115,7 @@ public class ReversedClientScanner exten
this.scanMetrics.countOfRegions.incrementAndGet();
}
} catch (IOException e) {
+ ExceptionUtil.rethrowIfInterrupt(e);
close();
throw e;
}
@@ -151,7 +153,7 @@ public class ReversedClientScanner exten
* @param row
* @return a new byte array which is the closest front row of the specified one
*/
- private byte[] createClosestRowBefore(byte[] row) {
+ protected byte[] createClosestRowBefore(byte[] row) {
if (row == null) {
throw new IllegalArgumentException("The passed row is empty");
}
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java Wed Mar 12 22:49:52 2014
@@ -70,7 +70,7 @@ public class ReversedScannerCallable ext
this.location = connection.getRegionLocation(tableName, row, reload);
if (this.location == null) {
throw new IOException("Failed to find location, tableName="
- + tableName + ", row=" + Bytes.toString(row) + ", reload="
+ + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
+ reload);
}
} else {
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Mar 12 22:49:52 2014
@@ -5835,4 +5835,170 @@ public class TestFromClientSide {
assertEquals(insertNum, count);
table.close();
}
+
+
+  /**
+   * Tests regular and small reversed scans over a table pre-split into seven
+   * regions (split rows 000, 002, ..., 010), with two rows written per split row.
+   */
+  @Test
+  public void testSmallReversedScanUnderMultiRegions() throws Exception {
+    // Test Initialization.
+    byte[] TABLE = Bytes.toBytes("testSmallReversedScanUnderMultiRegions");
+    byte[][] splitRows = new byte[][]{
+        Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"),
+        Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")};
+    HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows);
+    try {
+      TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
+      assertEquals(splitRows.length + 1, table.getRegionLocations().size());
+
+      // Write each split row and its successor: 000/001, 002/003, ..., 010/011.
+      for (byte[] splitRow : splitRows) {
+        Put put = new Put(splitRow);
+        put.add(FAMILY, QUALIFIER, VALUE);
+        table.put(put);
+
+        byte[] nextRow = Bytes.copy(splitRow);
+        nextRow[nextRow.length - 1]++;
+
+        put = new Put(nextRow);
+        put.add(FAMILY, QUALIFIER, VALUE);
+        table.put(put);
+      }
+
+      // Sanity check: a forward scan sees all 12 rows.
+      ResultScanner scanner = table.getScanner(new Scan());
+      int count = 0;
+      try {
+        for (Result r : scanner) {
+          assertTrue(!r.isEmpty());
+          count++;
+        }
+      } finally {
+        scanner.close();
+      }
+      assertEquals(12, count);
+
+      reverseScanTest(table, false);
+      reverseScanTest(table, true);
+    } finally {
+      // Release the table even when an assertion above fails.
+      table.close();
+    }
+  }
+
+  /**
+   * Runs a series of reversed scans and checks the row count and strictly
+   * descending row order of each. Expects the 12 rows (000 through 011) written
+   * by {@link #testSmallReversedScanUnderMultiRegions()}.
+   *
+   * @param table table holding rows 000/001, 002/003, ..., 010/011
+   * @param small whether the bounded scans below are small scans
+   * @throws IOException if opening or iterating a scanner fails
+   */
+  private void reverseScanTest(HTable table, boolean small) throws IOException {
+    // Whole table. NOTE(review): unlike the scans below, this one never calls
+    // setSmall(small), so a small reversed scan with an empty start row is not
+    // exercised here; confirm whether that is intentional.
+    Scan scan = new Scan();
+    scan.setReversed(true);
+    assertReversedScanRowCount(table, scan, 12);
+
+    scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes("002"));
+    assertReversedScanRowCount(table, scan, 3); // 000 001 002
+
+    scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes("002"));
+    scan.setStopRow(Bytes.toBytes("000"));
+    assertReversedScanRowCount(table, scan, 2); // 001 002
+
+    scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes("001"));
+    assertReversedScanRowCount(table, scan, 2); // 000 001
+
+    scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes("000"));
+    assertReversedScanRowCount(table, scan, 1); // 000
+
+    scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes("006"));
+    scan.setStopRow(Bytes.toBytes("002"));
+    assertReversedScanRowCount(table, scan, 4); // 003 004 005 006
+  }
+
+  /**
+   * Runs {@code scan} to completion and asserts that every result is non-empty,
+   * that rows arrive in strictly descending order, and that exactly
+   * {@code expectedCount} rows are returned. The scanner is closed even if an
+   * assertion fails.
+   */
+  private static void assertReversedScanRowCount(HTable table, Scan scan,
+      int expectedCount) throws IOException {
+    ResultScanner scanner = table.getScanner(scan);
+    int count = 0;
+    byte[] lastRow = null;
+    try {
+      for (Result r : scanner) {
+        assertTrue(!r.isEmpty());
+        count++;
+        byte[] thisRow = r.getRow();
+        if (lastRow != null) {
+          assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
+              + ",this row=" + Bytes.toString(thisRow),
+              Bytes.compareTo(thisRow, lastRow) < 0);
+        }
+        lastRow = thisRow;
+      }
+    } finally {
+      scanner.close();
+    }
+    assertEquals(expectedCount, count);
+  }
}