You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:07:54 UTC
svn commit: r1181431 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: nspiegelberg
Date: Tue Oct 11 02:07:54 2011
New Revision: 1181431
URL: http://svn.apache.org/viewvc?rev=1181431&view=rev
Log:
Unify Major/Minor Compaction code
Summary:
Unify the code path for major/minor compaction.
Initial diff, with some tests added to testCompaction.
Test Plan:
Added a test to testCompaction. Need to extend further to test other forms of
deletes.
running all the tests to make sure this change doesn't break anything else.
DiffCamp Revision: 175162
Reviewed By: kannan
Commenters: jgray, nspiegelberg
CC: jgray, aaiyer, nspiegelberg, achao, kannan, kranganathan, aravind
Tasks:
#423541: DL tier: java.lang.IllegalArgumentException during compactions
Revert Plan:
ok
Removed:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Tue Oct 11 02:07:54 2011
@@ -45,6 +45,7 @@ public class ScanQueryMatcher {
/** Keeps track of deletes */
protected DeleteTracker deletes;
+ protected boolean retainDeletesInOutput;
/** Keeps track of columns and versions */
protected ColumnTracker columns;
@@ -71,7 +72,8 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
- KeyValue.KeyComparator rowComparator, int maxVersions) {
+ KeyValue.KeyComparator rowComparator, int maxVersions,
+ boolean retainDeletesInOutput) {
this.tr = scan.getTimeRange();
this.oldestStamp = System.currentTimeMillis() - ttl;
this.rowComparator = rowComparator;
@@ -79,6 +81,7 @@ public class ScanQueryMatcher {
this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
this.filter = scan.getFilter();
+ this.retainDeletesInOutput = retainDeletesInOutput;
// Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) {
@@ -90,6 +93,13 @@ public class ScanQueryMatcher {
this.columns = new ExplicitColumnTracker(columns,maxVersions);
}
}
+ public ScanQueryMatcher(Scan scan, byte [] family,
+ NavigableSet<byte[]> columns, long ttl,
+ KeyValue.KeyComparator rowComparator, int maxVersions) {
+ /* By default we will not include deletes */
+ /* deletes are included explicitly (for minor compaction) */
+ this(scan, family, columns, ttl, rowComparator, maxVersions, false);
+ }
/**
* Determines if the caller should do one of several things:
@@ -159,7 +169,12 @@ public class ScanQueryMatcher {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
// Can't early out now, because DelFam come before any other keys
}
- return MatchCode.SKIP;
+ if (retainDeletesInOutput) {
+ return MatchCode.INCLUDE;
+ }
+ else {
+ return MatchCode.SKIP;
+ }
}
if (!this.deletes.isEmpty() &&
@@ -356,4 +371,4 @@ public class ScanQueryMatcher {
*/
SEEK_NEXT_USING_HINT,
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:07:54 2011
@@ -738,6 +738,28 @@ public class Store implements HeapSize {
}
/*
+ * Compact the most recent N files. Essentially a hook for testing.
+ */
+ protected void compactRecent(int N) throws IOException {
+ synchronized(compactLock) {
+ List<StoreFile> filesToCompact = this.storefiles;
+ int count = filesToCompact.size();
+ if (N > count) {
+ throw new RuntimeException("Not enough files");
+ }
+
+ filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
+ long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+ boolean majorcompaction = (N == count);
+
+ // Ready to go. Have list of files to compact.
+ StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+ // Move the compaction into place.
+ StoreFile sf = completeCompaction(filesToCompact, writer);
+ }
+ }
+
+ /*
* @param files
* @return True if any of the files in <code>files</code> are References.
*/
@@ -855,13 +877,12 @@ public class Store implements HeapSize {
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
try {
- // NOTE: the majority of the time for a compaction is spent in this section
- if (majorCompaction) {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
- scanner = new StoreScanner(this, scan, scanners);
+ /* include deletes, unless we are doing a major compaction */
+ scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@@ -899,39 +920,6 @@ public class Store implements HeapSize {
scanner.close();
}
}
- } else {
- MinorCompactingStoreScanner scanner = null;
- try {
- scanner = new MinorCompactingStoreScanner(this, scanners);
- if (scanner.peek() != null) {
- writer = createWriterInTmp(maxKeyCount);
- int bytesWritten = 0;
- while (scanner.peek() != null) {
- KeyValue kv = scanner.next();
- writer.append(kv);
-
- // check periodically to see if a system stop is requested
- if (Store.closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > Store.closeCheckInterval) {
- bytesWritten = 0;
- if (!this.region.areWritesEnabled()) {
- writer.close();
- fs.delete(writer.getPath(), false);
- throw new InterruptedIOException(
- "Aborting compaction of store " + this +
- " in region " + this.region +
- " because user requested stop.");
- }
- }
- }
- }
- }
- } finally {
- if (scanner != null)
- scanner.close();
- }
- }
} finally {
if (writer != null) {
writer.appendMetadata(maxId, majorCompaction);
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 02:07:54 2011
@@ -57,12 +57,14 @@ class StoreScanner implements KeyValueSc
* @param columns which columns we are scanning
* @throws IOException
*/
- StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
+ StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
+ throws IOException {
this.store = store;
this.cacheBlocks = scan.getCacheBlocks();
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(),
- store.versionsToReturn(scan.getMaxVersions()));
+ store.versionsToReturn(scan.getMaxVersions()),
+ false);
this.isGet = scan.isGetScan();
// pass columns = try to filter out unnecessary ScanFiles
@@ -88,14 +90,16 @@ class StoreScanner implements KeyValueSc
* @param scan the spec
* @param scanners ancilliary scanners
*/
- StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners)
+ StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
+ boolean retainDeletesInOutput)
throws IOException {
this.store = store;
this.cacheBlocks = false;
this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
- store.versionsToReturn(scan.getMaxVersions()));
+ store.versionsToReturn(scan.getMaxVersions()),
+ retainDeletesInOutput);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -116,7 +120,8 @@ class StoreScanner implements KeyValueSc
this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
- comparator.getRawComparator(), scan.getMaxVersions());
+ comparator.getRawComparator(), scan.getMaxVersions(),
+ false);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 11 02:07:54 2011
@@ -58,18 +58,31 @@ public class TestCompaction extends HBas
private static final byte [] COLUMN_FAMILY = fam1;
private final byte [] STARTROW = Bytes.toBytes(START_KEY);
private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
- private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
+ private int compactionThreshold;
+ private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
+ final private byte[] col1, col2;
private MiniDFSCluster cluster;
/** constructor */
- public TestCompaction() {
+ public TestCompaction() throws Exception {
super();
// Set cache flush size to 1MB
conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
conf.setInt("hbase.hregion.memstore.block.multiplier", 10);
this.cluster = null;
+ compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+ firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+ secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+ // Increment the least significant character so we get to next row.
+ secondRowBytes[START_KEY_BYTES.length - 1]++;
+ thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+ thirdRowBytes[START_KEY_BYTES.length - 1]++;
+ thirdRowBytes[START_KEY_BYTES.length - 1]++;
+ col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
+ col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
}
@Override
@@ -102,7 +115,7 @@ public class TestCompaction extends HBas
*/
public void testMajorCompactingToNoOutput() throws IOException {
createStoreFile(r);
- for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
@@ -133,43 +146,35 @@ public class TestCompaction extends HBas
* Assert deletes get cleaned up.
* @throws Exception
*/
- public void testCompaction() throws Exception {
+ public void testMajorCompaction() throws Exception {
createStoreFile(r);
- for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
- // Add more content. Now there are about 5 versions of each column.
- // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
- // Assert == 3 when we ask for versions.
+ // Add more content.
addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
-
- // FIX!!
-// Cell[] cellValues =
-// Cell.createSingleCellArray(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+ // Now there are about 5 versions of each column.
+ // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
+ //
+ // Assert == 3 when we ask for versions.
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
- // Assert that I can get 3 versions since it is the max I should get
- assertEquals(COMPACTION_THRESHOLD, result.size());
-// assertEquals(cellValues.length, 3);
r.flushcache();
- r.compactStores();
- // Always 3 versions if that is what max versions is.
- byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+ r.compactStores(true);
+
+ // look at the second row
// Increment the least significant character so we get to next row.
+ byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
secondRowBytes[START_KEY_BYTES.length - 1]++;
- // FIX
- result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
- // Assert that I can get 3 versions since it is the max I should get
- assertEquals(3, result.size());
-//
-// cellValues = Cell.createSingleCellArray(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/));
-// LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
-// cellValues.length);
-// assertTrue(cellValues.length == 3);
+ // Always 3 versions if that is what max versions is.
+ result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
- // Now add deletes to memstore and then flush it. That will put us over
+ // Now add deletes to memstore and then flush it.
+ // That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
@@ -179,51 +184,32 @@ public class TestCompaction extends HBas
r.delete(delete, null, true);
// Assert deleted.
-
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
- assertTrue(result.isEmpty());
-
+ assertTrue("Second row should have been deleted", result.isEmpty());
r.flushcache();
+
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
- assertTrue(result.isEmpty());
+ assertTrue("Second row should have been deleted", result.isEmpty());
// Add a bit of data and flush. Start adding at 'bbb'.
createSmallerStoreFile(this.r);
r.flushcache();
// Assert that the second row is still deleted.
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
- assertTrue(result.isEmpty());
+ assertTrue("Second row should still be deleted", result.isEmpty());
// Force major compaction.
r.compactStores(true);
assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
- assertTrue(result.isEmpty());
+ assertTrue("Second row should still be deleted", result.isEmpty());
// Make sure the store files do have some 'aaa' keys in them -- exactly 3.
// Also, that compacted store files do not have any secondRowBytes because
// they were deleted.
- int count = 0;
- boolean containsStartRow = false;
- for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
- HFileScanner scanner = f.getReader().getScanner(false, false);
- scanner.seekTo();
- do {
- byte [] row = scanner.getKeyValue().getRow();
- if (Bytes.equals(row, STARTROW)) {
- containsStartRow = true;
- count++;
- } else {
- // After major compaction, should be none of these rows in compacted
- // file.
- assertFalse(Bytes.equals(row, secondRowBytes));
- }
- } while(scanner.next());
- }
- assertTrue(containsStartRow);
- assertTrue(count == 3);
+ verifyCounts(3,0);
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
@@ -234,10 +220,142 @@ public class TestCompaction extends HBas
Thread.sleep(ttlInSeconds * 1000);
r.compactStores(true);
- count = count();
- assertTrue(count == 0);
+ int count = count();
+ assertTrue("Should not see anything after TTL has expired", count == 0);
+ }
+
+ public void testMinorCompactionWithDeleteRow() throws Exception {
+ Delete deleteRow = new Delete(secondRowBytes);
+ testMinorCompactionWithDelete(deleteRow);
}
+ public void testMinorCompactionWithDeleteColumn1() throws Exception {
+ Delete dc = new Delete(secondRowBytes);
+ /* delete all timestamps in the column */
+ dc.deleteColumns(fam2, col2);
+ testMinorCompactionWithDelete(dc);
+ }
+ public void testMinorCompactionWithDeleteColumn2() throws Exception {
+ Delete dc = new Delete(secondRowBytes);
+ dc.deleteColumn(fam2, col2);
+ /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+ * we only delete the latest version. One might expect to see only
+ * versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
+ * This is okay as well. Since there was no compaction done before the
+ * delete, version 0 seems to stay on.
+ */
+ //testMinorCompactionWithDelete(dc, 2);
+ testMinorCompactionWithDelete(dc, 3);
+ }
+ public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
+ Delete deleteCF = new Delete(secondRowBytes);
+ deleteCF.deleteFamily(fam2);
+ testMinorCompactionWithDelete(deleteCF);
+ }
+ public void testMinorCompactionWithDeleteVersion1() throws Exception {
+ Delete deleteVersion = new Delete(secondRowBytes);
+ deleteVersion.deleteColumns(fam2, col2, 2);
+ /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+ * We delete versions 0 ... 2. So, we still have one remaining.
+ */
+ testMinorCompactionWithDelete(deleteVersion, 1);
+ }
+ public void testMinorCompactionWithDeleteVersion2() throws Exception {
+ Delete deleteVersion = new Delete(secondRowBytes);
+ deleteVersion.deleteColumn(fam2, col2, 1);
+ /*
+ * the table has 4 versions: 0, 1, 2, and 3.
+ * 0 does not count.
+ * We delete 1.
+ * Should have 2 remaining.
+ */
+ testMinorCompactionWithDelete(deleteVersion, 2);
+ }
+
+ /*
+ * A helper function to test the minor compaction algorithm. We check that
+ * the delete markers are left behind. Takes delete as an argument, which
+ * can be any delete (row, column, columnfamliy etc), that essentially
+ * deletes row2 and column2. row1 and column1 should be undeleted
+ */
+ private void testMinorCompactionWithDelete(Delete delete) throws Exception {
+ testMinorCompactionWithDelete(delete, 0);
+ }
+ private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
+ HRegionIncommon loader = new HRegionIncommon(r);
+ for (int i = 0; i < compactionThreshold + 1; i++) {
+ addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+ addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+ addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+ addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+ r.flushcache();
+ }
+
+ Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
+ result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
+
+ // Now add deletes to memstore and then flush it. That will put us over
+ // the compaction threshold of 3 store files. Compacting these store files
+ // should result in a compacted store file that has no references to the
+ // deleted row.
+ r.delete(delete, null, true);
+
+ // Make sure that we have only deleted family2 from secondRowBytes
+ result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+ assertEquals(expectedResultsAfterDelete, result.size());
+ // but we still have firstrow
+ result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
+
+ r.flushcache();
+ // should not change anything.
+ // Let us check again
+ // Make sure that we have only deleted family2 from secondRowBytes
+ result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+ assertEquals(expectedResultsAfterDelete, result.size());
+ // but we still have firstrow
+ result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
+
+ // do a compaction
+ Store store2 = this.r.stores.get(fam2);
+ int numFiles1 = store2.getStorefiles().size();
+ assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
+ store2.compactRecent(compactionThreshold); // = 3
+ int numFiles2 = store2.getStorefiles().size();
+ // Check that we did compact
+ assertTrue("Number of store files should go down", numFiles1 > numFiles2);
+ // Check that it was a minor compaction.
+ assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
+
+ // Make sure that we have only deleted family2 from secondRowBytes
+ result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+ assertEquals(expectedResultsAfterDelete, result.size());
+ // but we still have firstrow
+ result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+ assertEquals(compactionThreshold, result.size());
+ }
+
+ private void verifyCounts(int countRow1, int countRow2) throws Exception {
+ int count1 = 0;
+ int count2 = 0;
+ for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
+ HFileScanner scanner = f.getReader().getScanner(false, false);
+ scanner.seekTo();
+ do {
+ byte [] row = scanner.getKeyValue().getRow();
+ if (Bytes.equals(row, STARTROW)) {
+ count1++;
+ } else if(Bytes.equals(row, secondRowBytes)) {
+ count2++;
+ }
+ } while(scanner.next());
+ }
+ assertEquals(countRow1,count1);
+ assertEquals(countRow2,count2);
+ }
/**
* Verify that you can stop a long-running compaction
@@ -253,9 +371,9 @@ public class TestCompaction extends HBas
try {
// Create a couple store files w/ 15KB (over 10KB interval)
- int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+ int jmax = (int) Math.ceil(15.0/compactionThreshold);
byte [] pad = new byte[1000]; // 1 KB chunk
- for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ for (int i = 0; i < compactionThreshold; i++) {
HRegionIncommon loader = new HRegionIncommon(r);
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
for (int j = 0; j < jmax; j++) {
@@ -279,7 +397,7 @@ public class TestCompaction extends HBas
// ensure that the compaction stopped, all old files are intact,
Store s = r.stores.get(COLUMN_FAMILY);
- assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+ assertEquals(compactionThreshold, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
// and no new store files persisted past compactStores()
FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
@@ -291,7 +409,7 @@ public class TestCompaction extends HBas
Store.closeCheckInterval = origWI;
// Delete all Store information once done using
- for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ for (int i = 0; i < compactionThreshold; i++) {
Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
byte [][] famAndQf = {COLUMN_FAMILY, null};
delete.deleteFamily(famAndQf[0]);