Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:07:54 UTC

svn commit: r1181431 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 02:07:54 2011
New Revision: 1181431

URL: http://svn.apache.org/viewvc?rev=1181431&view=rev
Log:
Unify Major/Minor Compaction code

Summary:
Unify the code path for major/minor compaction.

Initial diff, with some tests added to testCompaction.
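
To illustrate the unified path, here is a minimal, self-contained sketch
(simplified, assumed names; not the actual HBase classes): a single
compaction loop in which one boolean decides whether delete markers are
retained (minor compaction) or purged (major compaction). It deliberately
ignores version counting and how delete markers mask older cells.

    import java.util.ArrayList;
    import java.util.List;

    public class CompactionSketch {
      enum MatchCode { INCLUDE, SKIP }

      static class Cell {
        final String key;
        final boolean isDelete;
        Cell(String key, boolean isDelete) { this.key = key; this.isDelete = isDelete; }
      }

      // Stand-in for ScanQueryMatcher: delete markers are skipped only when
      // the caller wants them purged from the output.
      static class Matcher {
        private final boolean retainDeletesInOutput;
        Matcher(boolean retainDeletesInOutput) { this.retainDeletesInOutput = retainDeletesInOutput; }
        MatchCode match(Cell c) {
          if (c.isDelete) {
            return retainDeletesInOutput ? MatchCode.INCLUDE : MatchCode.SKIP;
          }
          return MatchCode.INCLUDE;
        }
      }

      // One loop serves both compaction types; only the flag differs.
      static List<Cell> compact(List<Cell> input, boolean majorCompaction) {
        Matcher matcher = new Matcher(!majorCompaction); // minor compaction keeps delete markers
        List<Cell> out = new ArrayList<Cell>();
        for (Cell c : input) {
          if (matcher.match(c) == MatchCode.INCLUDE) {
            out.add(c);
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<Cell> cells = new ArrayList<Cell>();
        cells.add(new Cell("row2/col2", true));   // delete marker
        cells.add(new Cell("row1/col1", false));  // ordinary cell
        System.out.println("minor keeps " + compact(cells, false).size() + " cells"); // 2
        System.out.println("major keeps " + compact(cells, true).size() + " cells");  // 1
      }
    }

In the actual change, the flag is retainDeletesInOutput on ScanQueryMatcher,
and Store.compact() passes !majorCompaction when constructing the StoreScanner.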

Test Plan:
Added a test to testCompaction. Need to extend further to test other forms of
deletes (one possible variant is sketched at the end of this test plan).

Running all the tests to make sure this change doesn't break anything else.
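
As one example of extending the delete coverage, an additional variant could
follow the same pattern as the testMinorCompactionWithDelete helper added in
this diff. This is only a hypothetical sketch: the method name and the
expected remaining count are assumptions, and it relies on the timestamps
0..3 written by that helper.

    // Hypothetical extra variant for TestCompaction (not part of this diff):
    // delete all of family2's cells up to timestamp 2, then verify via the
    // shared helper that the markers still take effect across a minor compaction.
    public void testMinorCompactionWithDeleteFamilyVersion() throws Exception {
      Delete d = new Delete(secondRowBytes);
      // Delete.deleteFamily(family, timestamp) removes cells with ts <= timestamp.
      d.deleteFamily(fam2, 2);
      // Versions 0..3 were written, so one version (ts 3) should remain.
      testMinorCompactionWithDelete(d, 1);
    }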

DiffCamp Revision: 175162
Reviewed By: kannan
Commenters: jgray, nspiegelberg
CC: jgray, aaiyer, nspiegelberg, achao, kannan, kranganathan, aravind
Tasks:
#423541: DL tier: java.lang.IllegalArgumentException during compactions

Revert Plan:
ok

Removed:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Tue Oct 11 02:07:54 2011
@@ -45,6 +45,7 @@ public class ScanQueryMatcher {
 
   /** Keeps track of deletes */
   protected DeleteTracker deletes;
+  protected boolean retainDeletesInOutput;
 
   /** Keeps track of columns and versions */
   protected ColumnTracker columns;
@@ -71,7 +72,8 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int maxVersions) {
+      KeyValue.KeyComparator rowComparator, int maxVersions,
+      boolean retainDeletesInOutput) {
     this.tr = scan.getTimeRange();
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
@@ -79,6 +81,7 @@ public class ScanQueryMatcher {
     this.stopRow = scan.getStopRow();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
     this.filter = scan.getFilter();
+    this.retainDeletesInOutput = retainDeletesInOutput;
 
     // Single branch to deal with two types of reads (columns vs all in family)
     if (columns == null || columns.size() == 0) {
@@ -90,6 +93,13 @@ public class ScanQueryMatcher {
       this.columns = new ExplicitColumnTracker(columns,maxVersions);
     }
   }
+  public ScanQueryMatcher(Scan scan, byte [] family,
+      NavigableSet<byte[]> columns, long ttl,
+      KeyValue.KeyComparator rowComparator, int maxVersions) {
+      /* By default we will not include deletes */
+      /* deletes are included explicitly (for minor compaction) */
+      this(scan, family, columns, ttl, rowComparator, maxVersions, false);
+  }
 
   /**
    * Determines if the caller should do one of several things:
@@ -159,7 +169,12 @@ public class ScanQueryMatcher {
         this.deletes.add(bytes, offset, qualLength, timestamp, type);
         // Can't early out now, because DelFam come before any other keys
       }
-      return MatchCode.SKIP;
+      if (retainDeletesInOutput) {
+        return MatchCode.INCLUDE;
+      }
+      else {
+        return MatchCode.SKIP;
+      }
     }
 
     if (!this.deletes.isEmpty() &&
@@ -356,4 +371,4 @@ public class ScanQueryMatcher {
      */
     SEEK_NEXT_USING_HINT,
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:07:54 2011
@@ -738,6 +738,28 @@ public class Store implements HeapSize {
   }
 
   /*
+   * Compact the most recent N files. Essentially a hook for testing.
+   */
+  protected void compactRecent(int N) throws IOException {
+    synchronized(compactLock) {
+      List<StoreFile> filesToCompact = this.storefiles;
+      int count = filesToCompact.size();
+      if (N > count) {
+        throw new RuntimeException("Not enough files");
+      }
+
+      filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
+      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+      boolean majorcompaction = (N == count);
+
+      // Ready to go.  Have list of files to compact.
+      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+      // Move the compaction into place.
+      StoreFile sf = completeCompaction(filesToCompact, writer);
+    }
+  }
+
+  /*
    * @param files
    * @return True if any of the files in <code>files</code> are References.
    */
@@ -855,13 +877,12 @@ public class Store implements HeapSize {
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
     try {
-    // NOTE: the majority of the time for a compaction is spent in this section
-    if (majorCompaction) {
       InternalScanner scanner = null;
       try {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
-        scanner = new StoreScanner(this, scan, scanners);
+        /* include deletes, unless we are doing a major compaction */
+        scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
         int bytesWritten = 0;
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
@@ -899,39 +920,6 @@ public class Store implements HeapSize {
           scanner.close();
         }
       }
-    } else {
-      MinorCompactingStoreScanner scanner = null;
-      try {
-        scanner = new MinorCompactingStoreScanner(this, scanners);
-        if (scanner.peek() != null) {
-          writer = createWriterInTmp(maxKeyCount);
-          int bytesWritten = 0;
-          while (scanner.peek() != null) {
-            KeyValue kv = scanner.next();
-            writer.append(kv);
-
-            // check periodically to see if a system stop is requested
-            if (Store.closeCheckInterval > 0) {
-              bytesWritten += kv.getLength();
-              if (bytesWritten > Store.closeCheckInterval) {
-                bytesWritten = 0;
-                if (!this.region.areWritesEnabled()) {
-                  writer.close();
-                  fs.delete(writer.getPath(), false);
-                  throw new InterruptedIOException(
-                      "Aborting compaction of store " + this +
-                      " in region " + this.region +
-                      " because user requested stop.");
-                }
-              }
-            }
-          }
-        }
-      } finally {
-        if (scanner != null)
-          scanner.close();
-      }
-    }
     } finally {
       if (writer != null) {
         writer.appendMetadata(maxId, majorCompaction);

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 02:07:54 2011
@@ -57,12 +57,14 @@ class StoreScanner implements KeyValueSc
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
+  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
+                              throws IOException {
     this.store = store;
     this.cacheBlocks = scan.getCacheBlocks();
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         columns, store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()));
+        store.versionsToReturn(scan.getMaxVersions()),
+        false);
 
     this.isGet = scan.isGetScan();
     // pass columns = try to filter out unnecessary ScanFiles
@@ -88,14 +90,16 @@ class StoreScanner implements KeyValueSc
    * @param scan the spec
   * @param scanners ancillary scanners
    */
-  StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners)
+  StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
+                            boolean retainDeletesInOutput)
       throws IOException {
     this.store = store;
     this.cacheBlocks = false;
     this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         null, store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()));
+        store.versionsToReturn(scan.getMaxVersions()),
+        retainDeletesInOutput);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
@@ -116,7 +120,8 @@ class StoreScanner implements KeyValueSc
     this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
-        comparator.getRawComparator(), scan.getMaxVersions());
+        comparator.getRawComparator(), scan.getMaxVersions(),
+        false);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1181431&r1=1181430&r2=1181431&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 11 02:07:54 2011
@@ -58,18 +58,31 @@ public class TestCompaction extends HBas
   private static final byte [] COLUMN_FAMILY = fam1;
   private final byte [] STARTROW = Bytes.toBytes(START_KEY);
   private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
-  private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
+  private int compactionThreshold;
+  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
+  final private byte[] col1, col2;
 
   private MiniDFSCluster cluster;
 
   /** constructor */
-  public TestCompaction() {
+  public TestCompaction() throws Exception {
     super();
 
     // Set cache flush size to 1MB
     conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
     conf.setInt("hbase.hregion.memstore.block.multiplier", 10);
     this.cluster = null;
+    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    // Increment the least significant character so we get to next row.
+    secondRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    thirdRowBytes[START_KEY_BYTES.length - 1]++;
+    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
+    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
   }
 
   @Override
@@ -102,7 +115,7 @@ public class TestCompaction extends HBas
    */
   public void testMajorCompactingToNoOutput() throws IOException {
     createStoreFile(r);
-    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+    for (int i = 0; i < compactionThreshold; i++) {
       createStoreFile(r);
     }
     // Now delete everything.
@@ -133,43 +146,35 @@ public class TestCompaction extends HBas
    * Assert deletes get cleaned up.
    * @throws Exception
    */
-  public void testCompaction() throws Exception {
+  public void testMajorCompaction() throws Exception {
     createStoreFile(r);
-    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+    for (int i = 0; i < compactionThreshold; i++) {
       createStoreFile(r);
     }
-    // Add more content.  Now there are about 5 versions of each column.
-    // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
-    // Assert == 3 when we ask for versions.
+    // Add more content.
     addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
 
-
-    // FIX!!
-//    Cell[] cellValues =
-//      Cell.createSingleCellArray(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    // Now there are about 5 versions of each column.
+    // Default is that there only 3 (MAXVERSIONS) versions allowed per column.
+    //
+    // Assert == 3 when we ask for versions.
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
 
-    // Assert that I can get 3 versions since it is the max I should get
-    assertEquals(COMPACTION_THRESHOLD, result.size());
-//    assertEquals(cellValues.length, 3);
     r.flushcache();
-    r.compactStores();
-    // Always 3 versions if that is what max versions is.
-    byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
+    r.compactStores(true);
+
+    // look at the second row
     // Increment the least significant character so we get to next row.
+    byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
     secondRowBytes[START_KEY_BYTES.length - 1]++;
-    // FIX
-    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
 
-    // Assert that I can get 3 versions since it is the max I should get
-    assertEquals(3, result.size());
-//
-//    cellValues = Cell.createSingleCellArray(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/));
-//    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
-//      cellValues.length);
-//    assertTrue(cellValues.length == 3);
+    // Always 3 versions if that is what max versions is.
+    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
 
-    // Now add deletes to memstore and then flush it.  That will put us over
+    // Now add deletes to memstore and then flush it.
+    // That will put us over
     // the compaction threshold of 3 store files.  Compacting these store files
     // should result in a compacted store file that has no references to the
     // deleted row.
@@ -179,51 +184,32 @@ public class TestCompaction extends HBas
     r.delete(delete, null, true);
 
     // Assert deleted.
-
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
-
+    assertTrue("Second row should have been deleted", result.isEmpty());
 
     r.flushcache();
+
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should have been deleted", result.isEmpty());
 
     // Add a bit of data and flush.  Start adding at 'bbb'.
     createSmallerStoreFile(this.r);
     r.flushcache();
     // Assert that the second row is still deleted.
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should still be deleted", result.isEmpty());
 
     // Force major compaction.
     r.compactStores(true);
     assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
 
     result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
-    assertTrue(result.isEmpty());
+    assertTrue("Second row should still be deleted", result.isEmpty());
 
     // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
     // Also, that compacted store files do not have any secondRowBytes because
     // they were deleted.
-    int count = 0;
-    boolean containsStartRow = false;
-    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
-      HFileScanner scanner = f.getReader().getScanner(false, false);
-      scanner.seekTo();
-      do {
-        byte [] row = scanner.getKeyValue().getRow();
-        if (Bytes.equals(row, STARTROW)) {
-          containsStartRow = true;
-          count++;
-        } else {
-          // After major compaction, should be none of these rows in compacted
-          // file.
-          assertFalse(Bytes.equals(row, secondRowBytes));
-        }
-      } while(scanner.next());
-    }
-    assertTrue(containsStartRow);
-    assertTrue(count == 3);
+    verifyCounts(3,0);
 
     // Multiple versions allowed for an entry, so the delete isn't enough
     // Lower TTL and expire to ensure that all our entries have been wiped
@@ -234,10 +220,142 @@ public class TestCompaction extends HBas
     Thread.sleep(ttlInSeconds * 1000);
 
     r.compactStores(true);
-    count = count();
-    assertTrue(count == 0);
+    int count = count();
+    assertTrue("Should not see anything after TTL has expired", count == 0);
+  }
+
+  public void testMinorCompactionWithDeleteRow() throws Exception {
+    Delete deleteRow = new Delete(secondRowBytes);
+    testMinorCompactionWithDelete(deleteRow);
   }
+  public void testMinorCompactionWithDeleteColumn1() throws Exception {
+    Delete dc = new Delete(secondRowBytes);
+    /* delete all timestamps in the column */
+    dc.deleteColumns(fam2, col2);
+    testMinorCompactionWithDelete(dc);
+  }
+  public void testMinorCompactionWithDeleteColumn2() throws Exception {
+    Delete dc = new Delete(secondRowBytes);
+    dc.deleteColumn(fam2, col2);
+    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+     * We only delete the latest version. One might expect to see only
+     * versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
+     * This is okay as well. Since there was no compaction done before the
+     * delete, version 0 seems to stay on.
+     */
+    //testMinorCompactionWithDelete(dc, 2);
+    testMinorCompactionWithDelete(dc, 3);
+  }
+  public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
+    Delete deleteCF = new Delete(secondRowBytes);
+    deleteCF.deleteFamily(fam2);
+    testMinorCompactionWithDelete(deleteCF);
+  }
+  public void testMinorCompactionWithDeleteVersion1() throws Exception {
+    Delete deleteVersion = new Delete(secondRowBytes);
+    deleteVersion.deleteColumns(fam2, col2, 2);
+    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
+     * We delete versions 0 ... 2. So, we still have one remaining.
+     */
+    testMinorCompactionWithDelete(deleteVersion, 1);
+  }
+  public void testMinorCompactionWithDeleteVersion2() throws Exception {
+    Delete deleteVersion = new Delete(secondRowBytes);
+    deleteVersion.deleteColumn(fam2, col2, 1);
+    /*
+     * the table has 4 versions: 0, 1, 2, and 3.
+     * 0 does not count.
+     * We delete 1.
+     * Should have 2 remaining.
+     */
+    testMinorCompactionWithDelete(deleteVersion, 2);
+  }
+
+  /*
+   * A helper function to test the minor compaction algorithm. We check that
+   * the delete markers are left behind. Takes a delete as an argument, which
+   * can be any delete (row, column, column family, etc.) that essentially
+   * deletes row2 and column2. row1 and column1 should remain undeleted.
+   */
+  private void testMinorCompactionWithDelete(Delete delete) throws Exception {
+    testMinorCompactionWithDelete(delete, 0);
+  }
+  private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
+    HRegionIncommon loader = new HRegionIncommon(r);
+    for (int i = 0; i < compactionThreshold + 1; i++) {
+      addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
+      addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
+      r.flushcache();
+    }
+
+    Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    // Now add deletes to memstore and then flush it.  That will put us over
+    // the compaction threshold of 3 store files.  Compacting these store files
+    // should result in a compacted store file that has no references to the
+    // deleted row.
+    r.delete(delete, null, true);
+
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    r.flushcache();
+    // should not change anything.
+    // Let us check again
 
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+
+    // do a compaction
+    Store store2 = this.r.stores.get(fam2);
+    int numFiles1 = store2.getStorefiles().size();
+    assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
+    store2.compactRecent(compactionThreshold);   // = 3
+    int numFiles2 = store2.getStorefiles().size();
+    // Check that we did compact
+    assertTrue("Number of store files should go down", numFiles1 > numFiles2);
+    // Check that it was a minor compaction.
+    assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);
+
+    // Make sure that we have only deleted family2 from secondRowBytes
+    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
+    assertEquals(expectedResultsAfterDelete, result.size());
+    // but we still have firstrow
+    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
+    assertEquals(compactionThreshold, result.size());
+  }
+
+  private void verifyCounts(int countRow1, int countRow2) throws Exception {
+    int count1 = 0;
+    int count2 = 0;
+    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
+      HFileScanner scanner = f.getReader().getScanner(false, false);
+      scanner.seekTo();
+      do {
+        byte [] row = scanner.getKeyValue().getRow();
+        if (Bytes.equals(row, STARTROW)) {
+          count1++;
+        } else if(Bytes.equals(row, secondRowBytes)) {
+          count2++;
+        }
+      } while(scanner.next());
+    }
+    assertEquals(countRow1,count1);
+    assertEquals(countRow2,count2);
+  }
 
   /**
    * Verify that you can stop a long-running compaction
@@ -253,9 +371,9 @@ public class TestCompaction extends HBas
 
     try {
       // Create a couple store files w/ 15KB (over 10KB interval)
-      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+      int jmax = (int) Math.ceil(15.0/compactionThreshold);
       byte [] pad = new byte[1000]; // 1 KB chunk
-      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+      for (int i = 0; i < compactionThreshold; i++) {
         HRegionIncommon loader = new HRegionIncommon(r);
         Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
         for (int j = 0; j < jmax; j++) {
@@ -279,7 +397,7 @@ public class TestCompaction extends HBas
 
       // ensure that the compaction stopped, all old files are intact,
       Store s = r.stores.get(COLUMN_FAMILY);
-      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+      assertEquals(compactionThreshold, s.getStorefilesCount());
       assertTrue(s.getStorefilesSize() > 15*1000);
       // and no new store files persisted past compactStores()
       FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
@@ -291,7 +409,7 @@ public class TestCompaction extends HBas
       Store.closeCheckInterval = origWI;
 
       // Delete all Store information once done using
-      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+      for (int i = 0; i < compactionThreshold; i++) {
         Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
         byte [][] famAndQf = {COLUMN_FAMILY, null};
         delete.deleteFamily(famAndQf[0]);