Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:09:42 UTC

svn commit: r1181454 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/Store.java main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java

Author: nspiegelberg
Date: Tue Oct 11 02:09:42 2011
New Revision: 1181454

URL: http://svn.apache.org/viewvc?rev=1181454&view=rev
Log:
Max Compaction Size

Summary:
Add the ability to specify a maximum storefile size for compaction.
Files above this limit are not included in compactions. This is
useful for large-object stores and for clusters that pre-split regions.
Also, use this opportunity to write unit tests for the existing
compaction algorithm.
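
For illustration only (this snippet is not part of the change), the
compaction knobs read by Store after this change, including the new
hbase.hstore.compaction.max.size, can be set programmatically the same
way the new TestCompactSelection configures its Store. The sizes below
are arbitrary example values.

  import org.apache.hadoop.conf.Configuration;

  public class CompactionConfigExample {
    public static Configuration exampleConf() {
      Configuration conf = new Configuration();
      // minimum number of store files before a minor compaction is attempted
      conf.setInt("hbase.hstore.compaction.min", 3);
      // maximum number of store files compacted at once (avoids OOM)
      conf.setInt("hbase.hstore.compaction.max", 10);
      // files at or below this size are unconditional compaction candidates
      conf.setLong("hbase.hstore.compaction.min.size", 64 * 1024 * 1024L);
      // NEW: files above this size are dropped from compaction selection
      // (references excepted); 0, the default, means no limit
      conf.setLong("hbase.hstore.compaction.max.size", 512 * 1024 * 1024L);
      // normal case: minor compact a file when it is <= sum(smaller_files) * ratio
      conf.setFloat("hbase.hstore.compaction.ratio", 1.2F);
      return conf;
    }
  }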

Test Plan:
 - mvn test -Dtest=TestHeapSize
 - mvn test -Dtest=TestCompactSelection
 - mvn test -Dtest=TestCompaction

DiffCamp Revision: 187419
Reviewed By: jgray
Commenters: kannan
CC: jgray, nspiegelberg, achao, kannan
Revert Plan:
OK

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181454&r1=1181453&r2=1181454&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:09:42 2011
@@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
@@ -93,8 +95,10 @@ public class Store implements HeapSize {
   // ttl in milliseconds.
   protected long ttl;
   private long majorCompactionTime;
+  private final int minFilesToCompact;
   private final int maxFilesToCompact;
   private final long minCompactSize;
+  private final long maxCompactSize;
   // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
   // With float, java will downcast your long to float for comparisons (bad)
   private double compactRatio;
@@ -121,7 +125,6 @@ public class Store implements HeapSize {
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final Object compactLock = new Object();
-  private final int compactionThreshold;
   private final int blocksize;
   private final boolean blockcache;
   private final Compression.Algorithm compression;
@@ -171,10 +174,10 @@ public class Store implements HeapSize {
     this.memstore = new MemStore(this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
-    // By default, we compact if an HStore has more than
-    // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold = Math.max(2,
-      conf.getInt("hbase.hstore.compactionThreshold", 3));
+    // By default, compact if storefile.count >= minFilesToCompact
+    this.minFilesToCompact = Math.max(2,
+      conf.getInt("hbase.hstore.compaction.min",
+        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
 
     // Check if this is in-memory store
     this.inMemory = family.isInMemory();
@@ -192,7 +195,10 @@ public class Store implements HeapSize {
     this.majorCompactionTime = getNextMajorCompactTime();
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
-    this.minCompactSize = this.region.memstoreFlushSize * 3 / 2; // +50% pad
+    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
+      this.region.memstoreFlushSize);
+    this.maxCompactSize
+      = conf.getLong("hbase.hstore.compaction.max.size", 0);
     this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
 
     if (Store.closeCheckInterval == 0) {
@@ -531,7 +537,7 @@ public class Store implements HeapSize {
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
 
-      return this.storefiles.size() >= this.compactionThreshold;
+      return this.storefiles.size() >= this.minFilesToCompact;
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -589,124 +595,44 @@ public class Store implements HeapSize {
    */
   StoreSize compact(final boolean forceMajor) throws IOException {
     boolean forceSplit = this.region.shouldSplit(false);
-    boolean majorcompaction = forceMajor;
     synchronized (compactLock) {
-      this.lastCompactSize = 0;
+      this.lastCompactSize = 0; // reset first in case compaction is aborted
 
-      // filesToCompact are sorted oldest to newest.
-      List<StoreFile> filesToCompact = this.storefiles;
-      if (filesToCompact.isEmpty()) {
-        LOG.debug(this.storeNameStr + ": no store files to compact");
-        return null;
-      }
-
-      // Check to see if we need to do a major compaction on this region.
-      // If so, change doMajorCompaction to true to skip the incremental
-      // compacting below. Only check if doMajorCompaction is not true.
-      if (!majorcompaction) {
-        majorcompaction = isMajorCompaction(filesToCompact);
+      // sanity checks
+      for (StoreFile sf : this.storefiles) {
+        if (sf.getPath() == null || sf.getReader() == null) {
+          boolean np = sf.getPath() == null;
+          LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
+          return null;
+        }
       }
 
-      boolean references = hasReferences(filesToCompact);
-      if (!majorcompaction && !references &&
-          (forceSplit || (filesToCompact.size() < compactionThreshold))) {
+      // if the user wants to force a split, skip compaction unless necessary
+      boolean references = hasReferences(this.storefiles);
+      if (forceSplit && !forceMajor && !references) {
         return checkSplit(forceSplit);
       }
 
-      /* get store file sizes for incremental compacting selection.
-       * normal skew:
-       *
-       *         older ----> newer
-       *     _
-       *    | |   _
-       *    | |  | |   _
-       *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-       *    | |  | |  | |  | |  _  | |
-       *    | |  | |  | |  | | | | | |
-       *    | |  | |  | |  | | | | | |
-       */
-      int countOfFiles = filesToCompact.size();
-      long [] fileSizes = new long[countOfFiles];
-      long [] sumSize = new long[countOfFiles];
-      for (int i = countOfFiles-1; i >= 0; --i) {
-        StoreFile file = filesToCompact.get(i);
-        Path path = file.getPath();
-        if (path == null) {
-          LOG.error("Path is null for " + file);
-          return null;
-        }
-        StoreFile.Reader r = file.getReader();
-        if (r == null) {
-          LOG.error("StoreFile " + file + " has a null Reader");
-          return null;
-        }
-        fileSizes[i] = file.getReader().length();
-        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-        int tooFar = i + this.maxFilesToCompact - 1;
-        sumSize[i] = fileSizes[i]
-                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
-                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-      }
+      Collection<StoreFile> filesToCompact
+        = compactSelection(this.storefiles, forceMajor);
 
-      // never run major compaction if we have too many files to avoid OOM
-      if (countOfFiles > this.maxFilesToCompact) {
-        majorcompaction = false;
+      // empty == do not compact
+      if (filesToCompact.isEmpty()) {
+        // but do see if we need to split before returning
+        return checkSplit(forceSplit);
       }
 
+      // sum size of all files included in compaction
       long totalSize = 0;
-      if (!majorcompaction && !references) {
-        // we're doing a minor compaction, let's see what files are applicable
-        int start = 0;
-        double r = this.compactRatio;
-
-        /* Start at the oldest file and stop when you find the first file that
-         * meets compaction criteria:
-         *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
-         *      OR
-         *   (2) within the compactRatio of sum(newer_files)
-         * Given normal skew, any newer files will also meet this criteria
-         *
-         * Additional Note:
-         * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-         * compact().  Consider the oldest files first to avoid a
-         * situation where we always compact [end-threshold,end).  Then, the
-         * last file becomes an aggregate of the previous compactions.
-         */
-        while(countOfFiles - start >= this.compactionThreshold &&
-              fileSizes[start] >
-                Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
-          ++start;
-        }
-        int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
-        totalSize = fileSizes[start]
-                  + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
-
-        // if we don't have enough files to compact, just wait
-        if (end - start < this.compactionThreshold) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipped compaction of " + this.storeNameStr
-              + ".  Only " + (end - start) + " file(s) of size "
-              + StringUtils.humanReadableInt(totalSize)
-              + " are meet compaction criteria.");
-          }
-          return checkSplit(forceSplit);
-        }
-
-        if (0 == start && end == countOfFiles) {
-          // we decided all the files were candidates! major compact
-          majorcompaction = true;
-        } else {
-          filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
-            end));
-        }
-      } else {
-        // all files included in this compaction
-        for (long i : fileSizes) {
-          totalSize += i;
-        }
+      for (StoreFile sf : filesToCompact) {
+        totalSize += sf.getReader().length();
       }
       this.lastCompactSize = totalSize;
 
+      // major compaction iff all StoreFiles are included
+      boolean majorcompaction
+        = (filesToCompact.size() == this.storefiles.size());
+
       // Max-sequenceID is the last key in the files we're compacting
       long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
@@ -716,7 +642,8 @@ public class Store implements HeapSize {
         (references? ", hasReferences=true,": " ") + " into " +
           region.getTmpDir() + ", seqid=" + maxId +
           ", totalSize=" + StringUtils.humanReadableInt(totalSize));
-      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+      StoreFile.Writer writer
+        = compactStores(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (LOG.isInfoEnabled()) {
@@ -747,7 +674,8 @@ public class Store implements HeapSize {
       boolean majorcompaction = (N == count);
 
       // Ready to go.  Have list of files to compact.
-      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+      StoreFile.Writer writer
+        = compactStores(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
     }
@@ -856,7 +784,131 @@ public class Store implements HeapSize {
   }
 
   /**
-   * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
+   * Algorithm to choose which files to compact
+   *
+   * Configuration knobs:
+   *  "hbase.hstore.compaction.ratio"
+   *    normal case: minor compact when file <= sum(smaller_files) * ratio
+   *  "hbase.hstore.compaction.min.size"
+   *    unconditionally compact individual files below this size
+   *  "hbase.hstore.compaction.max.size"
+   *    never compact individual files above this size (unless splitting)
+   *  "hbase.hstore.compaction.min"
+   *    min files needed to minor compact
+   *  "hbase.hstore.compaction.max"
+   *    max files to compact at once (avoids OOM)
+   *
+   * @param candidates candidate files, ordered from oldest to newest
+   * @param forcemajor whether to force a major compaction
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws IOException
+   */
+  List<StoreFile> compactSelection(List<StoreFile> candidates,
+      boolean forcemajor) throws IOException {
+    /* normal skew:
+     *
+     *         older ----> newer
+     *     _
+     *    | |   _
+     *    | |  | |   _
+     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+     *    | |  | |  | |  | |  _  | |
+     *    | |  | |  | |  | | | | | |
+     *    | |  | |  | |  | | | | | |
+     */
+    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
+
+    // do not compact files above a configurable max filesize
+    // save all references. we MUST compact them
+    if (this.maxCompactSize > 0) {
+      final long msize = this.maxCompactSize;
+      filesToCompact.removeAll(Collections2.filter(filesToCompact,
+        new Predicate<StoreFile>() {
+          public boolean apply(StoreFile sf) {
+            // NOTE: keep all references. we must compact them
+            return sf.getReader().length() > msize && !sf.isReference();
+          }
+        }));
+    }
+
+    // major compact on user action or age (caveat: skip if too many files)
+    boolean majorcompaction = forcemajor ||
+      (!filesToCompact.isEmpty() && isMajorCompaction(filesToCompact)
+       && filesToCompact.size() <= this.maxFilesToCompact);
+
+    if (filesToCompact.isEmpty()) {
+      LOG.debug(this.storeNameStr + ": no store files to compact");
+      return filesToCompact;
+    }
+
+    if (!majorcompaction && !hasReferences(filesToCompact)) {
+      // we're doing a minor compaction, let's see what files are applicable
+      int start = 0;
+      double r = this.compactRatio;
+
+      //sort files by size to correct when normal skew is altered by bulk load
+      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
+
+      // get store file sizes for incremental compacting selection.
+      int countOfFiles = filesToCompact.size();
+      long [] fileSizes = new long[countOfFiles];
+      long [] sumSize = new long[countOfFiles];
+      for (int i = countOfFiles-1; i >= 0; --i) {
+        StoreFile file = filesToCompact.get(i);
+        fileSizes[i] = file.getReader().length();
+        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+        int tooFar = i + this.maxFilesToCompact - 1;
+        sumSize[i] = fileSizes[i]
+                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
+                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+      }
+
+      /* Start at the oldest file and stop when you find the first file that
+       * meets compaction criteria:
+       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
+       *      OR
+       *   (2) within the compactRatio of sum(newer_files)
+       * Given normal skew, any newer files will also meet these criteria
+       *
+       * Additional Note:
+       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+       * compact().  Consider the oldest files first to avoid a
+       * situation where we always compact [end-threshold,end).  Then, the
+       * last file becomes an aggregate of the previous compactions.
+       */
+      while(countOfFiles - start >= this.minFilesToCompact &&
+            fileSizes[start] >
+              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
+        ++start;
+      }
+      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
+      long totalSize = fileSizes[start]
+                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
+      filesToCompact = filesToCompact.subList(start, end);
+
+      // if we don't have enough files to compact, just wait
+      if (filesToCompact.size() < this.minFilesToCompact) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipped compaction of " + this.storeNameStr
+            + ".  Only " + (end - start) + " file(s) of size "
+            + StringUtils.humanReadableInt(totalSize)
+            + " have met compaction criteria.");
+        }
+        return Collections.emptyList();
+      }
+    } else {
+      // all files included in this compaction, up to max
+      if (filesToCompact.size() > this.maxFilesToCompact) {
+        int pastMax = filesToCompact.size() - this.maxFilesToCompact;
+        filesToCompact.subList(0, pastMax).clear();
+      }
+    }
+    return filesToCompact;
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles.
+   * Uses the scan infrastructure to make it easy.
    *
    * @param filesToCompact which files to compact
    * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
@@ -865,7 +917,7 @@ public class Store implements HeapSize {
    * nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
+  private StoreFile.Writer compactStores(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -968,7 +1020,7 @@ public class Store implements HeapSize {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
+  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
@@ -1478,15 +1530,15 @@ public class Store implements HeapSize {
   /**
    * See if there's too much store files in this store
    * @return true if number of store files is greater than
-   *  the number defined in compactionThreshold
+   *  the number defined in minFilesToCompact
    */
   public boolean hasTooManyStoreFiles() {
-    return this.storefiles.size() > this.compactionThreshold;
+    return this.storefiles.size() > this.minFilesToCompact;
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (14 * ClassSize.REFERENCE) + (1 * Bytes.SIZEOF_DOUBLE) +
-      (6 * Bytes.SIZEOF_LONG) + (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
+      (7 * Bytes.SIZEOF_LONG) + (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +

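Sketch of the new selection rule (illustration only, not the committed
code): the standalone method below paraphrases compactSelection() on raw
file sizes, ignoring references and the major-compaction path.

  import java.util.Arrays;

  public class CompactSelectionSketch {
    static long[] select(long[] sizes, int minFiles, int maxFiles,
        long minCompactSize, long maxCompactSize, double ratio) {
      // 1. drop files above the new max-size knob (0 means no limit);
      //    the real code keeps oversized reference files
      long[] cand = new long[sizes.length];
      int n = 0;
      for (long s : sizes) {
        if (maxCompactSize <= 0 || s <= maxCompactSize) {
          cand[n++] = s;
        }
      }
      cand = Arrays.copyOf(cand, n);

      // 2. sort descending so "older -> newer" lines up with "larger ->
      //    smaller" again (the role of StoreFile.Comparators.FILE_SIZE)
      Arrays.sort(cand);
      for (int i = 0; i < n / 2; ++i) {
        long tmp = cand[i]; cand[i] = cand[n - 1 - i]; cand[n - 1 - i] = tmp;
      }

      // 3. sum[i] = combined size of up to (maxFiles - 1) files starting at i,
      //    the same rolling window as sumSize[] in compactSelection()
      long[] sum = new long[n + 1];
      for (int i = n - 1; i >= 0; --i) {
        int tooFar = i + maxFiles - 1;
        sum[i] = cand[i] + sum[i + 1] - (tooFar < n ? cand[tooFar] : 0);
      }

      // 4. skip a file while it is above minCompactSize and also more than
      //    ratio times the combined size of the newer files that could join it
      int start = 0;
      while (n - start >= minFiles
          && cand[start] > Math.max(minCompactSize,
                                    (long) (sum[start + 1] * ratio))) {
        ++start;
      }
      int end = Math.min(n, start + maxFiles);

      // not enough qualifying files: wait for more flushes
      if (end - start < minFiles) {
        return new long[0];
      }
      return Arrays.copyOfRange(cand, start, end);
    }
  }

With the unit test's settings (3 files minimum, 5 maximum, min size 10,
ratio 1.0) this sketch should reproduce the minor-compaction assertions
in testCompactionRatio() below.
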
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181454&r1=1181453&r2=1181454&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 02:09:42 2011
@@ -54,6 +54,7 @@ import java.lang.management.MemoryUsage;
 import java.nio.ByteBuffer;
 import java.text.NumberFormat;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -303,7 +304,7 @@ public class StoreFile {
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
       if (!sf.isBulkLoadResult()) {
@@ -902,6 +903,13 @@ public class StoreFile {
       bloomFilterType = BloomType.NONE;
     }
 
+    /**
+     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
+     */
+    Reader() {
+      this.reader = null;
+    }
+
     public RawComparator<byte []> getComparator() {
       return reader.getComparator();
     }
@@ -1125,5 +1133,15 @@ public class StoreFile {
       }
     }
 
+    /**
+     * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size)
+     */
+    static final Comparator<StoreFile> FILE_SIZE =
+      Ordering.natural().reverse().onResultOf(new Function<StoreFile, Long>() {
+        @Override
+        public Long apply(StoreFile sf) {
+          return sf.getReader().length();
+        }
+      });
   }
 }

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java?rev=1181454&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java Tue Oct 11 02:09:42 2011
@@ -0,0 +1,208 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.TestWALReplay;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+
+public class TestCompactSelection extends TestCase {
+  private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private Configuration conf;
+  private Store store;
+  private static final String DIR
+    = HBaseTestingUtility.getTestDir() + "/TestCompactSelection/";
+
+  private static final int minFiles = 3;
+  private static final int maxFiles = 5;
+
+  private static final long minSize = 10;
+  private static final long maxSize = 1000;
+
+
+  @Override
+  public void setUp() throws Exception {
+    // setup config values necessary for store
+    this.conf = TEST_UTIL.getConfiguration();
+    this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
+    this.conf.setInt("hbase.hstore.compaction.min", minFiles);
+    this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
+    this.conf.setLong("hbase.hregion.memstore.flush.size", minSize);
+    this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
+    this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
+
+    //Setting up a Store
+    Path basedir = new Path(DIR);
+    Path logdir = new Path(DIR+"/logs");
+    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
+    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(logdir, true);
+
+    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table"));
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null);
+    HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
+
+    store = new Store(basedir, region, hcd, fs, conf);
+  }
+
+  // used so our tests don't deal with actual StoreFiles
+  static class MockStoreFile extends StoreFile {
+    long length = 0;
+    boolean isRef = false;
+
+    MockStoreFile(long length, boolean isRef) throws IOException {
+      super(TEST_UTIL.getTestFileSystem(), new Path("_"), false,
+            TEST_UTIL.getConfiguration(), BloomType.NONE, false);
+      this.length = length;
+      this.isRef  = isRef;
+    }
+
+    void setLength(long newLen) {
+      this.length = newLen;
+    }
+
+    @Override
+    boolean isMajorCompaction() {
+      return false;
+    }
+
+    @Override
+    boolean isReference() {
+      return this.isRef;
+    }
+
+    @Override
+    public StoreFile.Reader getReader() {
+      final long len = this.length;
+      return new StoreFile.Reader() {
+        @Override
+        public long length() {
+          return len;
+        }
+      };
+    }
+  }
+
+  List<StoreFile> sfCreate(long ... sizes) throws IOException {
+    return sfCreate(false, sizes);
+  }
+
+  List<StoreFile> sfCreate(boolean isReference, long ... sizes)
+  throws IOException {
+    List<StoreFile> ret = Lists.newArrayList();
+    for (long i : sizes) {
+      ret.add(new MockStoreFile(i, isReference));
+    }
+    return ret;
+  }
+
+  void compactEquals(List<StoreFile> actual, long ... expected)
+  throws IOException {
+    compactEquals(actual, false, expected);
+  }
+
+  void compactEquals(List<StoreFile> actual, boolean forcemajor,
+      long ... expected)
+  throws IOException {
+    List<StoreFile> result = store.compactSelection(actual, forcemajor);
+    long[] aNums = new long[result.size()];
+    for (int i=0; i <result.size(); ++i) {
+      aNums[i] = result.get(i).getReader().length();
+    }
+    assertEquals(Arrays.toString(expected), Arrays.toString(aNums));
+  }
+
+  public void testCompactionRatio() throws IOException {
+    /*
+     * NOTE: these tests are tied to the current implementation of the
+     * compaction algorithm.  They exist to ensure that refactoring
+     * doesn't implicitly alter its behavior.
+     */
+    long tooBig = maxSize + 1;
+
+    // default case. preserve user ratio on size
+    compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
+    // less than compact threshold = don't compact
+    compactEquals(sfCreate(100,50,25,12,12) /* empty */);
+    // greater than compact size = skip those
+    compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700);
+    // big size + threshold
+    compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
+    // small files = don't care about ratio
+    compactEquals(sfCreate(8,3,1), 8,3,1);
+    // sort first so you don't include the huge file at the tail end.
+    // happens with HFileOutputFormat bulk migration
+    compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
+    // don't exceed max file compact threshold
+    assertEquals(maxFiles,
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
+
+    /* MAJOR COMPACTION */
+    // if a major compaction has been forced, then compact everything
+    compactEquals(sfCreate(100,50,25,12,12), true, 100, 50, 25, 12, 12);
+    // also choose files < threshold on major compaction
+    compactEquals(sfCreate(12,12), true, 12, 12);
+    // unless one of those files is too big
+    compactEquals(sfCreate(tooBig, 12,12), true, 12, 12);
+    // don't exceed max file compact threshold, even with major compaction
+    assertEquals(maxFiles,
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
+
+    /* REFERENCES == file is from a region that was split */
+    // treat storefiles that have references like a major compaction
+    compactEquals(sfCreate(true, 100,50,25,12,12), true, 100, 50, 25, 12, 12);
+    // reference files shouldn't obey max threshold
+    compactEquals(sfCreate(true, tooBig, 12,12), true, tooBig, 12, 12);
+    // reference files should still obey the max-files-to-compact limit (avoid OOM)
+    assertEquals(maxFiles,
+        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), true).size());
+
+    // empty case
+    compactEquals(new ArrayList<StoreFile>() /* empty */);
+    // empty case (because all files are too big)
+    compactEquals(sfCreate(tooBig, tooBig) /* empty */);
+  }
+}
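
Note on the new test's arithmetic (all numbers come from the test's own
settings: 3 files minimum, 5 files maximum, min.size 10, max.size 1000,
ratio 1.0). For sizes 100, 50, 23, 12, 12 the selection scan skips 100
because 100 > max(10, 50+23+12+12 = 97) and skips 50 because
50 > max(10, 23+12+12 = 47), but keeps 23 because 23 <= max(10, 12+12 = 24),
so {23, 12, 12} is compacted. With 25 in place of 23, 25 > 24, leaving only
two qualifying files, which is below the three-file minimum, so the
selection comes back empty.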