Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/30 23:04:42 UTC

svn commit: r1403890 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/ap...

Author: tedyu
Date: Tue Oct 30 22:04:41 2012
New Revision: 1403890

URL: http://svn.apache.org/viewvc?rev=1403890&view=rev
Log:
HBASE-7055 port HBASE-6371 tier-based compaction from 0.89-fb to trunk - revert for further discussion


Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
    hbase/trunk/hbase-server/src/main/resources/hbase-compactions.xml
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java Tue Oct 30 22:04:41 2012
@@ -98,7 +98,6 @@ public class HBaseConfiguration extends 
   public static Configuration addHbaseResources(Configuration conf) {
     conf.addResource("hbase-default.xml");
     conf.addResource("hbase-site.xml");
-    conf.addResource("hbase-compactions.xml");
 
     checkDefaultsVersion(conf);
     checkForClusterFreeMemoryLimit(conf);
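
With hbase-compactions.xml out of the resource chain, compaction tuning comes from the standard resources, where later files override earlier ones. A minimal sketch of reading the relevant knobs after this revert, assuming the default resource names are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CompactionConfCheck {
      public static void main(String[] args) {
        // hbase-default.xml loads first, hbase-site.xml overrides it; there is
        // no longer a third, compaction-specific resource in the chain.
        Configuration conf = HBaseConfiguration.create();
        System.out.println("hbase.hstore.compaction.min = "
            + conf.getInt("hbase.hstore.compaction.min",
                conf.getInt("hbase.hstore.compactionThreshold", 3)));
        System.out.println("hbase.hstore.compaction.ratio = "
            + conf.getFloat("hbase.hstore.compaction.ratio", 1.2F));
      }
    }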

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 30 22:04:41 2012
@@ -223,8 +223,6 @@ public class HFileOutputFormat extends F
               Bytes.toBytes(compactionExclude));
           w.appendFileInfo(StoreFile.TIMERANGE_KEY,
               WritableUtils.toByteArray(trt));
-          w.appendFileInfo(StoreFile.MIN_FLUSH_TIME, 
-              Bytes.toBytes(StoreFile.NO_MIN_FLUSH_TIME));
           w.close();
         }
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Tue Oct 30 22:04:41 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -65,15 +64,11 @@ class Compactor extends Configured {
       final Collection<StoreFile> filesToCompact,
       final boolean majorCompaction, final long maxId)
   throws IOException {
-    // Calculate maximum key count after compaction (for blooms), and minFlushTime after compaction
+    // Calculate maximum key count after compaction (for blooms)
     // Also calculate earliest put timestamp if major compaction
     int maxKeyCount = 0;
-    long minFlushTime = Long.MAX_VALUE;
     long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     for (StoreFile file: filesToCompact) {
-      if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
-        minFlushTime = file.getMinFlushTime();
-      }
       StoreFile.Reader r = file.getReader();
       if (r == null) {
         LOG.warn("Null reader for " + file.getPath());
@@ -199,10 +194,6 @@ class Compactor extends Configured {
       }
     } finally {
       if (writer != null) {
-        if (minFlushTime == Long.MAX_VALUE) {
-          minFlushTime = StoreFile.NO_MIN_FLUSH_TIME;
-        }
-        writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
         writer.appendMetadata(maxId, majorCompaction);
         writer.close();
       }
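
The surviving loop folds maxKeyCount and earliestPutTs over the input files; the deleted lines did the same for minFlushTime using the usual sentinel pattern: seed with Long.MAX_VALUE and substitute a "not present" marker if nothing updated it. A standalone sketch of that idiom (names are illustrative, not HBase API):

    public class MinSentinel {
      static final long NO_MIN_FLUSH_TIME = -1;  // marker mirrored from the reverted code

      static long minFlushTime(long[] flushTimes) {
        long min = Long.MAX_VALUE;
        for (long t : flushTimes) {
          if (t != NO_MIN_FLUSH_TIME && t < min) {
            min = t;  // keep the earliest flush time seen so far
          }
        }
        // if no file carried the field, fall back to the sentinel
        return (min == Long.MAX_VALUE) ? NO_MIN_FLUSH_TIME : min;
      }
    }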

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 30 22:04:41 2012
@@ -4176,7 +4176,7 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * @return True if needs a major compaction.
+   * @return True if needs a major compaction.
    * @throws IOException
    */
   boolean isMajorCompaction() throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Oct 30 22:04:41 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,10 +63,9 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
-import org.apache.hadoop.hbase.regionserver.CompactionManager;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -79,6 +77,8 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -108,24 +108,21 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 public class HStore extends SchemaConfigured implements Store {
   static final Log LOG = LogFactory.getLog(HStore.class);
-  
-  /** Parameter name for what compaction manager to use. */
-  private static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class";
-
-  /** Default compaction manager class name. */
-  private static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
 
   protected final MemStore memstore;
   // This stores directory in the filesystem.
   private final Path homedir;
   private final HRegion region;
   private final HColumnDescriptor family;
-  CompactionManager compactionManager;
   final FileSystem fs;
   final Configuration conf;
   final CacheConfig cacheConf;
-  // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
+  // ttl in milliseconds.
   private long ttl;
+  private final int minFilesToCompact;
+  private final int maxFilesToCompact;
+  private final long minCompactSize;
+  private final long maxCompactSize;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -200,7 +197,7 @@ public class HStore extends SchemaConfig
 
     this.comparator = info.getComparator();
     // Get TTL
-    this.ttl = determineTTLFromFamily(family);
+    this.ttl = getTTL(family);
     // used by ScanQueryMatcher
     long timeToPurgeDeletes =
         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@@ -211,11 +208,23 @@ public class HStore extends SchemaConfig
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
 
+    // By default, compact if storefile.count >= minFilesToCompact
+    this.minFilesToCompact = Math.max(2,
+      conf.getInt("hbase.hstore.compaction.min",
+        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+    LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
+
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
     this.blockingStoreFileCount =
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
 
+    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
+      this.region.memstoreFlushSize);
+    this.maxCompactSize
+      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (HStore.closeCheckInterval == 0) {
@@ -230,53 +239,13 @@ public class HStore extends SchemaConfig
     this.bytesPerChecksum = getBytesPerChecksum(conf);
     // Create a compaction tool instance
     this.compactor = new Compactor(this.conf);
-
-    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
-  }
-
-  /**
-   * This setter is used for unit testing
-   * TODO: Fix this for online configuration updating
-   */
-  void setCompactionPolicy(String managerClassName) {
-    try {
-      Class<? extends CompactionManager> managerClass =
-        (Class<? extends CompactionManager>) Class.forName(managerClassName);
-      compactionManager = managerClass.getDeclaredConstructor(
-          new Class[] {Configuration.class, Store.class } ).newInstance(
-          new Object[] { conf, this } );
-    } catch (ClassNotFoundException e) {
-      throw new UnsupportedOperationException(
-          "Unable to find region server interface " + managerClassName, e);
-    } catch (IllegalAccessException e) {
-      throw new UnsupportedOperationException(
-          "Unable to access specified class " + managerClassName, e);
-    } catch (InstantiationException e) {
-      throw new UnsupportedOperationException(
-          "Unable to instantiate specified class " + managerClassName, e);
-    } catch (InvocationTargetException e) {
-      throw new UnsupportedOperationException(
-          "Unable to invoke specified target class constructor " + managerClassName, e);
-    } catch (NoSuchMethodException e) {
-      throw new UnsupportedOperationException(
-          "Unable to find suitable constructor for class " + managerClassName, e);
-    }
   }
 
-  @Override
-  public Integer getDeterministicRandomSeed() {
-    ImmutableList<StoreFile> snapshot = storefiles;
-    if (snapshot != null && !snapshot.isEmpty()) {
-      return snapshot.get(0).getPath().getName().hashCode();
-    }
-    return null;
-   }
-
   /**
    * @param family
    * @return
    */
-  private static long determineTTLFromFamily(final HColumnDescriptor family) {
+  long getTTL(final HColumnDescriptor family) {
     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     long ttl = family.getTimeToLive();
     if (ttl == HConstants.FOREVER) {
@@ -311,11 +280,6 @@ public class HStore extends SchemaConfig
     return this.fs;
   }
 
-  public long getTtl() {
-    // TTL only applies if there's no MIN_VERSIONs setting on the column.
-    return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
-  }
-
   /**
    * Returns the configured bytesPerChecksum value.
    * @param conf The configuration
@@ -807,11 +771,8 @@ public class HStore extends SchemaConfig
           } while (hasMore);
         } finally {
           // Write out the log sequence number that corresponds to this output
-          // hfile. Also write current time in metadata as minFlushTime.
-          // The hfile is current up to and including logCacheFlushId.
+          // hfile.  The hfile is current up to and including logCacheFlushId.
           status.setStatus("Flushing " + this + ": appending metadata");
-          writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, 
-              Bytes.toBytes(EnvironmentEdgeManager.currentTimeMillis()));
           writer.appendMetadata(logCacheFlushId, false);
           status.setStatus("Flushing " + this + ": closing flushed file");
           writer.close();
@@ -1053,12 +1014,12 @@ public class HStore extends SchemaConfig
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
+        + this + " of "
+        + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
-    long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
       StoreFile.Writer writer =
         this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@@ -1087,11 +1048,8 @@ public class HStore extends SchemaConfig
         (sf == null ? "none" : sf.getPath().getName()) +
         ", size=" + (sf == null ? "none" :
           StringUtils.humanReadableInt(sf.getReader().length()))
-        + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
-        + ". This selection was in queue for "
-        + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
-        + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), compactionStartTime)
-        + " to execute.");
+        + "; total size for store is "
+        + StringUtils.humanReadableInt(storeSize));
     return sf;
   }
 
@@ -1149,8 +1107,11 @@ public class HStore extends SchemaConfig
     return hasReferences(this.storefiles);
   }
 
-  @Override
-  public boolean hasReferences(Collection<StoreFile> files) {
+  /*
+   * @param files
+   * @return True if any of the files in <code>files</code> are References.
+   */
+  private boolean hasReferences(Collection<StoreFile> files) {
     if (files != null && files.size() > 0) {
       for (StoreFile hsf: files) {
         if (hsf.isReference()) {
@@ -1161,6 +1122,22 @@ public class HStore extends SchemaConfig
     return false;
   }
 
+  /*
+   * Gets lowest timestamp from candidate StoreFiles
+   *
+   * @param fs
+   * @param dir
+   * @throws IOException
+   */
+  public static long getLowestTimestamp(final List<StoreFile> candidates)
+      throws IOException {
+    long minTs = Long.MAX_VALUE;
+    for (StoreFile storeFile : candidates) {
+      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
+    }
+    return minTs;
+  }
+
   @Override
   public CompactionProgress getCompactionProgress() {
     return this.compactor.getProgress();
@@ -1176,7 +1153,91 @@ public class HStore extends SchemaConfig
     }
 
     List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-    return compactionManager.isMajorCompaction(candidates);
+
+    // exclude files above the max compaction threshold
+    // except: save all references. we MUST compact them
+    int pos = 0;
+    while (pos < candidates.size() &&
+           candidates.get(pos).getReader().length() > this.maxCompactSize &&
+           !candidates.get(pos).isReference()) ++pos;
+    candidates.subList(0, pos).clear();
+
+    return isMajorCompaction(candidates);
+  }
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
+    boolean result = false;
+    long mcTime = getNextMajorCompactTime();
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+      return result;
+    }
+    // TODO: Use better method for determining stamp of last major (HBASE-2990)
+    long lowTimestamp = getLowestTimestamp(filesToCompact);
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
+      // Major compaction time has elapsed.
+      if (filesToCompact.size() == 1) {
+        // Single file
+        StoreFile sf = filesToCompact.get(0);
+        long oldest =
+            (sf.getReader().timeRangeTracker == null) ?
+                Long.MIN_VALUE :
+                now - sf.getReader().timeRangeTracker.minimumTimestamp;
+        if (sf.isMajorCompaction() &&
+            (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping major compaction of " + this +
+                " because one (major) compacted file only and oldestTime " +
+                oldest + "ms is < ttl=" + this.ttl);
+          }
+        } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
+          LOG.debug("Major compaction triggered on store " + this +
+            ", because keyvalues outdated; time since last major compaction " +
+            (now - lowTimestamp) + "ms");
+          result = true;
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store " + this +
+              "; time since last major compaction " + (now - lowTimestamp) + "ms");
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  long getNextMajorCompactTime() {
+    // default = 24hrs
+    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
+      String strCompactionTime =
+        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+      ret = (new Long(strCompactionTime)).longValue();
+    }
+
+    if (ret > 0) {
+      // default = 20% = +/- 4.8 hrs
+      double jitterPct =  conf.getFloat("hbase.hregion.majorcompaction.jitter",
+          0.20F);
+      if (jitterPct > 0) {
+        long jitter = Math.round(ret * jitterPct);
+        // deterministic jitter avoids a major compaction storm on restart
+        ImmutableList<StoreFile> snapshot = storefiles;
+        if (snapshot != null && !snapshot.isEmpty()) {
+          String seed = snapshot.get(0).getPath().getName();
+          double curRand = new Random(seed.hashCode()).nextDouble();
+          ret += jitter - Math.round(2L * jitter * curRand);
+        } else {
+          ret = 0; // no storefiles == no major compaction
+        }
+      }
+    }
+    return ret;
   }
 
   public CompactionRequest requestCompaction() throws IOException {
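
The restored getNextMajorCompactTime() spreads major compactions over a +/- jitter window that is deterministic per store: the first store file's name seeds the RNG, so restarts do not trigger a compaction storm. A standalone sketch of the arithmetic, assuming the 24-hour default period, 20% jitter, and a made-up file name as the seed:

    import java.util.Random;

    public class JitterSketch {
      public static void main(String[] args) {
        long period = 1000L * 60 * 60 * 24;          // 24h default
        long jitter = Math.round(period * 0.20);     // +/- 4.8h window
        String seed = "examplestorefilename";        // hypothetical store file name
        double r = new Random(seed.hashCode()).nextDouble();
        // result lands in (period - jitter, period + jitter], and is the
        // same on every restart for the same first store file
        long next = period + jitter - Math.round(2L * jitter * r);
        System.out.println("next major compaction due in " + next + " ms");
      }
    }
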
@@ -1212,10 +1273,9 @@ public class HStore extends SchemaConfig
         CompactSelection filesToCompact;
         if (override) {
           // coprocessor is overriding normal file selection
-          filesToCompact = new CompactSelection(candidates);
+          filesToCompact = new CompactSelection(conf, candidates);
         } else {
-          filesToCompact = compactionManager.selectCompaction(candidates, priority,
-              forceMajor && filesCompacting.isEmpty());
+          filesToCompact = compactSelection(candidates, priority);
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1266,6 +1326,191 @@ public class HStore extends SchemaConfig
   }
 
   /**
+   * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
+   * @param candidates candidate store files, ordered from oldest to newest
+   * @return the subset selected for compaction
+   * @throws IOException
+   */
+  CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
+    return compactSelection(candidates, Store.NO_PRIORITY);
+  }
+
+  /**
+   * Algorithm to choose which files to compact
+   *
+   * Configuration knobs:
+   *  "hbase.hstore.compaction.ratio"
+   *    normal case: minor compact when file <= sum(smaller_files) * ratio
+   *  "hbase.hstore.compaction.min.size"
+   *    unconditionally compact individual files below this size
+   *  "hbase.hstore.compaction.max.size"
+   *    never compact individual files above this size (unless splitting)
+   *  "hbase.hstore.compaction.min"
+   *    min files needed to minor compact
+   *  "hbase.hstore.compaction.max"
+   *    max files to compact at once (avoids OOM)
+   *
+   * @param candidates candidate files, ordered from oldest to newest
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws IOException
+   */
+  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
+      throws IOException {
+    // ASSUMPTION!!! filesCompacting is locked when calling this function
+
+    /* normal skew:
+     *
+     *         older ----> newer
+     *     _
+     *    | |   _
+     *    | |  | |   _
+     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+     *    | |  | |  | |  | |  _  | |
+     *    | |  | |  | |  | | | | | |
+     *    | |  | |  | |  | | | | | |
+     */
+    CompactSelection compactSelection = new CompactSelection(conf, candidates);
+
+    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
+    if (!forcemajor) {
+      // Delete the expired store files before the compaction selection.
+      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
+          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
+        CompactSelection expiredSelection = compactSelection
+            .selectExpiredStoreFilesToCompact(
+                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
+
+        // If there are any expired store files, delete them by compaction.
+        if (expiredSelection != null) {
+          return expiredSelection;
+        }
+      }
+      // do not compact old files above a configurable threshold
+      // save all references. we MUST compact them
+      int pos = 0;
+      while (pos < compactSelection.getFilesToCompact().size() &&
+             compactSelection.getFilesToCompact().get(pos).getReader().length()
+               > maxCompactSize &&
+             !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
+      if (pos != 0) compactSelection.clearSubList(0, pos);
+    }
+
+    if (compactSelection.getFilesToCompact().isEmpty()) {
+      LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+        this + ": no store files to compact");
+      compactSelection.emptyFileList();
+      return compactSelection;
+    }
+
+    // Force a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested
+    // as a major compaction
+    boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
+      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
+      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
+    );
+    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+      this.getColumnFamilyName() + ": Initiating " +
+      (majorcompaction ? "major" : "minor") + " compaction");
+
+    if (!majorcompaction &&
+        !hasReferences(compactSelection.getFilesToCompact())) {
+      // we're doing a minor compaction, let's see what files are applicable
+      int start = 0;
+      double r = compactSelection.getCompactSelectionRatio();
+
+      // remove bulk import files that request to be excluded from minors
+      compactSelection.getFilesToCompact().removeAll(Collections2.filter(
+          compactSelection.getFilesToCompact(),
+          new Predicate<StoreFile>() {
+            public boolean apply(StoreFile input) {
+              return input.excludeFromMinorCompaction();
+            }
+          }));
+
+      // skip selection algorithm if we don't have enough files
+      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Not compacting files because we only have " +
+            compactSelection.getFilesToCompact().size() +
+            " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
+        }
+        compactSelection.emptyFileList();
+        return compactSelection;
+      }
+
+      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
+      // Sort files by size to correct when normal skew is altered by bulk load.
+      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
+       */
+
+      // get store file sizes for incremental compacting selection.
+      int countOfFiles = compactSelection.getFilesToCompact().size();
+      long [] fileSizes = new long[countOfFiles];
+      long [] sumSize = new long[countOfFiles];
+      for (int i = countOfFiles-1; i >= 0; --i) {
+        StoreFile file = compactSelection.getFilesToCompact().get(i);
+        fileSizes[i] = file.getReader().length();
+        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+        int tooFar = i + this.maxFilesToCompact - 1;
+        sumSize[i] = fileSizes[i]
+                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
+                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+      }
+
+      /* Start at the oldest file and stop when you find the first file that
+       * meets compaction criteria:
+       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
+       *      OR
+       *   (2) within the compactRatio of sum(newer_files)
+       * Given normal skew, any newer files will also meet this criteria
+       *
+       * Additional Note:
+       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+       * compact().  Consider the oldest files first to avoid a
+       * situation where we always compact [end-threshold,end).  Then, the
+       * last file becomes an aggregate of the previous compactions.
+       */
+      while(countOfFiles - start >= this.minFilesToCompact &&
+            fileSizes[start] >
+              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
+        ++start;
+      }
+      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
+      long totalSize = fileSizes[start]
+                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
+      compactSelection = compactSelection.getSubList(start, end);
+
+      // if we don't have enough files to compact, just wait
+      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipped compaction of " + this
+            + ".  Only " + (end - start) + " file(s) of size "
+            + StringUtils.humanReadableInt(totalSize)
+            + " have met compaction criteria.");
+        }
+        compactSelection.emptyFileList();
+        return compactSelection;
+      }
+    } else {
+      if(majorcompaction) {
+        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
+            " files, probably because of a user-requested major compaction");
+          if(priority != Store.PRIORITY_USER) {
+            LOG.error("Compacting more than max files on a non user-requested compaction");
+          }
+        }
+      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+        // all files included in this compaction, up to max
+        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
+        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
+      }
+    }
+    return compactSelection;
+  }
+
+  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
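
The core of the restored minor selection above is the incremental sumSize window plus the ratio test: walking from oldest to newest, skip any file larger than ratio times the combined size of its up-to-(maxFilesToCompact - 1) newer neighbors. A self-contained sketch of the same arithmetic over plain sizes (illustrative only, not the HBase API):

    public class RatioSelectSketch {
      /** Returns {start, end} indices into sizes, or null if too few files qualify. */
      static int[] select(long[] sizes, int minFiles, int maxFiles,
                          long minCompactSize, double ratio) {
        int n = sizes.length;
        long[] sumSize = new long[n + 1];   // sumSize[i] = sum of sizes[i, i+maxFiles-1)
        for (int i = n - 1; i >= 0; --i) {
          int tooFar = i + maxFiles - 1;
          sumSize[i] = sizes[i] + sumSize[i + 1]
                     - (tooFar < n ? sizes[tooFar] : 0);
        }
        int start = 0;
        // skip older files that dwarf the sum of their newer neighbors
        while (n - start >= minFiles
            && sizes[start] > Math.max(minCompactSize,
                                       (long) (sumSize[start + 1] * ratio))) {
          ++start;
        }
        int end = Math.min(n, start + maxFiles);
        return (end - start >= minFiles) ? new int[] {start, end} : null;
      }

      public static void main(String[] args) {
        // mirrors the "default case" in TestCompactSelection: ratio 1.0 picks 23, 12, 12
        int[] sel = select(new long[] {100, 50, 23, 12, 12}, 3, 5, 10, 1.0);
        System.out.println(sel[0] + ".." + sel[1]);   // prints 2..5
      }
    }
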
@@ -1772,7 +2017,11 @@ public class HStore extends SchemaConfig
 
   @Override
   public boolean throttleCompaction(long compactionSize) {
-    return compactionManager.throttleCompaction(compactionSize);
+    // see HBASE-5867 for discussion on the default
+    long throttlePoint = conf.getLong(
+        "hbase.regionserver.thread.compaction.throttle",
+        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
+    return compactionSize > throttlePoint;
   }
 
   @Override
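
The restored throttle check is simple arithmetic; as a worked example under assumed defaults (hbase.hstore.compaction.min = 3, a 128 MB memstore flush size, and no explicit hbase.regionserver.thread.compaction.throttle override):

    public class ThrottleExample {
      public static void main(String[] args) {
        // 2 * minFilesToCompact * memstoreFlushSize, with assumed defaults:
        long throttlePoint = 2L * 3 * 128L * 1024 * 1024;   // 805306368 bytes = 768 MB
        long compactionSize = 900L * 1024 * 1024;           // a hypothetical 900 MB request
        System.out.println(compactionSize > throttlePoint); // true: gets throttled
      }
    }
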
@@ -1867,7 +2116,7 @@ public class HStore extends SchemaConfig
 
   @Override
   public boolean needsCompaction() {
-    return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size());
+    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }
 
   @Override
@@ -1877,8 +2126,8 @@ public class HStore extends SchemaConfig
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (18 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
-          + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -1900,15 +2149,6 @@ public class HStore extends SchemaConfig
   }
 
   /**
-   * Refreshes compaction manager class configuration. 
-   * Used for tests only - not plumbed thru any layers.
-   * TODO: replace when HBASE-3909 is in.
-   */
-  void updateConfiguration() {
-    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
-  }
-
-  /**
    * Immutable information for scans over a store.
    */
   public static class ScanInfo {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 30 22:04:41 2012
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -206,12 +204,6 @@ public interface Store extends SchemaAwa
    * @return <tt>true</tt> if the store has any underlying reference files to older HFiles
    */
   public boolean hasReferences();
-  
-  /*
-   * @param files
-   * @return True if any of the files in <code>files</code> are References.
-   */
-  public boolean hasReferences(Collection<StoreFile> files);
 
   /**
    * @return The size of this store's memstore, in bytes
@@ -275,11 +267,6 @@ public interface Store extends SchemaAwa
    * @return the total size of all Bloom filters in the store
    */
   public long getTotalStaticBloomSize();
-  
-  /**
-   * Returns the TTL for this store's column family.
-   */
-  public long getTtl(); 
 
   // Test-helper methods
 
@@ -300,10 +287,4 @@ public interface Store extends SchemaAwa
    * @return the parent region hosting this store
    */
   public HRegion getHRegion();
-  
-  /**
-   * @return A hash code depending on the state of the current store files.
-   * This is used as seed for deterministic random generator for selecting major compaction time
-   */
-  public Integer getDeterministicRandomSeed();
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 30 22:04:41 2012
@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.util.Bloo
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
@@ -115,9 +114,6 @@ public class StoreFile extends SchemaCon
   /** Max Sequence ID in FileInfo */
   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
 
-  /** Min Flush time in FileInfo */
-  public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");
-
   /** Major compaction flag in FileInfo */
   public static final byte[] MAJOR_COMPACTION_KEY =
       Bytes.toBytes("MAJOR_COMPACTION_KEY");
@@ -147,9 +143,6 @@ public class StoreFile extends SchemaCon
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
 
-  /** Default value for files without minFlushTime in metadata */
-  public static final long NO_MIN_FLUSH_TIME = -1;
-
   private final FileSystem fs;
 
   // This file's path.
@@ -176,8 +169,6 @@ public class StoreFile extends SchemaCon
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
-  // default value is -1, remains -1 if file written without minFlushTime
-  private long minFlushTime = NO_MIN_FLUSH_TIME;
 
   // max of the MemstoreTS in the KV's in this store
   // Set when we obtain a Reader.
@@ -390,22 +381,6 @@ public class StoreFile extends SchemaCon
     return this.sequenceid;
   }
 
-  public boolean hasMinFlushTime() {
-    return this.minFlushTime != NO_MIN_FLUSH_TIME;
-  }
-
-  public long getMinFlushTime() {
-      // BulkLoad files are assumed to contain very old data, return 0
-      if (isBulkLoadResult() && getMaxSequenceId() <= 0) {
-        return 0;
-      } else if (this.minFlushTime == NO_MIN_FLUSH_TIME) {
-          // File written without minFlushTime field assume recent data
-          return EnvironmentEdgeManager.currentTimeMillis();
-      } else {
-        return this.minFlushTime;
-      }
-  }
-
   public long getModificationTimeStamp() {
     return modificationTimeStamp;
   }
@@ -612,10 +587,7 @@ public class StoreFile extends SchemaCon
         }
       }
     }
-    b = metadataMap.get(MIN_FLUSH_TIME);
-    if (b != null) {
-        this.minFlushTime = Bytes.toLong(b);
-    }
+
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Tue Oct 30 22:04:41 2012
@@ -19,13 +19,15 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 @InterfaceAudience.Private
 public class CompactSelection {
@@ -46,15 +48,37 @@ public class CompactSelection {
    */
   private final static Object compactionCountLock = new Object();
 
+  // HBase conf object
+  Configuration conf;
   // was this compaction promoted to an off-peak
   boolean isOffPeakCompaction = false;
-  // CompactSelection object creation time.
-  private final long selectionTime;
+  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
+  // With float, java will downcast your long to float for comparisons (bad)
+  private double compactRatio;
+  // compaction ratio off-peak
+  private double compactRatioOffPeak;
+  // offpeak start time
+  private int offPeakStartHour = -1;
+  // off peak end time
+  private int offPeakEndHour = -1;
 
-  public CompactSelection(List<StoreFile> filesToCompact) {
-    this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
+  public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
     this.filesToCompact = filesToCompact;
-    this.isOffPeakCompaction = false;
+    this.conf = conf;
+    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
+    this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
+
+    // Off-peak time is from [offPeakStartHour, offPeakEndHour). Valid hours are [0, 23]
+    this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+    this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
+    if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
+      if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
+        LOG.warn("Invalid start/end hour for peak hour : start = " +
+            this.offPeakStartHour + " end = " + this.offPeakEndHour +
+            ". Valid numbers are [0-23]");
+      }
+      this.offPeakStartHour = this.offPeakEndHour = -1;
+    }
   }
 
   /**
@@ -89,25 +113,49 @@ public class CompactSelection {
     }
 
     if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
+      expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
     }
     return expiredSFSelection;
   }
 
   /**
+   * If the current hour falls in the off-peak window and there are no
+   * outstanding off-peak compactions, the current compaction is
+   * promoted to an off-peak compaction. Only one off-peak compaction
+   * may be present in the compaction queue at a time.
+   *
+   * @return the compaction ratio to use (the off-peak ratio if promoted)
+   */
+  public double getCompactSelectionRatio() {
+    double r = this.compactRatio;
+    synchronized(compactionCountLock) {
+      if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
+        r = this.compactRatioOffPeak;
+        numOutstandingOffPeakCompactions++;
+        isOffPeakCompaction = true;
+      }
+    }
+    if(isOffPeakCompaction) {
+      LOG.info("Running an off-peak compaction, selection ratio = " +
+          compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
+          numOutstandingOffPeakCompactions);
+    }
+    return r;
+  }
+
+  /**
    * The current compaction finished, so reset the off peak compactions count
    * if this was an off peak compaction.
    */
   public void finishRequest() {
     if (isOffPeakCompaction) {
-      long newValueToLog = -1;
       synchronized(compactionCountLock) {
-        assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
-        newValueToLog = --numOutstandingOffPeakCompactions;
+        numOutstandingOffPeakCompactions--;
         isOffPeakCompaction = false;
       }
       LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
-          newValueToLog);
+          numOutstandingOffPeakCompactions);
     }
   }
 
@@ -122,14 +170,13 @@ public class CompactSelection {
   public void emptyFileList() {
     filesToCompact.clear();
     if (isOffPeakCompaction) {
-      long newValueToLog = -1;
       synchronized(compactionCountLock) {
         // reset the off peak count
-        newValueToLog = --numOutstandingOffPeakCompactions;
+        numOutstandingOffPeakCompactions--;
         isOffPeakCompaction = false;
       }
       LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
-          newValueToLog);
+          numOutstandingOffPeakCompactions);
     }
   }
 
@@ -137,30 +184,16 @@ public class CompactSelection {
     return this.isOffPeakCompaction;
   }
 
-  public static long getNumOutStandingOffPeakCompactions() {
-    synchronized(compactionCountLock) {
-      return numOutstandingOffPeakCompactions;
+  private boolean isOffPeakHour() {
+    int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
+    // If offpeak time checking is disabled just return false.
+    if (this.offPeakStartHour == this.offPeakEndHour) {
+      return false;
     }
-  }
-
-  /**
-   * Tries making the compaction off-peak.
-   * Only checks internal compaction constraints, not timing.
-   * @return Eventual value of isOffPeakCompaction.
-   */
-  public boolean trySetOffpeak() {
-    assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
-    synchronized(compactionCountLock) {
-      if (numOutstandingOffPeakCompactions == 0) {
-         numOutstandingOffPeakCompactions++;
-         isOffPeakCompaction = true;
-      }
+    if (this.offPeakStartHour < this.offPeakEndHour) {
+      return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
     }
-    return isOffPeakCompaction;
-  }
-
-  public long getSelectionTime() {
-    return selectionTime;
+    return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
   }
 
   public CompactSelection subList(int start, int end) {
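
The restored isOffPeakHour() treats [offPeakStartHour, offPeakEndHour) as a window that may wrap past midnight, with equal start and end meaning the check is disabled. A standalone sketch of the same test:

    public class OffPeakWindow {
      static boolean isOffPeak(int hour, int start, int end) {
        if (start == end) {
          return false;                          // off-peak checking disabled
        }
        if (start < end) {
          return hour >= start && hour < end;    // plain window, e.g. [1, 5)
        }
        return hour >= start || hour < end;      // wraps midnight, e.g. [22, 4)
      }

      public static void main(String[] args) {
        System.out.println(isOffPeak(23, 22, 4));  // true: inside wrapped window
        System.out.println(isOffPeak(5, 22, 4));   // false: outside it
      }
    }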

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1403890&r1=1403889&r2=1403890&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 30 22:04:41 2012
@@ -208,10 +208,6 @@ public class CompactionRequest implement
       return p;
     }
 
-    public long getSelectionTime() {
-      return compactSelection.getSelectionTime();
-    }
-
    /** Sets the priority for the request */
     public void setPriority(int p) {
       this.p = p;
@@ -276,7 +272,7 @@ public class CompactionRequest implement
         server.checkFileSystem();
       } finally {
         s.finishRequest(this);
-        LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
+        LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
       }
     }
 

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java?rev=1403890&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java Tue Oct 30 22:04:41 2012
@@ -0,0 +1,288 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCompactSelection extends TestCase {
+  private final static Log LOG = LogFactory.getLog(TestCompactSelection.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private Configuration conf;
+  private HStore store;
+  private static final String DIR =
+    TEST_UTIL.getDataTestDir("TestCompactSelection").toString();
+  private static Path TEST_FILE;
+
+  private static final int minFiles = 3;
+  private static final int maxFiles = 5;
+
+  private static final long minSize = 10;
+  private static final long maxSize = 1000;
+
+
+  @Override
+  public void setUp() throws Exception {
+    // setup config values necessary for store
+    this.conf = TEST_UTIL.getConfiguration();
+    this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
+    this.conf.setInt("hbase.hstore.compaction.min", minFiles);
+    this.conf.setInt("hbase.hstore.compaction.max", maxFiles);
+    this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize);
+    this.conf.setLong("hbase.hstore.compaction.max.size", maxSize);
+    this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
+
+    //Setting up a Store
+    Path basedir = new Path(DIR);
+    String logName = "logs";
+    Path logdir = new Path(DIR, logName);
+    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
+    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(logdir, true);
+
+    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("table"));
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+
+    HLog hlog = HLogFactory.createHLog(fs, basedir, 
+        logName, conf);
+    HRegion region = HRegion.createHRegion(info, basedir, conf, htd);
+    HRegion.closeHRegion(region);
+    Path tableDir = new Path(basedir, Bytes.toString(htd.getName()));
+    region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
+
+    store = new HStore(basedir, region, hcd, fs, conf);
+    TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
+    fs.create(TEST_FILE);
+  }
+
+  // used so our tests don't deal with actual StoreFiles
+  static class MockStoreFile extends StoreFile {
+    long length = 0;
+    boolean isRef = false;
+
+    MockStoreFile(long length, boolean isRef) throws IOException {
+      super(TEST_UTIL.getTestFileSystem(), TEST_FILE,
+            TEST_UTIL.getConfiguration(),
+            new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
+            NoOpDataBlockEncoder.INSTANCE);
+      this.length = length;
+      this.isRef  = isRef;
+    }
+
+    void setLength(long newLen) {
+      this.length = newLen;
+    }
+
+    @Override
+    boolean isMajorCompaction() {
+      return false;
+    }
+
+    @Override
+    boolean isReference() {
+      return this.isRef;
+    }
+
+    @Override
+    public StoreFile.Reader getReader() {
+      final long len = this.length;
+      return new StoreFile.Reader() {
+        @Override
+        public long length() {
+          return len;
+        }
+      };
+    }
+  }
+
+  List<StoreFile> sfCreate(long ... sizes) throws IOException {
+    return sfCreate(false, sizes);
+  }
+
+  List<StoreFile> sfCreate(boolean isReference, long ... sizes)
+  throws IOException {
+    List<StoreFile> ret = Lists.newArrayList();
+    for (long i : sizes) {
+      ret.add(new MockStoreFile(i, isReference));
+    }
+    return ret;
+  }
+
+  long[] getSizes(List<StoreFile> sfList) {
+    long[] aNums = new long[sfList.size()];
+    for (int i=0; i <sfList.size(); ++i) {
+      aNums[i] = sfList.get(i).getReader().length();
+    }
+    return aNums;
+  }
+  
+  void compactEquals(List<StoreFile> candidates, long ... expected) 
+  throws IOException {
+    compactEquals(candidates, false, expected);
+  }
+
+  void compactEquals(List<StoreFile> candidates, boolean forcemajor, 
+      long ... expected)
+  throws IOException {
+    store.forceMajor = forcemajor;
+    List<StoreFile> actual = store.compactSelection(candidates).getFilesToCompact();
+    store.forceMajor = false;
+    assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
+  }
+
+  public void testCompactionRatio() throws IOException {
+    /*
+     * NOTE: these tests are specific to describe the implementation of the
+     * current compaction algorithm.  Developed to ensure that refactoring
+     * doesn't implicitly alter this.
+     */
+    long tooBig = maxSize + 1;
+
+    // default case. preserve user ratio on size
+    compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
+    // less than compact threshold = don't compact
+    compactEquals(sfCreate(100,50,25,12,12) /* empty */);
+    // greater than compact size = skip those
+    compactEquals(sfCreate(tooBig, tooBig, 700, 700, 700), 700, 700, 700);
+    // big size + threshold
+    compactEquals(sfCreate(tooBig, tooBig, 700,700) /* empty */);
+    // small files = don't care about ratio
+    compactEquals(sfCreate(8,3,1), 8,3,1);
+    /* TODO: add sorting + unit test back in when HBASE-2856 is fixed 
+    // sort first so you don't include huge file the tail end
+    // happens with HFileOutputFormat bulk migration
+    compactEquals(sfCreate(100,50,23,12,12, 500), 23, 12, 12);
+     */
+    // don't exceed max file compact threshold
+    assertEquals(maxFiles,
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
+    // note:  file selection starts with largest to smallest.
+    compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
+    
+    /* MAJOR COMPACTION */
+    // if a major compaction has been forced, then compact everything
+    compactEquals(sfCreate(50,25,12,12), true, 50, 25, 12, 12);
+    // also choose files < threshold on major compaction
+    compactEquals(sfCreate(12,12), true, 12, 12);
+    // even if one of those files is too big
+    compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
+    // don't exceed max file compact threshold, even with major compaction
+    store.forceMajor = true;
+    compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3);
+    store.forceMajor = false;
+
+    // if we exceed maxCompactSize, downgrade to minor
+    // if not, it creates a 'snowball effect' when files >> maxCompactSize:
+    // the last file in compaction is the aggregate of all previous compactions
+    compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
+    conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
+    try {
+      // trigger an aged major compaction
+      compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
+      // make sure exceeding maxCompactSize also downgrades aged minors
+      compactEquals(sfCreate(100,50,23,12,12), 23, 12, 12);
+    } finally {
+      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+    }
+
+    /* REFERENCES == file is from a region that was split */
+    // treat storefiles that have references like a major compaction
+    compactEquals(sfCreate(true, 100,50,25,12,12), 100, 50, 25, 12, 12);
+    // reference files shouldn't obey max threshold
+    compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
+    // reference files should obey max file compact to avoid OOM
+    assertEquals(maxFiles,
+        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size());
+    // reference compaction
+    compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1);
+    
+    // empty case
+    compactEquals(new ArrayList<StoreFile>() /* empty */);
+    // empty case (because all files are too big)
+    compactEquals(sfCreate(tooBig, tooBig) /* empty */);
+  }
+
+  public void testOffPeakCompactionRatio() throws IOException {
+    /*
+     * NOTE: these tests are specific to describe the implementation of the
+     * current compaction algorithm.  Developed to ensure that refactoring
+     * doesn't implicitly alter this.
+     */
+    long tooBig = maxSize + 1;
+
+    Calendar calendar = new GregorianCalendar();
+    int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
+    LOG.debug("Hour of day = " + hourOfDay);
+    int hourPlusOne = ((hourOfDay+1+24)%24);
+    int hourMinusOne = ((hourOfDay-1+24)%24);
+    int hourMinusTwo = ((hourOfDay-2+24)%24);
+
+    // check compact selection without peak hour setting
+    LOG.debug("Testing compact selection without off-peak settings...");
+    compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
+
+    // set an off-peak compaction threshold
+    this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
+
+    // set peak hour to current time and check compact selection
+    this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
+    this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
+    LOG.debug("Testing compact selection with off-peak settings (" +
+        hourMinusOne + ", " + hourPlusOne + ")");
+    compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1);
+
+    // set peak hour outside current selection and check compact selection
+    this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
+    this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
+    LOG.debug("Testing compact selection with off-peak settings (" +
+        hourMinusTwo + ", " + hourMinusOne + ")");
+    compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
+  }
+
+}
+