Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/30 21:14:02 UTC

svn commit: r1403852 [1/2] - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/...

Author: tedyu
Date: Tue Oct 30 20:14:01 2012
New Revision: 1403852

URL: http://svn.apache.org/viewvc?rev=1403852&view=rev
Log:
HBASE-7055 port HBASE-6371 tier-based compaction from 0.89-fb to trunk (Sergey)
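
For orientation: beyond the tier-based policy itself, this patch makes compaction selection pluggable behind a CompactionManager abstraction. A minimal sketch of how a site might opt into the ported policy, assuming the hbase.compactionmanager.class property introduced in the HStore changes below (illustrative only, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class TierPolicyOptIn {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Swap the default size-ratio policy for the tier-based manager
        // this commit ports from 0.89-fb.
        conf.set("hbase.compactionmanager.class",
            "org.apache.hadoop.hbase.regionserver.TierCompactionManager");
      }
    }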


Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
    hbase/trunk/hbase-server/src/main/resources/hbase-compactions.xml   (with props)
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java
Removed:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java Tue Oct 30 20:14:01 2012
@@ -98,6 +98,7 @@ public class HBaseConfiguration extends 
   public static Configuration addHbaseResources(Configuration conf) {
     conf.addResource("hbase-default.xml");
     conf.addResource("hbase-site.xml");
+    conf.addResource("hbase-compactions.xml");
 
     checkDefaultsVersion(conf);
     checkForClusterFreeMemoryLimit(conf);
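
A side effect worth noting: every Configuration built through addHbaseResources (and thus HBaseConfiguration.create()) now also consults hbase-compactions.xml on the classpath, so compaction tuning can live in its own file. A small sketch of the effect; the property and default below are taken from CompactionConfiguration further down, the class name is invented:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CompactionsXmlDemo {
      public static void main(String[] args) {
        // Resources load in order: hbase-default.xml, hbase-site.xml,
        // hbase-compactions.xml; later files override earlier ones.
        Configuration conf = HBaseConfiguration.create();
        System.out.println(conf.getFloat("hbase.hstore.compaction.ratio", 1.2F));
      }
    }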

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 30 20:14:01 2012
@@ -223,6 +223,8 @@ public class HFileOutputFormat extends F
               Bytes.toBytes(compactionExclude));
           w.appendFileInfo(StoreFile.TIMERANGE_KEY,
               WritableUtils.toByteArray(trt));
+          w.appendFileInfo(StoreFile.MIN_FLUSH_TIME, 
+              Bytes.toBytes(StoreFile.NO_MIN_FLUSH_TIME));
           w.close();
         }
       }
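
Bulk-loaded files written by HFileOutputFormat never pass through a memstore flush, so they are stamped with the NO_MIN_FLUSH_TIME sentinel (-1, defined in StoreFile below). A standalone sketch of how the marker round-trips through file info (class name and values are hypothetical):

    import org.apache.hadoop.hbase.util.Bytes;

    public class MinFlushTimeMarkerDemo {
      static final long NO_MIN_FLUSH_TIME = -1;  // mirrors StoreFile.NO_MIN_FLUSH_TIME

      public static void main(String[] args) {
        byte[] stored = Bytes.toBytes(NO_MIN_FLUSH_TIME);  // written at close time
        long decoded = Bytes.toLong(stored);               // read back via the Reader
        System.out.println(decoded == NO_MIN_FLUSH_TIME);  // true: flush time unknown
      }
    }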

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java?rev=1403852&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java Tue Oct 30 20:14:01 2012
@@ -0,0 +1,203 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Control knobs for the default compaction algorithm:
+ * <p/>
+ * maxCompactSize - upper bound on file size to be included in minor compactions
+ * minCompactSize - lower bound below which compaction is selected without ratio test
+ * minFilesToCompact - lower bound on number of files in any minor compaction
+ * maxFilesToCompact - upper bound on number of files in any minor compaction
+ * compactionRatio - ratio used in the size-based selection test for minor compactions
+ * <p/>
+ * Set parameter as "hbase.hstore.compaction.<attribute>"
+ */
+
+//TODO: revisit this class for online parameter updating
+
+public class CompactionConfiguration {
+
+  static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
+
+  Configuration conf;
+  Store store;
+
+  long maxCompactSize;
+  long minCompactSize;
+  int minFilesToCompact;
+  int maxFilesToCompact;
+  double compactionRatio;
+  double offPeakCompactionRatio;
+  int offPeakStartHour;
+  int offPeakEndHour;
+  long throttlePoint;
+  boolean shouldDeleteExpired;
+  long majorCompactionPeriod;
+  float majorCompactionJitter;
+
+  CompactionConfiguration(Configuration conf, Store store) {
+    this.conf = conf;
+    this.store = store;
+
+    String strPrefix = "hbase.hstore.compaction.";
+
+    maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
+    minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
+    minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
+          /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+    maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
+    compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
+    offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
+    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
+
+    if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
+      if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
+        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
+            this.offPeakStartHour + " end = " + this.offPeakEndHour +
+            ". Valid numbers are [0-23]");
+      }
+      this.offPeakStartHour = this.offPeakEndHour = -1;
+    }
+
+    throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
+          2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
+    majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+    majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+
+    LOG.info("Compaction configuration " + this.toString());
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+      "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " +
+      "throttle point %d;%s delete expired; major period %d, major jitter %f",
+      minCompactSize,
+      maxCompactSize,
+      minFilesToCompact,
+      maxFilesToCompact,
+      compactionRatio,
+      offPeakCompactionRatio,
+      offPeakStartHour,
+      offPeakEndHour,
+      throttlePoint,
+      shouldDeleteExpired ? "" : " don't",
+      majorCompactionPeriod,
+      majorCompactionJitter);
+  }
+
+  /**
+   * @return lower bound below which compaction is selected without ratio test
+   */
+  long getMinCompactSize() {
+    return minCompactSize;
+  }
+
+  /**
+   * @return upper bound on file size to be included in minor compactions
+   */
+  long getMaxCompactSize() {
+    return maxCompactSize;
+  }
+
+  /**
+   * @return lower bound on number of files to be included in minor compactions
+   */
+  int getMinFilesToCompact() {
+    return minFilesToCompact;
+  }
+
+  /**
+   * @return upper bound on number of files to be included in minor compactions
+   */
+  int getMaxFilesToCompact() {
+    return maxFilesToCompact;
+  }
+
+  /**
+   * @return Ratio used for compaction
+   */
+  double getCompactionRatio() {
+    return compactionRatio;
+  }
+
+  /**
+   * @return Off-peak ratio used for compaction
+   */
+  double getCompactionRatioOffPeak() {
+    return offPeakCompactionRatio;
+  }
+
+  /**
+   * @return Hour at which off-peak compactions start
+   */
+  int getOffPeakStartHour() {
+    return offPeakStartHour;
+  }
+
+  /**
+   * @return Hour at which off-peak compactions end
+   */
+  int getOffPeakEndHour() {
+    return offPeakEndHour;
+  }
+
+  /**
+   * @return ThrottlePoint used for classifying small and large compactions
+   */
+  long getThrottlePoint() {
+    return throttlePoint;
+  }
+
+  /**
+   * @return Major compaction period, in milliseconds.
+   * Major compactions are selected periodically according to this parameter plus jitter
+   */
+  long getMajorCompactionPeriod() {
+    return majorCompactionPeriod;
+  }
+
+  /**
+   * @return Major compaction jitter: the fraction of majorCompactionPeriod by which
+   *  each store's actual major compaction period is randomly perturbed.
+   */
+  float getMajorCompactionJitter() {
+    return majorCompactionJitter;
+  }
+
+  /**
+   * @return Whether expired files should be deleted ASAP using compactions
+   */
+  boolean shouldDeleteExpired() {
+    return shouldDeleteExpired;
+  }
+  
+  private static boolean isValidHour(int hour) {
+    return (hour >= 0 && hour <= 23);
+  }
+}
\ No newline at end of file
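
The knobs above are ordinary Configuration properties, so they can be set in hbase-compactions.xml or programmatically. A sketch with illustrative values (property names come straight from the constructor above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CompactionKnobsDemo {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setLong("hbase.hstore.compaction.max.size", 1L << 30); // skip files over 1 GB
        conf.setInt("hbase.hstore.compaction.min", 3);              // need at least 3 files
        conf.setInt("hbase.hstore.compaction.max", 10);             // never compact more than 10
        conf.setFloat("hbase.hstore.compaction.ratio", 1.2F);
        conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
        conf.setInt("hbase.offpeak.start.hour", 0);                 // off-peak 00:00-06:00
        conf.setInt("hbase.offpeak.end.hour", 6);
      }
    }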

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java?rev=1403852&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java Tue Oct 30 20:14:01 2012
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.Random;
+
+@InterfaceAudience.Private
+public class CompactionManager {
+
+  private static final Log LOG = LogFactory.getLog(CompactionManager.class);
+
+  private Store store;
+  CompactionConfiguration comConf;
+
+  CompactionManager(Configuration configuration, Store store) {
+    this.store = store;
+    comConf = new CompactionConfiguration(configuration, store);
+  }
+
+  /**
+   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws java.io.IOException
+   */
+  CompactSelection selectCompaction(List<StoreFile> candidateFiles, int priority, boolean forceMajor)
+    throws IOException {
+    // Preliminary compaction selection, subject to filters
+    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+
+    if (!forceMajor) {
+      // If there are expired files, only select them so that compaction deletes them
+      if (comConf.shouldDeleteExpired() && (store.getTtl() != Long.MAX_VALUE)) {
+        CompactSelection expiredSelection = selectExpiredSFs(
+          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - store.getTtl());
+        if (expiredSelection != null) {
+          return expiredSelection;
+        }
+      }
+      candidateSelection = skipLargeFiles(candidateSelection);
+    }
+
+    // Force a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested
+    // as a major compaction.
+    // Or, if there are any references among the candidates.
+    boolean isUserCompaction = (priority == Store.PRIORITY_USER);
+    boolean majorCompaction = (
+      (forceMajor && isUserCompaction)
+      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) 
+          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) 
+      || store.hasReferences(candidateSelection.getFilesToCompact())
+      );
+
+    LOG.debug(store.getHRegion().regionInfo.getEncodedName() + " - " +
+        store.getColumnFamilyName() + ": Initiating " +
+        (majorCompaction ? "major" : "minor") + " compaction");
+
+    if (!majorCompaction) {
+      // we're doing a minor compaction, let's see what files are applicable
+      candidateSelection = filterBulk(candidateSelection);
+      candidateSelection = applyCompactionPolicy(candidateSelection);
+      candidateSelection = checkMinFilesCriteria(candidateSelection);
+    }
+    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+    return candidateSelection;
+  }
+
+  /**
+   * Select the expired store files to compact
+   *
+   * @param candidates the initial set of storeFiles
+   * @param maxExpiredTimeStamp
+   *          The store file will be marked as expired if its max time stamp is
+   *          less than this maxExpiredTimeStamp.
+   * @return A CompactSelection contains the expired store files as
+   *         filesToCompact
+   */
+  private CompactSelection selectExpiredSFs(CompactSelection candidates,
+      long maxExpiredTimeStamp) {
+    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
+    if (filesToCompact == null || filesToCompact.isEmpty()) {
+      return null;
+    }
+    ArrayList<StoreFile> expiredStoreFiles = null;
+    boolean hasExpiredStoreFiles = false;
+    CompactSelection expiredSFSelection = null;
+
+    for (StoreFile storeFile : filesToCompact) {
+      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
+        LOG.info("Deleting the expired store file by compaction: "
+            + storeFile.getPath() + " whose maxTimeStamp is "
+            + storeFile.getReader().getMaxTimestamp()
+            + " while the max expired timestamp is " + maxExpiredTimeStamp);
+        if (!hasExpiredStoreFiles) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+          hasExpiredStoreFiles = true;
+        }
+        expiredStoreFiles.add(storeFile);
+      }
+    }
+
+    if (hasExpiredStoreFiles) {
+      expiredSFSelection = new CompactSelection(expiredStoreFiles);
+    }
+    return expiredSFSelection;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all files above maxCompactSize
+   * Also save all references. We MUST compact them
+   */
+  private CompactSelection skipLargeFiles(CompactSelection candidates) {
+    int pos = 0;
+    while (pos < candidates.getFilesToCompact().size() &&
+      candidates.getFilesToCompact().get(pos).getReader().length() >
+        comConf.getMaxCompactSize() &&
+      !candidates.getFilesToCompact().get(pos).isReference()) {
+      ++pos;
+    }
+    if (pos > 0) {
+      LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates");
+      candidates.clearSubList(0, pos);
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all bulk load files if configured
+   */
+  private CompactSelection filterBulk(CompactSelection candidates) {
+    candidates.getFilesToCompact().removeAll(Collections2.filter(
+        candidates.getFilesToCompact(),
+        new Predicate<StoreFile>() {
+          @Override
+          public boolean apply(StoreFile input) {
+            return input.excludeFromMinorCompaction();
+          }
+        }));
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * take up to maxFilesToCompact from the start
+   */
+  private CompactSelection removeExcessFiles(CompactSelection candidates, 
+      boolean isUserCompaction, boolean isMajorCompaction) {
+    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+    if (excess > 0) {
+      if (isMajorCompaction && isUserCompaction) {
+        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
+            " files because of a user-requested major compaction");
+      } else {
+        LOG.debug("Too many admissible files. Excluding " + excess
+          + " files from compaction candidates");
+        candidates.clearSubList(comConf.getMaxFilesToCompact(),
+          candidates.getFilesToCompact().size());
+      }
+    }
+    return candidates;
+  }
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * forget the compactionSelection if we don't have enough files
+   */
+  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+    int minFiles = comConf.getMinFilesToCompact();
+    if (candidates.getFilesToCompact().size() < minFiles) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Not compacting files because we only have " +
+            candidates.getFilesToCompact().size() +
+          " files ready for compaction.  Need " + minFiles + " to initiate.");
+      }
+      candidates.emptyFileList();
+    }
+    return candidates;
+  }
+
+  /**
+    * @param candidates pre-filtrate
+    * @return filtered subset
+    * -- Default minor compaction selection algorithm: Choose CompactSelection from candidates --
+    * First exclude bulk-load files if indicated in configuration.
+    * Start at the oldest file and stop when you find the first file that
+    * meets compaction criteria:
+    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
+    * OR
+    * (2) within the compactRatio of sum(newer_files)
+    * Given normal skew, any newer files will also meet this criteria
+    * <p/>
+    * Additional Note:
+    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+    * compact().  Consider the oldest files first to avoid a
+    * situation where we always compact [end-threshold,end).  Then, the
+    * last file becomes an aggregate of the previous compactions.
+    *
+    * normal skew:
+    *
+    *         older ----> newer (increasing seqID)
+    *     _
+    *    | |   _
+    *    | |  | |   _
+    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+    *    | |  | |  | |  | |  _  | |
+    *    | |  | |  | |  | | | | | |
+    *    | |  | |  | |  | | | | | |
+    */
+  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+    if (candidates.getFilesToCompact().isEmpty()) {
+      return candidates;
+    }
+
+    // we're doing a minor compaction, let's see what files are applicable
+    int start = 0;
+    double ratio = comConf.getCompactionRatio();
+    if (isOffPeakHour() && candidates.trySetOffpeak()) {
+      ratio = comConf.getCompactionRatioOffPeak();
+      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+          + ", numOutstandingOffPeakCompactions is now "
+          + CompactSelection.getNumOutStandingOffPeakCompactions());
+    }
+
+    // get store file sizes for incremental compacting selection.
+    int countOfFiles = candidates.getFilesToCompact().size();
+    long[] fileSizes = new long[countOfFiles];
+    long[] sumSize = new long[countOfFiles];
+    for (int i = countOfFiles - 1; i >= 0; --i) {
+      StoreFile file = candidates.getFilesToCompact().get(i);
+      fileSizes[i] = file.getReader().length();
+      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
+      sumSize[i] = fileSizes[i]
+        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
+        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+    }
+
+
+    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
+      fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) {
+      ++start;
+    }
+    if (start < countOfFiles) {
+      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+        + " files from " + countOfFiles + " candidates");
+    }
+
+    candidates = candidates.getSubList(start, countOfFiles);
+
+    return candidates;
+  }
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
+    boolean result = false;
+    long mcTime = getNextMajorCompactTime();
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+      return result;
+    }
+    // TODO: Use better method for determining stamp of last major (HBASE-2990)
+    long lowTimestamp = getLowestTimestamp(filesToCompact);
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
+      // Major compaction time has elapsed.
+      if (filesToCompact.size() == 1) {
+        // Single file
+        StoreFile sf = filesToCompact.get(0);
+        long oldest =
+            (sf.getReader().timeRangeTracker == null) ?
+                Long.MIN_VALUE :
+                now - sf.getReader().timeRangeTracker.minimumTimestamp;
+        if (sf.isMajorCompaction() &&
+            (store.getTtl() == HConstants.FOREVER || oldest < store.getTtl())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping major compaction of " + this +
+                " because one (major) compacted file only and oldestTime " +
+                oldest + "ms is < ttl=" + store.getTtl());
+          }
+        } else if (store.getTtl() != HConstants.FOREVER && oldest > store.getTtl()) {
+          LOG.debug("Major compaction triggered on store " + this +
+            ", because keyvalues outdated; time since last major compaction " +
+            (now - lowTimestamp) + "ms");
+          result = true;
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store " + this +
+              "; time since last major compaction " + (now - lowTimestamp) + "ms");
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  long getNextMajorCompactTime() {
+    // default = 24hrs
+    long ret = comConf.getMajorCompactionPeriod();
+    String strCompactionTime = store.getFamily().getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+    if (strCompactionTime != null) {
+      ret = Long.parseLong(strCompactionTime);
+    }
+
+    if (ret > 0) {
+      // default = 20% = +/- 4.8 hrs
+      double jitterPct = comConf.getMajorCompactionJitter();
+      if (jitterPct > 0) {
+        long jitter = Math.round(ret * jitterPct);
+        // deterministic jitter avoids a major compaction storm on restart
+        Integer seed = store.getDeterministicRandomSeed();
+        if (seed != null) {
+          double rnd = (new Random(seed)).nextDouble();
+          ret += jitter - Math.round(2L * jitter * rnd);
+        } else {
+          ret = 0; // no storefiles == no major compaction
+        }
+      }
+    }
+    return ret;
+  }
+
+  /*
+   * Gets lowest timestamp from candidate StoreFiles
+   *
+   * @param fs
+   * @param dir
+   * @throws IOException
+   */
+  static long getLowestTimestamp(final List<StoreFile> candidates)
+    throws IOException {
+    long minTs = Long.MAX_VALUE;
+    for (StoreFile storeFile : candidates) {
+      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
+    }
+    return minTs;
+  }
+
+  /**
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
+   */
+  boolean throttleCompaction(long compactionSize) {
+    return compactionSize > comConf.getThrottlePoint();
+  }
+
+  /**
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
+   */
+  boolean needsCompaction(int numCandidates) {
+    return numCandidates > comConf.getMinFilesToCompact();
+  }
+
+  /**
+   * @return whether this is off-peak hour
+   */
+  private boolean isOffPeakHour() {
+    // Read the hour from a fresh calendar; a cached instance would forever
+    // report the hour at which the class was loaded.
+    int currentHour = new GregorianCalendar().get(Calendar.HOUR_OF_DAY);
+    int startHour = comConf.getOffPeakStartHour();
+    int endHour = comConf.getOffPeakEndHour();
+    // If offpeak time checking is disabled just return false.
+    if (startHour == endHour) {
+      return false;
+    }
+    if (startHour < endHour) {
+      return (currentHour >= startHour && currentHour < endHour);
+    }
+    return (currentHour >= startHour || currentHour < endHour);
+  }
+}
\ No newline at end of file
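
To make the ratio test in applyCompactionPolicy concrete, here is a standalone transcription of its selection loop with invented file sizes (the arrays mirror fileSizes/sumSize above; this is a sketch, not a call into HBase):

    public class RatioSelectDemo {
      public static void main(String[] args) {
        long[] fileSizes = {900, 80, 50, 40, 30};  // MB, oldest first
        int minFilesToCompact = 3, maxFilesToCompact = 10;
        long minCompactSize = 16;
        double ratio = 1.2;
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
          int tooFar = i + maxFilesToCompact - 1;  // cap the window at maxFilesToCompact
          sumSize[i] = fileSizes[i]
              + ((i + 1 < n) ? sumSize[i + 1] : 0)
              - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        while (n - start >= minFilesToCompact && fileSizes[start]
            > Math.max(minCompactSize, (long) (sumSize[start + 1] * ratio))) {
          ++start;  // 900 > 1.2 * (80+50+40+30) = 240, so the big old file is skipped
        }
        System.out.println("compact files [" + start + ", " + n + ")");  // prints [1, 5)
      }
    }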

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Tue Oct 30 20:14:01 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -64,11 +65,15 @@ class Compactor extends Configured {
       final Collection<StoreFile> filesToCompact,
       final boolean majorCompaction, final long maxId)
   throws IOException {
-    // Calculate maximum key count after compaction (for blooms)
+    // Calculate maximum key count after compaction (for blooms), and minFlushTime after compaction
     // Also calculate earliest put timestamp if major compaction
     int maxKeyCount = 0;
+    long minFlushTime = Long.MAX_VALUE;
     long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     for (StoreFile file: filesToCompact) {
+      if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
+        minFlushTime = file.getMinFlushTime();
+      }
       StoreFile.Reader r = file.getReader();
       if (r == null) {
         LOG.warn("Null reader for " + file.getPath());
@@ -194,6 +199,10 @@ class Compactor extends Configured {
       }
     } finally {
       if (writer != null) {
+        if (minFlushTime == Long.MAX_VALUE) {
+          minFlushTime = StoreFile.NO_MIN_FLUSH_TIME;
+        }
+        writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
         writer.appendMetadata(maxId, majorCompaction);
         writer.close();
       }
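
The hunk above carries the oldest known flush time of the inputs into the compacted file, falling back to the -1 sentinel when no input has one (for example, when every input was bulk loaded). A standalone sketch of that aggregation rule (helper name is hypothetical):

    public class MinFlushTimeAggregationDemo {
      static final long NO_MIN_FLUSH_TIME = -1;  // mirrors StoreFile.NO_MIN_FLUSH_TIME

      // Oldest known flush time across the inputs, or the sentinel if none is known.
      static long aggregate(long[] inputFlushTimes) {
        long min = Long.MAX_VALUE;
        for (long t : inputFlushTimes) {
          if (t != NO_MIN_FLUSH_TIME && t < min) {
            min = t;
          }
        }
        return (min == Long.MAX_VALUE) ? NO_MIN_FLUSH_TIME : min;
      }

      public static void main(String[] args) {
        // One bulk-loaded input (-1) among two flushed ones: oldest flush wins.
        System.out.println(aggregate(new long[] {1351620000000L, -1, 1351630000000L}));
      }
    }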

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 30 20:14:01 2012
@@ -4176,7 +4176,7 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * @return True if needs a mojor compaction.
+   * @return True if needs a major compaction.
    * @throws IOException
    */
   boolean isMajorCompaction() throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Oct 30 20:14:01 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -63,9 +64,10 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.CompactionManager;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -77,8 +79,6 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -108,21 +108,24 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 public class HStore extends SchemaConfigured implements Store {
   static final Log LOG = LogFactory.getLog(HStore.class);
+  
+  /** Parameter name for what compaction manager to use. */
+  private static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class";
+
+  /** Default compaction manager class name. */
+  private static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
 
   protected final MemStore memstore;
   // This stores directory in the filesystem.
   private final Path homedir;
   private final HRegion region;
   private final HColumnDescriptor family;
+  CompactionManager compactionManager;
   final FileSystem fs;
   final Configuration conf;
   final CacheConfig cacheConf;
-  // ttl in milliseconds.
+  // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
   private long ttl;
-  private final int minFilesToCompact;
-  private final int maxFilesToCompact;
-  private final long minCompactSize;
-  private final long maxCompactSize;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -197,7 +200,7 @@ public class HStore extends SchemaConfig
 
     this.comparator = info.getComparator();
     // Get TTL
-    this.ttl = getTTL(family);
+    this.ttl = determineTTLFromFamily(family);
     // used by ScanQueryMatcher
     long timeToPurgeDeletes =
         Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@@ -208,23 +211,11 @@ public class HStore extends SchemaConfig
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
 
-    // By default, compact if storefile.count >= minFilesToCompact
-    this.minFilesToCompact = Math.max(2,
-      conf.getInt("hbase.hstore.compaction.min",
-        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
-    LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
-
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
     this.blockingStoreFileCount =
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
 
-    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
-    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
-      this.region.memstoreFlushSize);
-    this.maxCompactSize
-      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (HStore.closeCheckInterval == 0) {
@@ -239,13 +230,53 @@ public class HStore extends SchemaConfig
     this.bytesPerChecksum = getBytesPerChecksum(conf);
     // Create a compaction tool instance
     this.compactor = new Compactor(this.conf);
+
+    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
+  }
+
+  /**
+   * This setter is used for unit testing
+   * TODO: Fix this for online configuration updating
+   */
+  void setCompactionPolicy(String managerClassName) {
+    try {
+      Class<? extends CompactionManager> managerClass =
+        (Class<? extends CompactionManager>) Class.forName(managerClassName);
+      compactionManager = managerClass.getDeclaredConstructor(
+          new Class[] {Configuration.class, Store.class } ).newInstance(
+          new Object[] { conf, this } );
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find region server interface " + managerClassName, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException(
+          "Unable to access specified class " + managerClassName, e);
+    } catch (InstantiationException e) {
+      throw new UnsupportedOperationException(
+          "Unable to instantiate specified class " + managerClassName, e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException(
+          "Unable to invoke specified target class constructor " + managerClassName, e);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find suitable constructor for class " + managerClassName, e);
+    }
   }
 
+  @Override
+  public Integer getDeterministicRandomSeed() {
+    ImmutableList<StoreFile> snapshot = storefiles;
+    if (snapshot != null && !snapshot.isEmpty()) {
+      return snapshot.get(0).getPath().getName().hashCode();
+    }
+    return null;
+  }
+
   /**
    * @param family
    * @return
    */
-  long getTTL(final HColumnDescriptor family) {
+  private static long determineTTLFromFamily(final HColumnDescriptor family) {
     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     long ttl = family.getTimeToLive();
     if (ttl == HConstants.FOREVER) {
@@ -280,6 +311,11 @@ public class HStore extends SchemaConfig
     return this.fs;
   }
 
+  public long getTtl() {
+    // TTL only applies if there's no MIN_VERSIONs setting on the column.
+    return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
+  }
+
   /**
    * Returns the configured bytesPerChecksum value.
    * @param conf The configuration
@@ -771,8 +807,11 @@ public class HStore extends SchemaConfig
           } while (hasMore);
         } finally {
           // Write out the log sequence number that corresponds to this output
-          // hfile.  The hfile is current up to and including logCacheFlushId.
+          // hfile. Also write current time in metadata as minFlushTime.
+          // The hfile is current up to and including logCacheFlushId.
           status.setStatus("Flushing " + this + ": appending metadata");
+          writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, 
+              Bytes.toBytes(EnvironmentEdgeManager.currentTimeMillis()));
           writer.appendMetadata(logCacheFlushId, false);
           status.setStatus("Flushing " + this + ": closing flushed file");
           writer.close();
@@ -1014,12 +1053,12 @@ public class HStore extends SchemaConfig
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this + " of "
-        + this.region.getRegionInfo().getRegionNameAsString()
+        + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
+    long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
       StoreFile.Writer writer =
         this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@@ -1048,8 +1087,11 @@ public class HStore extends SchemaConfig
         (sf == null ? "none" : sf.getPath().getName()) +
         ", size=" + (sf == null ? "none" :
           StringUtils.humanReadableInt(sf.getReader().length()))
-        + "; total size for store is "
-        + StringUtils.humanReadableInt(storeSize));
+        + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
+        + ". This selection was in queue for "
+        + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
+        + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), compactionStartTime)
+        + " to execute.");
     return sf;
   }
 
@@ -1107,11 +1149,8 @@ public class HStore extends SchemaConfig
     return hasReferences(this.storefiles);
   }
 
-  /*
-   * @param files
-   * @return True if any of the files in <code>files</code> are References.
-   */
-  private boolean hasReferences(Collection<StoreFile> files) {
+  @Override
+  public boolean hasReferences(Collection<StoreFile> files) {
     if (files != null && files.size() > 0) {
       for (StoreFile hsf: files) {
         if (hsf.isReference()) {
@@ -1122,22 +1161,6 @@ public class HStore extends SchemaConfig
     return false;
   }
 
-  /*
-   * Gets lowest timestamp from candidate StoreFiles
-   *
-   * @param fs
-   * @param dir
-   * @throws IOException
-   */
-  public static long getLowestTimestamp(final List<StoreFile> candidates)
-      throws IOException {
-    long minTs = Long.MAX_VALUE;
-    for (StoreFile storeFile : candidates) {
-      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
-    }
-    return minTs;
-  }
-
   @Override
   public CompactionProgress getCompactionProgress() {
     return this.compactor.getProgress();
@@ -1153,91 +1176,7 @@ public class HStore extends SchemaConfig
     }
 
     List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-
-    // exclude files above the max compaction threshold
-    // except: save all references. we MUST compact them
-    int pos = 0;
-    while (pos < candidates.size() &&
-           candidates.get(pos).getReader().length() > this.maxCompactSize &&
-           !candidates.get(pos).isReference()) ++pos;
-    candidates.subList(0, pos).clear();
-
-    return isMajorCompaction(candidates);
-  }
-
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
-   */
-  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime();
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      if (filesToCompact.size() == 1) {
-        // Single file
-        StoreFile sf = filesToCompact.get(0);
-        long oldest =
-            (sf.getReader().timeRangeTracker == null) ?
-                Long.MIN_VALUE :
-                now - sf.getReader().timeRangeTracker.minimumTimestamp;
-        if (sf.isMajorCompaction() &&
-            (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
-                " because one (major) compacted file only and oldestTime " +
-                oldest + "ms is < ttl=" + this.ttl);
-          }
-        } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-          LOG.debug("Major compaction triggered on store " + this +
-            ", because keyvalues outdated; time since last major compaction " +
-            (now - lowTimestamp) + "ms");
-          result = true;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
-              "; time since last major compaction " + (now - lowTimestamp) + "ms");
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
-  long getNextMajorCompactTime() {
-    // default = 24hrs
-    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
-    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
-      String strCompactionTime =
-        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
-      ret = (new Long(strCompactionTime)).longValue();
-    }
-
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct =  conf.getFloat("hbase.hregion.majorcompaction.jitter",
-          0.20F);
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        ImmutableList<StoreFile> snapshot = storefiles;
-        if (snapshot != null && !snapshot.isEmpty()) {
-          String seed = snapshot.get(0).getPath().getName();
-          double curRand = new Random(seed.hashCode()).nextDouble();
-          ret += jitter - Math.round(2L * jitter * curRand);
-        } else {
-          ret = 0; // no storefiles == no major compaction
-        }
-      }
-    }
-    return ret;
+    return compactionManager.isMajorCompaction(candidates);
   }
 
   public CompactionRequest requestCompaction() throws IOException {
@@ -1273,9 +1212,10 @@ public class HStore extends SchemaConfig
         CompactSelection filesToCompact;
         if (override) {
           // coprocessor is overriding normal file selection
-          filesToCompact = new CompactSelection(conf, candidates);
+          filesToCompact = new CompactSelection(candidates);
         } else {
-          filesToCompact = compactSelection(candidates, priority);
+          filesToCompact = compactionManager.selectCompaction(candidates, priority,
+              forceMajor && filesCompacting.isEmpty());
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1326,191 +1266,6 @@ public class HStore extends SchemaConfig
   }
 
   /**
-   * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
-   * @param candidates
-   * @return
-   * @throws IOException
-   */
-  CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
-    return compactSelection(candidates,Store.NO_PRIORITY);
-  }
-
-  /**
-   * Algorithm to choose which files to compact
-   *
-   * Configuration knobs:
-   *  "hbase.hstore.compaction.ratio"
-   *    normal case: minor compact when file <= sum(smaller_files) * ratio
-   *  "hbase.hstore.compaction.min.size"
-   *    unconditionally compact individual files below this size
-   *  "hbase.hstore.compaction.max.size"
-   *    never compact individual files above this size (unless splitting)
-   *  "hbase.hstore.compaction.min"
-   *    min files needed to minor compact
-   *  "hbase.hstore.compaction.max"
-   *    max files to compact at once (avoids OOM)
-   *
-   * @param candidates candidate files, ordered from oldest to newest
-   * @return subset copy of candidate list that meets compaction criteria
-   * @throws IOException
-   */
-  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
-      throws IOException {
-    // ASSUMPTION!!! filesCompacting is locked when calling this function
-
-    /* normal skew:
-     *
-     *         older ----> newer
-     *     _
-     *    | |   _
-     *    | |  | |   _
-     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-     *    | |  | |  | |  | |  _  | |
-     *    | |  | |  | |  | | | | | |
-     *    | |  | |  | |  | | | | | |
-     */
-    CompactSelection compactSelection = new CompactSelection(conf, candidates);
-
-    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
-    if (!forcemajor) {
-      // Delete the expired store files before the compaction selection.
-      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
-          && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
-        CompactSelection expiredSelection = compactSelection
-            .selectExpiredStoreFilesToCompact(
-                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
-
-        // If there is any expired store files, delete them  by compaction.
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      // do not compact old files above a configurable threshold
-      // save all references. we MUST compact them
-      int pos = 0;
-      while (pos < compactSelection.getFilesToCompact().size() &&
-             compactSelection.getFilesToCompact().get(pos).getReader().length()
-               > maxCompactSize &&
-             !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      if (pos != 0) compactSelection.clearSubList(0, pos);
-    }
-
-    if (compactSelection.getFilesToCompact().isEmpty()) {
-      LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this + ": no store files to compact");
-      compactSelection.emptyFileList();
-      return compactSelection;
-    }
-
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction
-    boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
-      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
-      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
-    );
-    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-      this.getColumnFamilyName() + ": Initiating " +
-      (majorcompaction ? "major" : "minor") + "compaction");
-
-    if (!majorcompaction &&
-        !hasReferences(compactSelection.getFilesToCompact())) {
-      // we're doing a minor compaction, let's see what files are applicable
-      int start = 0;
-      double r = compactSelection.getCompactSelectionRatio();
-
-      // remove bulk import files that request to be excluded from minors
-      compactSelection.getFilesToCompact().removeAll(Collections2.filter(
-          compactSelection.getFilesToCompact(),
-          new Predicate<StoreFile>() {
-            public boolean apply(StoreFile input) {
-              return input.excludeFromMinorCompaction();
-            }
-          }));
-
-      // skip selection algorithm if we don't have enough files
-      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Not compacting files because we only have " +
-            compactSelection.getFilesToCompact().size() +
-            " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
-        }
-        compactSelection.emptyFileList();
-        return compactSelection;
-      }
-
-      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
-      // Sort files by size to correct when normal skew is altered by bulk load.
-      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
-       */
-
-      // get store file sizes for incremental compacting selection.
-      int countOfFiles = compactSelection.getFilesToCompact().size();
-      long [] fileSizes = new long[countOfFiles];
-      long [] sumSize = new long[countOfFiles];
-      for (int i = countOfFiles-1; i >= 0; --i) {
-        StoreFile file = compactSelection.getFilesToCompact().get(i);
-        fileSizes[i] = file.getReader().length();
-        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-        int tooFar = i + this.maxFilesToCompact - 1;
-        sumSize[i] = fileSizes[i]
-                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
-                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-      }
-
-      /* Start at the oldest file and stop when you find the first file that
-       * meets compaction criteria:
-       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
-       *      OR
-       *   (2) within the compactRatio of sum(newer_files)
-       * Given normal skew, any newer files will also meet this criteria
-       *
-       * Additional Note:
-       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-       * compact().  Consider the oldest files first to avoid a
-       * situation where we always compact [end-threshold,end).  Then, the
-       * last file becomes an aggregate of the previous compactions.
-       */
-      while(countOfFiles - start >= this.minFilesToCompact &&
-            fileSizes[start] >
-              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
-        ++start;
-      }
-      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
-      long totalSize = fileSizes[start]
-                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
-      compactSelection = compactSelection.getSubList(start, end);
-
-      // if we don't have enough files to compact, just wait
-      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this
-            + ".  Only " + (end - start) + " file(s) of size "
-            + StringUtils.humanReadableInt(totalSize)
-            + " have met compaction criteria.");
-        }
-        compactSelection.emptyFileList();
-        return compactSelection;
-      }
-    } else {
-      if(majorcompaction) {
-        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
-          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
-            " files, probably because of a user-requested major compaction");
-          if(priority != Store.PRIORITY_USER) {
-            LOG.error("Compacting more than max files on a non user-requested compaction");
-          }
-        }
-      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
-        // all files included in this compaction, up to max
-        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
-        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
-      }
-    }
-    return compactSelection;
-  }
-
-  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
@@ -2017,11 +1772,7 @@ public class HStore extends SchemaConfig
 
   @Override
   public boolean throttleCompaction(long compactionSize) {
-    // see HBASE-5867 for discussion on the default
-    long throttlePoint = conf.getLong(
-        "hbase.regionserver.thread.compaction.throttle",
-        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
-    return compactionSize > throttlePoint;
+    return compactionManager.throttleCompaction(compactionSize);
   }
 
   @Override
@@ -2116,7 +1867,7 @@ public class HStore extends SchemaConfig
 
   @Override
   public boolean needsCompaction() {
-    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
+    return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size());
   }
 
   @Override
@@ -2126,8 +1877,8 @@ public class HStore extends SchemaConfig
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (18 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+          + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -2149,6 +1900,15 @@ public class HStore extends SchemaConfig
   }
 
   /**
+   * Refreshes compaction manager class configuration. 
+   * Used for tests only; not plumbed through any layers.
+   * TODO: replace when HBASE-3909 is in.
+   */
+  void updateConfiguration() {
+    setCompactionPolicy(conf.get(COMPACTION_MANAGER_CLASS, DEFAULT_COMPACTION_MANAGER_CLASS));
+  }
+
+  /**
    * Immutable information for scans over a store.
    */
   public static class ScanInfo {
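
setCompactionPolicy instantiates whatever class hbase.compactionmanager.class names through its (Configuration, Store) constructor. A sketch of a custom plug-in (hypothetical class; it must live in this package because the base constructor and selection hooks are package-private):

    package org.apache.hadoop.hbase.regionserver;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;

    public class MyCompactionManager extends CompactionManager {
      MyCompactionManager(Configuration conf, Store store) {
        super(conf, store);
      }

      // Replace only the minor-compaction selection step; expiry handling,
      // major-compaction decisions and file-count limits stay inherited.
      @Override
      CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
        return super.applyCompactionPolicy(candidates);
      }
    }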

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 30 20:14:01 2012
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -204,6 +206,12 @@ public interface Store extends SchemaAwa
    * @return <tt>true</tt> if the store has any underlying reference files to older HFiles
    */
   public boolean hasReferences();
+
+  /**
+   * @param files the store files to check
+   * @return True if any of the files in <code>files</code> are References.
+   */
+  public boolean hasReferences(Collection<StoreFile> files);
 
   /**
    * @return The size of this store's memstore, in bytes
@@ -267,6 +275,11 @@ public interface Store extends SchemaAwa
    * @return the total size of all Bloom filters in the store
    */
   public long getTotalStaticBloomSize();
+
+  /**
+   * @return The TTL for this store's column family.
+   */
+  public long getTtl();
 
   // Test-helper methods
 
@@ -287,4 +300,10 @@ public interface Store extends SchemaAwa
    * @return the parent region hosting this store
    */
   public HRegion getHRegion();
+
+  /**
+   * @return A hash code depending on the current state of the store files.
+   * It is used as the seed for a deterministic random generator that picks the
+   * major compaction time.
+   */
+  public Integer getDeterministicRandomSeed();
 }

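getDeterministicRandomSeed() exists so a compaction manager can jitter the major compaction period without the deadline wandering on every recomputation: the same file set yields the same seed, hence the same jittered value. A sketch of the intended use; the period/jitter values correspond to the CompactionConfiguration fields read later in this commit, and the local variable names here are illustrative, not committed code:

    import java.util.Random;

    // Same file set => same seed => same jittered deadline.
    Random rnd = new Random(store.getDeterministicRandomSeed());
    long jittered = majorCompactionPeriod
        + (long) (majorCompactionPeriod * majorCompactionJitter * (2 * rnd.nextDouble() - 1));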
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 30 20:14:01 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.util.Bloo
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
@@ -114,6 +115,9 @@ public class StoreFile extends SchemaCon
   /** Max Sequence ID in FileInfo */
   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
 
+  /** Min Flush time in FileInfo */
+  public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");
+
   /** Major compaction flag in FileInfo */
   public static final byte[] MAJOR_COMPACTION_KEY =
       Bytes.toBytes("MAJOR_COMPACTION_KEY");
@@ -143,6 +147,9 @@ public class StoreFile extends SchemaCon
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
 
+  /** Default value for files without minFlushTime in metadata */
+  public static final long NO_MIN_FLUSH_TIME = -1;
+
   private final FileSystem fs;
 
   // This file's path.
@@ -169,6 +176,8 @@ public class StoreFile extends SchemaCon
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
+  // Defaults to NO_MIN_FLUSH_TIME; remains -1 if the file was written without minFlushTime.
+  private long minFlushTime = NO_MIN_FLUSH_TIME;
 
   // max of the MemstoreTS in the KV's in this store
   // Set when we obtain a Reader.
@@ -381,6 +390,22 @@ public class StoreFile extends SchemaCon
     return this.sequenceid;
   }
 
+  public boolean hasMinFlushTime() {
+    return this.minFlushTime != NO_MIN_FLUSH_TIME;
+  }
+
+  public long getMinFlushTime() {
+    // Bulk-load files are assumed to contain very old data; return 0.
+    if (isBulkLoadResult() && getMaxSequenceId() <= 0) {
+      return 0;
+    } else if (this.minFlushTime == NO_MIN_FLUSH_TIME) {
+      // A file written without the minFlushTime field is assumed to hold recent data.
+      return EnvironmentEdgeManager.currentTimeMillis();
+    } else {
+      return this.minFlushTime;
+    }
+  }
+
   public long getModificationTimeStamp() {
     return modificationTimeStamp;
   }
@@ -587,7 +612,10 @@ public class StoreFile extends SchemaCon
         }
       }
     }
-
+    b = metadataMap.get(MIN_FLUSH_TIME);
+    if (b != null) {
+      this.minFlushTime = Bytes.toLong(b);
+    }
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);

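Only the read side of MIN_FLUSH_TIME appears in this half of the commit; the write side (stamping the value when a flush or compaction produces a file) is not visible here. Conceptually it is one extra metadata append at write time. A sketch only - the writer and the minFlushTime variable are assumptions about the flush path, assuming the writer exposes the generic appendFileInfo(byte[], byte[]) HFile metadata call:

    // Record the earliest memstore insert time so tiering can age this file later.
    writer.appendFileInfo(StoreFile.MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));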
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java?rev=1403852&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java Tue Oct 30 20:14:01 2012
@@ -0,0 +1,267 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import java.text.DecimalFormat;
+
+/**
+ * Control knobs for the tier-based compaction algorithm
+ */
+@InterfaceAudience.Private
+public class TierCompactionConfiguration extends CompactionConfiguration {
+
+  private CompactionTier[] compactionTier;
+  private boolean recentFirstOrder;
+
+  TierCompactionConfiguration(Configuration conf, Store store) {
+    super(conf, store);
+
+    String strPrefix = "hbase.hstore.compaction.";
+    String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+                       + ".cf." + store.getFamily().getNameAsString() + ".";
+    String strDefault = "Default.";
+    String strAttribute;
+    // If value not set for family, use default family (by passing null).
+    // If default value not set, use 1 tier.
+
+    strAttribute = "NumCompactionTiers";
+    compactionTier = new CompactionTier[
+      conf.getInt(strPrefix + strSchema  + strAttribute,
+      conf.getInt(strPrefix + strDefault + strAttribute,
+      1))];
+
+    strAttribute = "IsRecentFirstOrder";
+    recentFirstOrder =
+      conf.getBoolean(strPrefix + strSchema  + strAttribute,
+      conf.getBoolean(strPrefix + strDefault + strAttribute,
+      true));
+
+    strAttribute = "MinCompactSize";
+    minCompactSize =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      0));
+
+    strAttribute = "MaxCompactSize";
+    maxCompactSize =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      Long.MAX_VALUE));
+
+    strAttribute = "ShouldDeleteExpired";
+    shouldDeleteExpired =
+      conf.getBoolean(strPrefix + strSchema  + strAttribute,
+      conf.getBoolean(strPrefix + strDefault + strAttribute,
+      shouldDeleteExpired));
+
+    strAttribute = "ThrottlePoint";
+    throttlePoint =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      throttlePoint));
+
+    strAttribute = "MajorCompactionPeriod";
+    majorCompactionPeriod =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      majorCompactionPeriod));
+
+    strAttribute = "MajorCompactionJitter";
+    majorCompactionJitter =
+      conf.getFloat(
+          strPrefix + strSchema + strAttribute,
+          conf.getFloat(
+              strPrefix + strDefault + strAttribute,
+              majorCompactionJitter
+          )
+      );
+
+    for (int i = 0; i < compactionTier.length; i++) {
+      compactionTier[i] = new CompactionTier(i);
+    }
+  }
+
+  /**
+   * @return Number of compaction Tiers
+   */
+  int getNumCompactionTiers() {
+    return compactionTier.length;
+  }
+
+  /**
+   * @return The i-th tier from most recent
+   */
+  CompactionTier getCompactionTier(int i) {
+    return compactionTier[i];
+  }
+
+  /**
+   * @return Whether the tiers will be checked for compaction from newest to oldest
+   */
+  boolean isRecentFirstOrder() {
+    return recentFirstOrder;
+  }
+
+  /**
+   * Parameters for each tier
+   */
+  class CompactionTier {
+
+    private long maxAgeInDisk;
+    private long maxSize;
+    private double tierCompactionRatio;
+    private int tierMinFilesToCompact;
+    private int tierMaxFilesToCompact;
+    private int endingIndexForTier;
+
+    CompactionTier(int tier) {
+      String strPrefix = "hbase.hstore.compaction.";
+      String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+                         + ".cf." + store.getFamily().getNameAsString() + ".";
+      String strDefault = "Default.";
+      String strDefTier = "";
+      String strTier = "Tier." + String.valueOf(tier) + ".";
+      String strAttribute;
+
+      /**
+       * Use the value set for the current family, current tier.
+       * If not set, use the value set for the current family, default tier.
+       * If not set, use the value set for the Default family, current tier.
+       * If not set, use the value set for the Default family, default tier.
+       * Else just use a hard-coded default value.
+       */
+
+      strAttribute = "MaxAgeInDisk";
+      maxAgeInDisk =
+        conf.getLong(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+        Long.MAX_VALUE));
+
+      strAttribute = "MaxSize";
+      maxSize =
+        conf.getLong(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+        Long.MAX_VALUE));
+
+      strAttribute = "CompactionRatio";
+      tierCompactionRatio = (double)
+        conf.getFloat(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getFloat(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getFloat(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getFloat(strPrefix + strDefault + strDefTier + strAttribute,
+        (float) compactionRatio))));
+
+      strAttribute = "MinFilesToCompact";
+      tierMinFilesToCompact =
+        conf.getInt(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getInt(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+        minFilesToCompact))));
+
+      strAttribute = "MaxFilesToCompact";
+      tierMaxFilesToCompact =
+        conf.getInt(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getInt(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+        maxFilesToCompact))));
+
+      strAttribute = "EndingIndexForTier";
+      endingIndexForTier =
+        conf.getInt(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier + strAttribute,
+        tier));
+
+      // make sure this value is not incorrectly set
+      if (endingIndexForTier < 0 || endingIndexForTier > tier) {
+        LOG.error("EndingIndexForTier improperly set. Using default value.");
+        endingIndexForTier = tier;
+      }
+
+    }
+
+    /**
+     * @return Upper bound on storeFile's minFlushTime to be included in this tier
+     */
+    long getMaxAgeInDisk() {
+      return maxAgeInDisk;
+    }
+
+    /**
+     * @return Upper bound on storeFile's size to be included in this tier
+     */
+    long getMaxSize() {
+      return maxSize;
+    }
+
+    /**
+     * @return Compaction ratio for selections of this tier
+     */
+    double getCompactionRatio() {
+      return tierCompactionRatio;
+    }
+
+    /**
+     * @return lower bound on number of files in selections of this tier
+     */
+    int getMinFilesToCompact() {
+      return tierMinFilesToCompact;
+    }
+
+    /**
+     * @return upper bound on number of files in selections of this tier
+     */
+    int getMaxFilesToCompact() {
+      return tierMaxFilesToCompact;
+    }
+
+    /**
+     * @return the newest tier which will also be included in selections of this tier
+     *  by default it is the index of this tier, must be between 0 and this tier
+     */
+    int getEndingIndexForTier() {
+      return endingIndexForTier;
+    }
+
+    String getDescription() {
+      String ageString = "INF";
+      String sizeString = "INF";
+      if (getMaxAgeInDisk() < Long.MAX_VALUE) {
+        ageString = StringUtils.formatTime(getMaxAgeInDisk());
+      }
+      if (getMaxSize() < Long.MAX_VALUE) {
+        sizeString = StringUtils.humanReadableInt(getMaxSize());
+      }
+      String ret = "Has files up to age " + ageString
+          + " and up to size " + sizeString + ". "
+          + "Compaction ratio: " + (new DecimalFormat("#.##")).format(getCompactionRatio()) + ", "
+          + "Compaction Selection with at least " + getMinFilesToCompact() + " and "
+          + "at most " + getMaxFilesToCompact() + " files possible, "
+          + "Selections in this tier include files up to tier " + getEndingIndexForTier();
+      return ret;
+    }
+
+  }
+
+}
\ No newline at end of file

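Given the lookup chain above, a two-tier setup for the Default (fallback) schema can be expressed programmatically as below; the same keys would normally live in hbase-compactions.xml, whose contents this diff does not show (it is listed as binary further down). The specific values are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Two tiers for the Default family settings.
    conf.setInt("hbase.hstore.compaction.Default.NumCompactionTiers", 2);
    // Tier 0: files younger than one hour (MaxAgeInDisk is in milliseconds,
    // compared against now - minFlushTime by isInTier in the manager below).
    conf.setLong("hbase.hstore.compaction.Default.Tier.0.MaxAgeInDisk", 60L * 60 * 1000);
    conf.setFloat("hbase.hstore.compaction.Default.Tier.0.CompactionRatio", 1.2f);
    // Tier 1: everything older (no MaxAgeInDisk => Long.MAX_VALUE), compacted less aggressively.
    conf.setFloat("hbase.hstore.compaction.Default.Tier.1.CompactionRatio", 0.5f);
    // Try the newest tier first when looking for an admissible selection.
    conf.setBoolean("hbase.hstore.compaction.Default.IsRecentFirstOrder", true);

Per-table, per-family overrides use the "tbl.<table>.cf.<family>." form of the prefix in place of "Default.".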
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java?rev=1403852&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java Tue Oct 30 20:14:01 2012
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+
+@InterfaceAudience.Private
+public class TierCompactionManager extends CompactionManager {
+
+  private static final Log LOG = LogFactory.getLog(TierCompactionManager.class);
+
+  private int[] endInTier;
+  private int[] tierOf;
+
+  private TierCompactionConfiguration tierConf;
+
+  TierCompactionManager(Configuration configuration, Store store) {
+    super(configuration, store);
+    comConf = new TierCompactionConfiguration(configuration, store);
+    tierConf = (TierCompactionConfiguration) comConf;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * -- Tier Based minor compaction selection algorithm: Choose CompactSelection from candidates --
+   * <p/>
+   * First exclude bulk-load files if indicated in configuration.
+   * Arrange files from oldest to newest then select an appropriate ['start','end') pair
+   * try 'start' from oldest to newest (smallest to largest fileIndex)
+   * for each value, identify the 'end' fileIndex
+   * stop when the range ['start','end') is an admissible compaction
+   * <p/>
+   * Notes:
+   * <p/>
+   * a compaction is admissible if
+   * - file fileSize[start] is at most maxCompactSize AND
+   * - number of files is at least currentTier.minFilesToCompact AND
+   * - (fileSize[start] is at most ratio times the rest of the files in the compaction OR
+   * - fileSize[start] is at most minCompactSize)
+   * <p/>
+   * end is endInTier[tierOf[start].endingIndexForTier]
+   * By default currentTier.endingIndexForTier = currentTier, so in the default
+   * case 'end' is always 1 + the last fileIndex in currentTier, making sure
+   * files from different tiers are never selected together in the default case
+   * normal skew:
+   *
+   *         older ----> newer (increasing seqID, increasing minFlushTime)
+   *
+   * Tier 2 |  Tier 1  |  Tier 0
+   *        |          |
+   *     _  |          |
+   *    | | |  _       |
+   *    | | | | |   _  |
+   *  --|-|-|-|-|- |-|-|--_-------_-------  minCompactSize
+   *    | | | | |  | | | | |  _  | |
+   *    | | | | |  | | | | | | | | |
+   *    | | | | |  | | | | | | | | |
+   */
+  @Override
+  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+    // we're doing a minor compaction, let's see what files are applicable
+    int start = -1;
+    int end = -1;
+
+    // skip the selection algorithm if there are no candidate files
+    if (candidates.getFilesToCompact().isEmpty()) {
+      candidates.emptyFileList();
+      return candidates;
+    }
+
+    // get store file sizes for incremental compacting selection.
+    int countOfFiles = candidates.getFilesToCompact().size();
+    long[] fileSizes = new long[countOfFiles];
+    StoreFile file;
+    long[] sumSize = new long[countOfFiles + 1];
+    sumSize[countOfFiles] = 0;
+    for (int i = countOfFiles - 1; i >= 0; --i) {
+      file = candidates.getFilesToCompact().get(i);
+      fileSizes[i] = file.getReader().length();
+      // sumSize[i] is the total size of files i..countOfFiles-1 (suffix sums for the algo)
+      sumSize[i] = fileSizes[i] + sumSize[i + 1];
+    }
+
+    /**
+     * divide into tiers:
+     * assign tierOf[fileIndex] = tierIndex
+     * assign endInTier[tierIndex] = 1 + index of the last file in tierIndex
+     */
+    // Backward compatibility - if files with indices < i don't have the minFlushTime
+    //    field, then all of them get tierOf[i]. If no file has minFlushTime, all get tier zero.
+    int numTiers = tierConf.getNumCompactionTiers();
+    TierCompactionConfiguration.CompactionTier tier;
+    tierOf = new int[countOfFiles];
+    endInTier = new int[numTiers + 1];
+    endInTier[numTiers] = 0;
+
+    LOG.info("Applying TierCompactionPolicy with " + countOfFiles + " files");
+
+    int i;
+    int j = countOfFiles;
+
+    for (i = 0; i < numTiers; i++) {
+      tier = tierConf.getCompactionTier(i);
+      endInTier[i] = j;
+      while (j > 0) {
+        file = candidates.getFilesToCompact().get(j - 1);
+        if (!isInTier(file, tier)) {
+          break;
+        }
+        j--;
+        tierOf[j] = i;
+      }
+    }
+
+    long restSize;
+    double ratio;
+
+    //Main algorithm
+    for (j = 0; j < countOfFiles; j++) {
+      start = next(start);
+      tier = tierConf.getCompactionTier(tierOf[start]);
+      end = endInTier[tier.getEndingIndexForTier()];
+      restSize = sumSize[start + 1] - sumSize[end];
+      ratio = tier.getCompactionRatio();
+      if (fileSizes[start] <= tierConf.getMaxCompactSize() &&
+        end - start >= tier.getMinFilesToCompact() &&
+        (fileSizes[start] <= tierConf.getMinCompactSize() ||
+          (fileSizes[start] <= restSize * ratio))) {
+        break;
+      }
+    }
+    String tab = "    ";
+    for (i = 0; i < numTiers; i++) {
+      LOG.info("Tier " + i + " : " + tierConf.getCompactionTier(i).getDescription());
+      if (endInTier[i] == endInTier[i + 1]) {
+        LOG.info(tab + "No file is assigned to this tier.");
+      } else {
+        LOG.info(tab + (endInTier[i] - endInTier[i + 1])
+          + " file(s) are assigned to this tier with serial number(s) "
+          + endInTier[i + 1] + " to " + (endInTier[i] - 1));
+      }
+      for (j = endInTier[i + 1]; j < endInTier[i]; j++) {
+        file = candidates.getFilesToCompact().get(j);
+        LOG.info(tab + tab + "SeqID = " + file.getMaxSequenceId()
+          + ", Age = " + StringUtils.formatTimeDiff(
+              EnvironmentEdgeManager.currentTimeMillis(), file.getMinFlushTime())
+          + ", Size = " + StringUtils.humanReadableInt(fileSizes[j])
+          + ", Path = " + file.getPath());
+      }
+    }
+    if (start < countOfFiles) {
+      end = Math.min(end, start
+        + tierConf.getCompactionTier(tierOf[start]).getMaxFilesToCompact());
+    }
+    if (start < end) {
+      String strTier = String.valueOf(tierOf[start]);
+      if (tierOf[end - 1] != tierOf[start]) {
+        strTier += " to " + tierOf[end - 1];
+      }
+      LOG.info("Tier Based compaction algorithm has selected " + (end - start)
+        + " files from tier " + strTier + " out of " + countOfFiles + " candidates");
+    }
+
+    candidates = candidates.getSubList(start, end);
+    return candidates;
+  }
+
+  private boolean isInTier(StoreFile file, TierCompactionConfiguration.CompactionTier tier) {
+    return file.getReader().length() <= tier.getMaxSize() &&
+      EnvironmentEdgeManager.currentTimeMillis() - file.getMinFlushTime() <= tier.getMaxAgeInDisk();
+  }
+
+  /**
+   * This function iterates over the start values in order.
+   * Whenever an admissible compaction is found, we return the selection.
+   * Hence the order is important if there is more than one admissible compaction.
+   * @param start current Value
+   * @return next Value
+   */
+  private int next(int start) {
+    if (tierConf.isRecentFirstOrder()) {
+      return backNext(start);
+    }
+    return fwdNext(start);
+  }
+
+  /**
+   * This function iterates over the start values in newer-first order of tiers,
+   * but older-first order of files within a tier.
+   * For example, suppose the tiers are:
+   * Tier 3 - files 0,1,2
+   * Tier 2 - files 3,4
+   * Tier 1 - no files
+   * Tier 0 - files 5,6,7
+   * Then the order of 'start' files will be:
+   * 5,6,7,3,4,0,1,2
+   * @param start current Value
+   * @return next Value
+   */
+  private int backNext(int start) {
+    int tier = 0;
+    if (start == -1) {
+      while (endInTier[tier] >= endInTier[0]) {
+        tier++;
+      }
+      return endInTier[tier];
+    }
+    tier = tierOf[start];
+    if (endInTier[tier] == start + 1) {
+      tier++;
+      start = endInTier[tier];
+      while (endInTier[tier] >= start) {
+        tier++;
+      }
+      return endInTier[tier];
+    }
+    return start + 1;
+  }
+
+  /**
+   * This function iterates over the start values in older-first order of files.
+   * @param start current Value
+   * @return next Value
+   */
+  private int fwdNext(int start) {
+    return start + 1;
+  }
+
+}
\ No newline at end of file

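The recent-first traversal is the subtle part, so here is a standalone replay of the javadoc example above (tier 3 = files 0,1,2; tier 2 = files 3,4; tier 1 empty; tier 0 = files 5,6,7). This re-implements backNext() inline for illustration only - it is not the committed class - and the endInTier/tierOf arrays are exactly what the division loop produces for that layout:

    public class BackNextOrderDemo {
      public static void main(String[] args) {
        int[] endInTier = {8, 5, 5, 3, 0};          // endInTier[t] = 1 + last file index in tier t
        int[] tierOf    = {3, 3, 3, 2, 2, 0, 0, 0}; // file index -> tier index
        int start = -1;
        StringBuilder order = new StringBuilder();
        for (int k = 0; k < tierOf.length; k++) {
          if (start == -1) {
            int t = 0;
            while (endInTier[t] >= endInTier[0]) t++; // skip empty newest tiers
            start = endInTier[t];
          } else if (endInTier[tierOf[start]] == start + 1) {
            int t = tierOf[start] + 1;                // current tier exhausted:
            int s = endInTier[t];                     // jump to the oldest file of
            while (endInTier[t] >= s) t++;            // the next non-empty older tier
            start = endInTier[t];
          } else {
            start++;                                  // next file within the tier
          }
          order.append(start).append(' ');
        }
        System.out.println(order);                    // prints: 5 6 7 3 4 0 1 2
      }
    }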
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Tue Oct 30 20:14:01 2012
@@ -19,15 +19,13 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 @InterfaceAudience.Private
 public class CompactSelection {
@@ -48,37 +46,15 @@ public class CompactSelection {
    */
   private final static Object compactionCountLock = new Object();
 
-  // HBase conf object
-  Configuration conf;
   // was this compaction promoted to an off-peak
   boolean isOffPeakCompaction = false;
-  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
-  // With float, java will downcast your long to float for comparisons (bad)
-  private double compactRatio;
-  // compaction ratio off-peak
-  private double compactRatioOffPeak;
-  // offpeak start time
-  private int offPeakStartHour = -1;
-  // off peak end time
-  private int offPeakEndHour = -1;
+  // CompactSelection object creation time.
+  private final long selectionTime;
 
-  public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
+  public CompactSelection(List<StoreFile> filesToCompact) {
+    this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
     this.filesToCompact = filesToCompact;
-    this.conf = conf;
-    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
-    this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
-
-    // Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
-    this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
-    this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
-    if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
-      if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
-        LOG.warn("Invalid start/end hour for peak hour : start = " +
-            this.offPeakStartHour + " end = " + this.offPeakEndHour +
-            ". Valid numbers are [0-23]");
-      }
-      this.offPeakStartHour = this.offPeakEndHour = -1;
-    }
+    this.isOffPeakCompaction = false;
   }
 
   /**
@@ -113,49 +89,25 @@ public class CompactSelection {
     }
 
     if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
+      expiredSFSelection = new CompactSelection(expiredStoreFiles);
     }
     return expiredSFSelection;
   }
 
   /**
-   * If the current hour falls in the off peak times and there are no 
-   * outstanding off peak compactions, the current compaction is 
-   * promoted to an off peak compaction. Currently only one off peak 
-   * compaction is present in the compaction queue.
-   *
-   * @param currentHour
-   * @return
-   */
-  public double getCompactSelectionRatio() {
-    double r = this.compactRatio;
-    synchronized(compactionCountLock) {
-      if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
-        r = this.compactRatioOffPeak;
-        numOutstandingOffPeakCompactions++;
-        isOffPeakCompaction = true;
-      }
-    }
-    if(isOffPeakCompaction) {
-      LOG.info("Running an off-peak compaction, selection ratio = " +
-          compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
-          numOutstandingOffPeakCompactions);
-    }
-    return r;
-  }
-
-  /**
    * The current compaction finished, so reset the off peak compactions count
    * if this was an off peak compaction.
    */
   public void finishRequest() {
     if (isOffPeakCompaction) {
+      long newValueToLog = -1;
       synchronized(compactionCountLock) {
-        numOutstandingOffPeakCompactions--;
+        assert isOffPeakCompaction : "Double-counting off-peak count for compaction";
+        newValueToLog = --numOutstandingOffPeakCompactions;
         isOffPeakCompaction = false;
       }
       LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
-          numOutstandingOffPeakCompactions);
+          newValueToLog);
     }
   }
 
@@ -170,13 +122,14 @@ public class CompactSelection {
   public void emptyFileList() {
     filesToCompact.clear();
     if (isOffPeakCompaction) {
+      long newValueToLog = -1;
       synchronized(compactionCountLock) {
         // reset the off peak count
-        numOutstandingOffPeakCompactions--;
+        newValueToLog = --numOutstandingOffPeakCompactions;
         isOffPeakCompaction = false;
       }
       LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
-          numOutstandingOffPeakCompactions);
+          newValueToLog);
     }
   }
 
@@ -184,16 +137,30 @@ public class CompactSelection {
     return this.isOffPeakCompaction;
   }
 
-  private boolean isOffPeakHour() {
-    int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
-    // If offpeak time checking is disabled just return false.
-    if (this.offPeakStartHour == this.offPeakEndHour) {
-      return false;
+  public static long getNumOutStandingOffPeakCompactions() {
+    synchronized(compactionCountLock) {
+      return numOutstandingOffPeakCompactions;
     }
-    if (this.offPeakStartHour < this.offPeakEndHour) {
-      return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
+  }
+
+  /**
+   * Tries making the compaction off-peak.
+   * Only checks internal compaction constraints, not timing.
+   * @return Eventual value of isOffPeakCompaction.
+   */
+  public boolean trySetOffpeak() {
+    assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
+    synchronized(compactionCountLock) {
+      if (numOutstandingOffPeakCompactions == 0) {
+        numOutstandingOffPeakCompactions++;
+        isOffPeakCompaction = true;
+      }
     }
-    return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
+    return isOffPeakCompaction;
+  }
+
+  public long getSelectionTime() {
+    return selectionTime;
   }
 
   public CompactSelection subList(int start, int end) {

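With getCompactSelectionRatio() removed, the caller now owns the off-peak decision and pairs trySetOffpeak() with finishRequest() (or emptyFileList()). Roughly, as a sketch - the hour check and the two ratio variables are assumptions about the calling code, not part of this class:

    CompactSelection selection = new CompactSelection(candidateFiles);
    double ratio = normalRatio;
    if (withinOffPeakHours && selection.trySetOffpeak()) {
      ratio = offPeakRatio;   // at most one off-peak compaction holds the slot
    }
    // ... select and run the compaction using 'ratio' ...
    selection.finishRequest(); // releases the off-peak slot if it was taken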
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1403852&r1=1403851&r2=1403852&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 30 20:14:01 2012
@@ -208,6 +208,10 @@ public class CompactionRequest implement
       return p;
     }
 
+    public long getSelectionTime() {
+      return compactSelection.getSelectionTime();
+    }
+
     /** Sets the priority for the request */
     public void setPriority(int p) {
       this.p = p;
@@ -272,7 +276,7 @@ public class CompactionRequest implement
         server.checkFileSystem();
       } finally {
         s.finishRequest(this);
-        LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
+        LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
       }
     }
 

Added: hbase/trunk/hbase-server/src/main/resources/hbase-compactions.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/hbase-compactions.xml?rev=1403852&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hbase/trunk/hbase-server/src/main/resources/hbase-compactions.xml
------------------------------------------------------------------------------
    svn:mime-type = application/xml