You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/12/10 02:55:57 UTC
svn commit: r1212712 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java
Author: nspiegelberg
Date: Sat Dec 10 01:55:56 2011
New Revision: 1212712
URL: http://svn.apache.org/viewvc?rev=1212712&view=rev
Log:
D519 HBASE-4823 [jira] long running scans lose benefit of bloomfilters and timerange hints
Revision Status: Accepted · Next step: arc commit --revision 519
Author: aaiyer
Reviewed by: kannan
Test Plan: mvn test on the MR
Unit: ★ No Unit Test Coverage
Lint: ★ No Linters Available
Host: dev1396.snc6.facebook.com
Path: /data/users/aaiyer/external-89/0.89-fb/ (svn)
Arcanist Project: hbase
Apply Patch: arc patch D519
Export Patch: arc export --revision 51
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1212712&r1=1212711&r2=1212712&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Dec 10 01:55:56 2011
@@ -51,6 +51,8 @@ class StoreScanner extends NonLazyKeyVal
private final boolean isGet;
private final boolean explicitColumnQuery;
private final boolean useRowColBloom;
+ private final Scan scan;
+ private final NavigableSet<byte[]> columns;
/** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
@@ -71,6 +73,8 @@ class StoreScanner extends NonLazyKeyVal
isGet = scan.isGetScan();
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
+ this.scan = scan;
+ this.columns = columns;
// We look up row-column Bloom filters for multi-column queries as part of
// the seek operation. However, we also look the row-column Bloom filter
@@ -190,13 +194,6 @@ class StoreScanner extends NonLazyKeyVal
}
/*
- * @return List of scanners ordered properly.
- */
- private List<KeyValueScanner> getScanners() throws IOException {
- return this.store.getScanners(cacheBlocks, isGet, false, null);
- }
-
- /*
* @return List of scanners to seek, possibly filtered by StoreFile.
*/
private List<KeyValueScanner> getScanners(Scan scan,
@@ -253,7 +250,7 @@ class StoreScanner extends NonLazyKeyVal
public synchronized boolean seek(KeyValue key) throws IOException {
if (this.heap == null) {
- List<KeyValueScanner> scanners = getScanners();
+ List<KeyValueScanner> scanners = getScanners(scan, columns);
heap = new KeyValueHeap(scanners, store.comparator);
}
@@ -443,10 +440,7 @@ class StoreScanner extends NonLazyKeyVal
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
- /* When we have the scan object, should we not pass it to getScanners()
- * to get a limited set of scanners? We did so in the constructor and we
- * could have done it now by storing the scan object from the constructor */
- List<KeyValueScanner> scanners = getScanners();
+ List<KeyValueScanner> scanners = getScanners(scan, columns);
for(KeyValueScanner scanner : scanners) {
scanner.seek(lastTopKey);
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java?rev=1212712&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerResets.java Sat Dec 10 01:55:56 2011
@@ -0,0 +1,193 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+
+import org.junit.Test;
+
+public class TestScannerResets extends HBaseTestCase {
+ static final Log LOG = LogFactory.getLog(TestBlocksRead.class);
+
+ private HBaseConfiguration getConf() {
+ HBaseConfiguration conf = new HBaseConfiguration();
+
+ // disable compactions in this test.
+ conf.setInt("hbase.hstore.compactionThreshold", 10000);
+ return conf;
+ }
+
+ HRegion region = null;
+ private final String DIR = HBaseTestingUtility.getTestDir() + "/TestScannerResets/";
+
+ /**
+ * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ EnvironmentEdgeManagerTestHelper.reset();
+ }
+
+ private void initHRegion(byte[] tableName, String callingMethod,
+ HBaseConfiguration conf, String family) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ HColumnDescriptor familyDesc;
+ familyDesc = new HColumnDescriptor(
+ Bytes.toBytes(family),
+ HColumnDescriptor.DEFAULT_VERSIONS,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ 1, // small block size deliberate; each kv on its own block
+ HColumnDescriptor.DEFAULT_TTL,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+ htd.addFamily(familyDesc);
+ HRegionInfo info = new HRegionInfo(htd, null, null, false);
+ Path path = new Path(DIR + callingMethod);
+ region = HRegion.createHRegion(info, path, conf);
+ }
+
+ private void putData(String family, String row, String col, long version)
+ throws IOException {
+ putData(Bytes.toBytes(family), row, col, version, version);
+ }
+
+ // generates a value to put for a row/col/version.
+ private static byte[] genValue(String row, String col, long version) {
+ return Bytes.toBytes("Value:" + row + "#" + col + "#" + version);
+ }
+
+ private void putData(byte[] cf, String row, String col, long versionStart,
+ long versionEnd) throws IOException {
+ byte columnBytes[] = Bytes.toBytes(col);
+ Put put = new Put(Bytes.toBytes(row));
+
+ for (long version = versionStart; version <= versionEnd; version++) {
+ put.add(cf, columnBytes, version, genValue(row, col, version));
+ }
+ region.put(put);
+ }
+
+ private static void verifyData(KeyValue kv, String expectedRow,
+ String expectedCol, long expectedVersion) {
+ assertEquals("RowCheck", expectedRow, Bytes.toString(kv.getRow()));
+ assertEquals("ColumnCheck", expectedCol, Bytes.toString(kv.getQualifier()));
+ assertEquals("TSCheck", expectedVersion, kv.getTimestamp());
+ assertEquals("ValueCheck",
+ Bytes.toString(genValue(expectedRow, expectedCol, expectedVersion)),
+ Bytes.toString(kv.getValue()));
+ }
+
+ // how many data blocks in the specified CF were accessed so far...
+ private static long getBlkAccessCount(byte[] cf) {
+ return HRegion.getNumericMetric(SchemaMetrics.CF_PREFIX
+ + Bytes.toString(cf) + "." + SchemaMetrics.BLOCK_TYPE_PREFIX
+ + "Data.fsBlockReadCnt");
+ }
+
+ private void scanRange(String family, int minInclusiveTS, int maxExclusiveTS,
+ boolean flushInBetween, int expBlocks) throws IOException {
+ byte[] cf = Bytes.toBytes(family);
+ long blocksStart = getBlkAccessCount(cf);
+
+ // Setup a scan to read items in time range [6..9).
+ Scan s = new Scan();
+ s.setTimeRange(minInclusiveTS, maxExclusiveTS);
+ InternalScanner scanner = region.getScanner(s);
+ int idx = minInclusiveTS;
+ List<KeyValue> kvs;
+ do {
+ kvs = new ArrayList<KeyValue>();
+ scanner.next(kvs);
+ if (kvs.size() == 0)
+ break;
+ assertEquals(1, kvs.size());
+ verifyData(kvs.get(0), "row" + idx, "col" + idx, idx);
+ if (flushInBetween && (idx == minInclusiveTS)) { // first iteration?
+ // On the first iteration, after a next() has been done,
+ // put some new KVs and flush a new file. This should
+ // cause scanner to be reset to a new set of store
+ // files.
+ // File 3: Timestamp range [9..9]
+ putData(family, "row9", "col9", 9);
+ region.flushcache();
+ }
+ idx++;
+ } while (true);
+ assertEquals("ending index", idx, maxExclusiveTS);
+
+ long blocksEnd = getBlkAccessCount(cf);
+
+ assertEquals("Blocks Read Check: ", expBlocks, blocksEnd - blocksStart);
+ System.out.println("Blocks Read = "
+ + (blocksEnd - blocksStart) + "Expected = " + expBlocks);
+ }
+
+ @Test
+ public void testScannerReset() throws Exception {
+ byte[] TABLE = Bytes.toBytes("testScannerReset");
+ String FAMILY = "cf1";
+ HBaseConfiguration conf = getConf();
+ initHRegion(TABLE, getName(), conf, FAMILY);
+
+ // File1: Timestamp range [1..3]
+ putData(FAMILY, "row1", "col1", 1);
+ putData(FAMILY, "row2", "col2", 2);
+ putData(FAMILY, "row3", "col3", 3);
+ region.flushcache();
+
+ // File2: Timestamp range of keys [4..8]
+ putData(FAMILY, "row4", "col4", 4);
+ putData(FAMILY, "row5", "col5", 5);
+ putData(FAMILY, "row6", "col6", 6);
+ putData(FAMILY, "row7", "col7", 7);
+ putData(FAMILY, "row8", "col8", 8);
+ region.flushcache();
+
+ // Scan values in time range [1..4).
+ // with flushing in between being FALSE.
+ //
+ // Expected blocks read = 3 because only File1
+ // contains data in relevant time range we are
+ // interested in.
+ scanRange(FAMILY, 1, 4, false, 3);
+
+ // Scan values in time range [1..4).
+ // with flushing in between being TRUE.
+ //
+ // Expected blocks read = 4 because only File1
+ // contains data in relevant time range even with
+ // flushes in between. Note: File1 only has 3 blocks.
+ // But we expect to read 1 extra block because reset
+ // of the scanner stack will cause an extra seek into
+ // the File1. However, due to bug HBASE-4823, this
+ // test reads 10 blocks-- 4 blocks in File1, 5 in File2,
+ // and 1 in File3 (the flushed file).
+ scanRange(FAMILY, 1, 4, true, 4);
+ }
+}