Posted to commits@hbase.apache.org by se...@apache.org on 2013/10/29 01:37:30 UTC

svn commit: r1536569 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase...

Author: sershe
Date: Tue Oct 29 00:37:30 2013
New Revision: 1536569

URL: http://svn.apache.org/r1536569
Log:
HBASE-7680 implement compaction policy for stripe compactions

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -54,6 +54,12 @@ public class DefaultStoreEngine extends 
     DEFAULT_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
 
   @Override
+  public boolean needsCompaction(List<StoreFile> filesCompacting) {
+    return compactionPolicy.needsCompaction(
+        this.storeFileManager.getStorefiles(), filesCompacting);
+  }
+
+  @Override
   protected void createComponents(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
     storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
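
For illustration, a minimal sketch of a caller of the new engine-level check. The helper and
its wiring are hypothetical; StoreEngine, CompactionContext, needsCompaction() and
createCompaction() are the APIs touched by this commit:

    // Hypothetical helper: callers now ask the engine, which delegates to the
    // policy/file-manager pair it built in createComponents().
    static void maybeCompact(StoreEngine<?, ?, ?, ?> engine,
        List<StoreFile> filesCompacting) throws IOException {
      if (!engine.needsCompaction(filesCompacting)) {
        return;
      }
      CompactionContext compaction = engine.createCompaction();
      // ... selection and execution continue via compaction.select()/compact().
    }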

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Oct 29 00:37:30 2013
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1194,8 +1196,8 @@ public class HStore implements Store {
 
     try {
       // Ready to go. Have list of files to compact.
-      List<Path> newFiles =
-          this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
+      List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
+          .compactForTesting(filesToCompact, isMajor);
       for (Path newFile: newFiles) {
         // Move the compaction into place.
         StoreFile sf = moveFileIntoPlace(newFile);
@@ -1881,8 +1883,7 @@ public class HStore implements Store {
 
   @Override
   public boolean needsCompaction() {
-    return storeEngine.getCompactionPolicy().needsCompaction(
-        this.storeEngine.getStoreFileManager().getStorefiles(), filesCompacting);
+    return this.storeEngine.needsCompaction(this.filesCompacting);
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java Tue Oct 29 00:37:30 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
 @InterfaceStability.Unstable
 public interface StoreConfigInformation {
   /**
-   * TODO: remove after HBASE-7252 is fixed.
    * @return Gets the Memstore flush size for the region that this store works with.
    */
   long getMemstoreFlushSize();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Tue Oct 29 00:37:30 2013
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,12 @@ public abstract class StoreEngine<SF ext
   }
 
   /**
+   * @param filesCompacting Files currently compacting
+   * @return whether a compaction selection is possible
+   */
+  public abstract boolean needsCompaction(List<StoreFile> filesCompacting);
+
+  /**
    * Creates an instance of a compaction context specific to this engine.
    * Doesn't actually select or start a compaction. See CompactionContext class comment.
    * @return New CompactionContext object.
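
Which engine backs a store is pluggable; a hedged sketch of pointing a configuration at the
new stripe engine (the key string matches StoreEngine's class key in trunk at this time):

    // Assumes the standard store engine selection key; the value is the class
    // added by this commit.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class",
        "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");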

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 29 00:37:30 2013
@@ -285,7 +285,7 @@ public class StoreFile {
     return modificationTimeStamp;
   }
 
-  byte[] getMetadataValue(byte[] key) {
+  public byte[] getMetadataValue(byte[] key) {
     return metadataMap.get(key);
   }
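
getMetadataValue() is now public so code outside the class (notably the stripe policy and its
tests) can read per-file metadata. A sketch, assuming a StoreFile sf in scope and that the
stripe boundary constants of StripeStoreFileManager are visible to the caller:

    // Illustration only: read the stripe boundary rows stored in file metadata.
    byte[] stripeStart = sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY);
    byte[] stripeEnd = sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY);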
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java Tue Oct 29 00:37:30 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 
@@ -28,25 +30,116 @@ import org.apache.hadoop.conf.Configurat
  */
 @InterfaceAudience.Private
 public class StripeStoreConfig {
-  public static final String MAX_SPLIT_IMBALANCE = "hbase.store.stripe.split.max.imbalance";
-  private float maxSplitImbalance;
+  static final Log LOG = LogFactory.getLog(StripeStoreConfig.class);
 
-  public StripeStoreConfig(Configuration config) {
-    maxSplitImbalance = config.getFloat(MAX_SPLIT_IMBALANCE, 1.5f);
-    if (maxSplitImbalance == 0) {
-      maxSplitImbalance = 1.5f;
+  /** The maximum number of files to compact within a stripe; same as for regular compaction. */
+  public static final String MAX_FILES_KEY = "hbase.store.stripe.compaction.maxFiles";
+  /** The minimum number of files to compact within a stripe; same as for regular compaction. */
+  public static final String MIN_FILES_KEY = "hbase.store.stripe.compaction.minFiles";
+
+  /** The minimum number of files to compact when compacting L0; same as minFiles for regular
+   * compaction. Given that L0 causes unnecessary rewriting of the data, this should be higher
+   * than the regular minFiles. */
+  public static final String MIN_FILES_L0_KEY = "hbase.store.stripe.compaction.minFilesL0";
+
+  /** The size a stripe should achieve to be considered for splitting into multiple stripes.
+   A stripe will be split when it can be fully compacted and it is above this size. */
+  public static final String SIZE_TO_SPLIT_KEY = "hbase.store.stripe.sizeToSplit";
+  /** The target count of new stripes to produce when splitting a stripe. A floating point
+   number, default is 2. Values less than 1 will be converted to 1/x. Non-whole numbers will
+   produce unbalanced splits, which may be good for some cases. In this case the "smaller" of
+   the new stripes will always be the rightmost one. If the stripe is bigger than sizeToSplit
+   when splitting, this will be adjusted by a whole increment. */
+  public static final String SPLIT_PARTS_KEY = "hbase.store.stripe.splitPartCount";
+  /** The initial stripe count to create. If the row distribution is roughly the same over time,
+   it's good to set this to a count of stripes that is expected to be achieved in most regions,
+   to get this count from the outset and prevent unnecessary splitting. */
+  public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
+
+  /** When splitting region, the maximum size imbalance to allow in an attempt to split at a
+   stripe boundary, so that no files go to both regions. Most users won't need to change that. */
+  public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
+      "hbase.store.stripe.region.split.max.imbalance";
+
+
+  private final float maxRegionSplitImbalance;
+  private final int level0CompactMinFiles;
+  private final int stripeCompactMinFiles;
+  private final int stripeCompactMaxFiles;
+
+  private final int initialCount;
+  private final long sizeToSplitAt;
+  private final float splitPartCount;
+  private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
+
+  private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
+  public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
+    this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
+    this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
+    this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
+    this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
+
+    this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
+    if (Math.abs(splitPartCount - 1.0) < EPSILON) {
+      throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
+    }
+    // TODO: change when no L0.
+    // Arbitrary default split size - 4 times the size of one L0 compaction.
+    double flushSize = sci.getMemstoreFlushSize();
+    if (flushSize == 0) {
+      flushSize = 128 * 1024 * 1024;
     }
-    if (maxSplitImbalance < 1f) {
-      maxSplitImbalance = 1f / maxSplitImbalance;
+    long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
+    this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
+    this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+    this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
+  }
+
+  private static float getFloat(
+      Configuration config, String key, float defaultValue, boolean moreThanOne) {
+    float value = config.getFloat(key, defaultValue);
+    if (value == 0) {
+      LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
+      value = defaultValue;
+    } else if ((value > 1f) != moreThanOne) {
+      value = 1f / value;
     }
+    return value;
+  }
+
+  public float getMaxSplitImbalance() {
+    return this.maxRegionSplitImbalance;
+  }
+
+  public int getLevel0MinFiles() {
+    return level0CompactMinFiles;
+  }
+
+  public int getStripeCompactMinFiles() {
+    return stripeCompactMinFiles;
+  }
+
+  public int getStripeCompactMaxFiles() {
+    return stripeCompactMaxFiles;
+  }
+
+  public long getSplitSize() {
+    return sizeToSplitAt;
+  }
+
+  public int getInitialCount() {
+    return initialCount;
+  }
+
+  public float getSplitCount() {
+    return splitPartCount;
   }
 
   /**
-   * @return the maximum imbalance to tolerate between sides when splitting the region
-   * at the stripe boundary. If the ratio of a larger to a smaller side of the split on
-   * the stripe-boundary is bigger than this, then some stripe will be split.
+   * @return the desired size of the target stripe when splitting, in bytes.
+   *         Derived from {@link #getSplitSize()} and {@link #getSplitCount()}.
    */
-  public float getMaxSplitImbalance() {
-    return this.maxSplitImbalance;
+  public long getSplitPartSize() {
+    return splitPartSize;
   }
 }
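
A hedged tuning sketch using the keys defined above; the values are arbitrary examples, not
recommendations. Note that getFloat() inverts values on the wrong side of 1, so e.g. a split
part count of 0.5 is read as 2:

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);     // per-stripe minFiles
    conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 10);    // per-stripe maxFiles
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);  // L0 compaction threshold
    conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, 2.0f);
    conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 2L * 1024 * 1024 * 1024); // 2 GB
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 1);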

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The storage engine that implements the stripe-based store/compaction scheme.
+ */
+@InterfaceAudience.Private
+public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
+  StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
+  static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
+  private StripeStoreConfig config;
+
+  @Override
+  public boolean needsCompaction(List<StoreFile> filesCompacting) {
+    return this.compactionPolicy.needsCompactions(this.storeFileManager, filesCompacting);
+  }
+
+  @Override
+  public CompactionContext createCompaction() {
+    return new StripeCompaction();
+  }
+
+  @Override
+  protected void createComponents(
+      Configuration conf, Store store, KVComparator comparator) throws IOException {
+    this.config = new StripeStoreConfig(conf, store);
+    this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
+    this.storeFlusher = new DefaultStoreFlusher(conf, store);
+    this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+    this.compactor = new StripeCompactor(conf, store);
+  }
+
+  /**
+   * Represents one instance of stripe compaction, with the necessary context and flow.
+   */
+  private class StripeCompaction extends CompactionContext {
+    private StripeCompactionPolicy.StripeCompactionRequest stripeRequest = null;
+
+    @Override
+    public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+      return compactionPolicy.preSelectFilesForCoprocessor(storeFileManager, filesCompacting);
+    }
+
+    @Override
+    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+      this.stripeRequest = compactionPolicy.selectCompaction(
+          storeFileManager, filesCompacting, mayUseOffPeak);
+      this.request = (this.stripeRequest == null)
+          ? new CompactionRequest(new ArrayList<StoreFile>()) : this.stripeRequest.getRequest();
+      return this.stripeRequest != null;
+    }
+
+    @Override
+    public void forceSelect(CompactionRequest request) {
+      super.forceSelect(request);
+      if (this.stripeRequest != null) {
+        this.stripeRequest.setRequest(this.request);
+      } else {
+        LOG.warn("Stripe store is forced to take an arbitrary file list and compact it.");
+        this.stripeRequest = compactionPolicy.createEmptyRequest(storeFileManager, this.request);
+      }
+    }
+
+    @Override
+    public List<Path> compact() throws IOException {
+      Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
+      return this.stripeRequest.execute(compactor);
+    }
+  }
+}
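
A sketch of the flow that StripeCompaction implements, with hypothetical wiring around the
methods added here; select() returns false when selectCompaction() produced no stripeRequest:

    static List<Path> compactOnce(StoreEngine<?, ?, ?, ?> engine,
        List<StoreFile> filesCompacting, boolean mayUseOffPeak) throws IOException {
      CompactionContext ctx = engine.createCompaction();
      if (!ctx.select(filesCompacting, false /*user*/, mayUseOffPeak, false /*forceMajor*/)) {
        return null; // no stripe compaction was selected
      }
      return ctx.compact(); // delegates to StripeCompactionRequest.execute(compactor)
    }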

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Tue Oct 29 00:37:30 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 
@@ -58,7 +59,8 @@ import com.google.common.collect.Immutab
  *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
  */
 @InterfaceAudience.Private
-class StripeStoreFileManager implements StoreFileManager {
+public class StripeStoreFileManager
+  implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
   static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
 
   /**
@@ -81,16 +83,16 @@ class StripeStoreFileManager implements 
    */
   private static class State {
     /**
-     * The end keys of each stripe. The last stripe end is always open-ended, so it's not stored
-     * here. It is invariant that the start key of the stripe is the end key of the previous one
+     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
+     * here. It is invariant that the start row of the stripe is the end row of the previous one
      * (and is an open boundary for the first one).
      */
     public byte[][] stripeEndRows = new byte[0][];
 
     /**
-     * Files by stripe. Each element of the list corresponds to stripeEndKey with the corresponding
-     * index, except the last one. Inside each list, the files are in reverse order by seqNum.
-     * Note that the length of this is one higher than that of stripeEndKeys.
+     * Files by stripe. Each element of the list corresponds to the stripeEndRows element with
+     * the same index, except the last one. Inside each list, the files are in reverse order by
+     * seqNum. Note that the length of this is one higher than that of stripeEndRows.
      */
     public ArrayList<ImmutableList<StoreFile>> stripeFiles
       = new ArrayList<ImmutableList<StoreFile>>();
@@ -105,16 +107,20 @@ class StripeStoreFileManager implements 
   /** Cached file metadata (or overrides as the case may be) */
   private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
   private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
+  /** Normally an invalid key is null, but a map lookup also returns null for "no entry"; so
+   * use the following constant value in these maps instead. Note that this is a constant and
+   * we compare it by reference when we read from the map. */
+  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
 
   private final KVComparator kvComparator;
   private StripeStoreConfig config;
 
   private final int blockingFileCount;
 
-  public StripeStoreFileManager(KVComparator kvComparator, Configuration conf) throws Exception {
+  public StripeStoreFileManager(
+      KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
     this.kvComparator = kvComparator;
-    // TODO: create this in a shared manner in StoreEngine when there's one
-    this.config = new StripeStoreConfig(conf);
+    this.config = config;
     this.blockingFileCount = conf.getInt(
         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
   }
@@ -378,7 +384,7 @@ class StripeStoreFileManager implements 
     while (entryIter.hasNext()) {
       Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
       ArrayList<StoreFile> files = entry.getValue();
-      // Validate the file start keys, and remove the bad ones to level 0.
+      // Validate the file start rows, and remove the bad ones to level 0.
       for (int i = 0; i < files.size(); ++i) {
         StoreFile sf = files.get(i);
         byte[] startRow = startOf(sf);
@@ -459,15 +465,8 @@ class StripeStoreFileManager implements 
   }
 
   private void ensureLevel0Metadata(StoreFile sf) {
-    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, null);
-    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, null);
-  }
-
-  /**
-   * For testing.
-   */
-  List<StoreFile> getLevel0Files() {
-    return state.level0Files;
+    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
+    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
   }
 
   private void debugDumpState(String string) {
@@ -515,7 +514,7 @@ class StripeStoreFileManager implements 
   }
 
   /**
-   * Finds the stripe index by end key.
+   * Finds the stripe index by end row.
    */
   private final int findStripeIndexByEndRow(byte[] endRow) {
     assert !isInvalid(endRow);
@@ -524,7 +523,7 @@ class StripeStoreFileManager implements 
   }
 
   /**
-   * Finds the stripe index for the stripe containing a key provided externally for get/scan.
+   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
    */
   private final int findStripeForRow(byte[] row, boolean isStart) {
     if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
@@ -537,33 +536,28 @@ class StripeStoreFileManager implements 
     return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
   }
 
-  /**
-   * Gets the start key for a given stripe.
-   * @param stripeIndex Stripe index.
-   * @return Start key. May be an open key.
-   */
+  @Override
   public final byte[] getStartRow(int stripeIndex) {
     return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
   }
 
-  /**
-   * Gets the start key for a given stripe.
-   * @param stripeIndex Stripe index.
-   * @return Start key. May be an open key.
-   */
+  @Override
   public final byte[] getEndRow(int stripeIndex) {
     return (stripeIndex == state.stripeEndRows.length
         ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
   }
 
+
   private byte[] startOf(StoreFile sf) {
     byte[] result = this.fileStarts.get(sf);
-    return result != null ? result : sf.getMetadataValue(STRIPE_START_KEY);
+    return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
+        : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
   }
 
   private byte[] endOf(StoreFile sf) {
     byte[] result = this.fileEnds.get(sf);
-    return result != null ? result : sf.getMetadataValue(STRIPE_END_KEY);
+    return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
+        : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
   }
 
   /**
@@ -793,8 +787,8 @@ class StripeStoreFileManager implements 
 
     /**
      * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
-     * new candidate stripes/removes old stripes; produces new set of stripe end keys.
-     * @param newStripes  New stripes - files by end key.
+     * new candidate stripes/removes old stripes; produces new set of stripe end rows.
+     * @param newStripes  New stripes - files by end row.
      */
     private void processNewCandidateStripes(
         TreeMap<byte[], StoreFile> newStripes) throws IOException {
@@ -859,4 +853,31 @@ class StripeStoreFileManager implements 
       }
     }
   }
+
+  @Override
+  public List<StoreFile> getLevel0Files() {
+    return this.state.level0Files;
+  }
+
+  @Override
+  public List<byte[]> getStripeBoundaries() {
+    if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
+    ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
+    result.add(OPEN_KEY);
+    for (int i = 0; i < this.state.stripeEndRows.length; ++i) {
+      result.add(this.state.stripeEndRows[i]);
+    }
+    result.add(OPEN_KEY);
+    return result;
+  }
+
+  @Override
+  public ArrayList<ImmutableList<StoreFile>> getStripes() {
+    return this.state.stripeFiles;
+  }
+
+  @Override
+  public int getStripeCount() {
+    return this.state.stripeFiles.size();
+  }
 }
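
The INVALID_KEY_IN_MAP change above is a general idiom worth spelling out: HashMap.get()
returns null both for a missing entry and for an entry stored as null, so a reference-compared
sentinel constant stands in for the "invalid" value. A generic restatement with hypothetical
names:

    final class SentinelCache {
      // Compared by reference (==), exactly as startOf()/endOf() do above.
      private static final byte[] INVALID = new byte[0];
      private final java.util.HashMap<String, byte[]> map =
          new java.util.HashMap<String, byte[]>();

      void putInvalid(String key) { map.put(key, INVALID); }

      /** @return cached value; null if cached as invalid; fallback if absent. */
      byte[] get(String key, byte[] fallback) {
        byte[] v = map.get(key);
        if (v == null) return fallback;    // no entry in the map at all
        return (v == INVALID) ? null : v;  // sentinel means "invalid key"
      }
    }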

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Tue Oct 29 00:37:30 2013
@@ -47,6 +47,7 @@ public class CompactionConfiguration {
   static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
 
   private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
+  public static final String RATIO_KEY = CONFIG_PREFIX + "ratio";
 
   Configuration conf;
   StoreConfigInformation storeConfigInfo;
@@ -72,7 +73,7 @@ public class CompactionConfiguration {
     minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
           /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
     maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
-    compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
+    compactionRatio = conf.getFloat(RATIO_KEY, 1.2F);
     offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
 
     throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
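
With the ratio key promoted to a public constant, tests and tools can reference it instead of
the raw string; a trivial, hedged sketch (1.4f is an arbitrary example; the default stays 1.2F):

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(CompactionConfiguration.RATIO_KEY, 1.4f);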

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -56,14 +56,6 @@ public abstract class CompactionPolicy {
   public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param storeFiles Current store files.
-   * @param filesCompacting files currently compacting.
-   * @return whether a compactionSelection is possible
-   */
-  public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
-      final List<StoreFile> filesCompacting);
-
-  /**
   * Inform the policy that some configuration has been changed,
   * so cached values should be updated if any.
    */

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Oct 29 00:37:30 2013
@@ -79,32 +79,6 @@ public abstract class Compactor {
     void append(KeyValue kv) throws IOException;
   }
 
-  /**
-   * Do a minor/major compaction on an explicit set of storefiles from a Store.
-   * @param request the requested compaction
-   * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
-   *         it through the compaction.
-   * @throws IOException
-   */
-  public abstract List<Path> compact(final CompactionRequest request) throws IOException;
-
-  /**
-   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-   * {@link #compact(CompactionRequest)};
-   * @param filesToCompact the files to compact. These are used as the compactionSelection for the
-   *          generated {@link CompactionRequest}.
-   * @param isMajor true to major compact (prune all deletes, max versions, etc)
-   * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
-   *         it through the compaction.
-   * @throws IOException
-   */
-  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
-      throws IOException {
-    CompactionRequest cr = new CompactionRequest(filesToCompact);
-    cr.setIsMajor(isMajor);
-    return this.compact(cr);
-  }
-
   public CompactionProgress getProgress() {
     return this.progress;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Tue Oct 29 00:37:30 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionse
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,4 +95,21 @@ public class DefaultCompactor extends Co
     }
     return newFiles;
   }
+
+  /**
+   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
+   * {@link #compact(CompactionRequest)}.
+   * @param filesToCompact the files to compact. These are used as the compactionSelection for
+   *          the generated {@link CompactionRequest}.
+   * @param isMajor true to major compact (prune all deletes, max versions, etc)
+   * @return Product of compaction or an empty list if all cells expired or deleted and nothing
+   *         made it through the compaction.
+   * @throws IOException
+   */
+  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
+      throws IOException {
+    CompactionRequest cr = new CompactionRequest(filesToCompact);
+    cr.setIsMajor(isMajor);
+    return this.compact(cr);
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -53,9 +53,19 @@ public class ExploringCompactionPolicy e
   @Override
   final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
     final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
+    return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
+        mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));
+  }
+
+  public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
+       boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
+
+    final double currentRatio = mayUseOffPeak
+        ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
+
     // Start off choosing nothing.
     List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
-    List<StoreFile> smallest = new ArrayList<StoreFile>(0);
+    List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
     long bestSize = 0;
     long smallestSize = Long.MAX_VALUE;
 
@@ -63,15 +73,15 @@ public class ExploringCompactionPolicy e
     // Consider every starting place.
     for (int start = 0; start < candidates.size(); start++) {
       // Consider every different sub list permutation in between start and end with min files.
-      for (int currentEnd = start + comConf.getMinFilesToCompact() - 1;
+      for (int currentEnd = start + minFiles - 1;
           currentEnd < candidates.size(); currentEnd++) {
         List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
 
         // Sanity checks
-        if (potentialMatchFiles.size() < comConf.getMinFilesToCompact()) {
+        if (potentialMatchFiles.size() < minFiles) {
           continue;
         }
-        if (potentialMatchFiles.size() > comConf.getMaxFilesToCompact()) {
+        if (potentialMatchFiles.size() > maxFiles) {
           continue;
         }
 
@@ -81,7 +91,7 @@ public class ExploringCompactionPolicy e
 
         // Store the smallest set of files.  This stored set of files will be used
         // if it looks like the algorithm is stuck.
-        if (size < smallestSize) {
+        if (mightBeStuck && size < smallestSize) {
           smallest = potentialMatchFiles;
           smallestSize = size;
         }
@@ -92,7 +102,7 @@ public class ExploringCompactionPolicy e
 
         ++opts;
         if (size >= comConf.getMinCompactSize()
-            && !filesInRatio(potentialMatchFiles, mayUseOffPeak)) {
+            && !filesInRatio(potentialMatchFiles, currentRatio)) {
           continue;
         }
 
@@ -150,15 +160,13 @@ public class ExploringCompactionPolicy e
    *      FileSize(i) <= ( Sum(0,N,FileSize(_)) - FileSize(i) ) * Ratio.
    *
    * @param files List of store files to consider as a compaction candidate.
-   * @param isOffPeak should the offPeak compaction ratio be used ?
+   * @param currentRatio The ratio to use.
    * @return a boolean if these files satisfy the ratio constraints.
    */
-  private boolean filesInRatio(final List<StoreFile> files, final boolean isOffPeak) {
+  private boolean filesInRatio(final List<StoreFile> files, final double currentRatio) {
     if (files.size() < 2) {
-      return  true;
+      return true;
     }
-    final double currentRatio =
-        isOffPeak ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
 
     long totalFileSize = getTotalStoreSize(files);
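
A hedged restatement of the ratio constraint quoted in the javadoc above, i.e.
FileSize(i) <= (Sum(0,N,FileSize(_)) - FileSize(i)) * Ratio, with a worked check:

    static boolean inRatio(long[] sizes, double ratio) {
      long total = 0;
      for (long s : sizes) total += s;
      for (long s : sizes) {
        if (s > (total - s) * ratio) return false;
      }
      return true;
    }
    // inRatio(new long[] {100, 50, 40}, 1.2) -> true  (100 <= 90 * 1.2 = 108)
    // inRatio(new long[] {300, 50, 40}, 1.2) -> false (300 >  90 * 1.2 = 108)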
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -388,7 +388,6 @@ public class RatioBasedCompactionPolicy 
     return compactionSize > comConf.getThrottlePoint();
   }
 
-  @Override
   public boolean needsCompaction(final Collection<StoreFile> storeFiles,
       final List<StoreFile> filesCompacting) {
     int numCandidates = storeFiles.size() - filesCompacting.size();

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,563 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcatenatedLists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Stripe store implementation of compaction policy.
+ */
+@InterfaceAudience.Private
+public class StripeCompactionPolicy extends CompactionPolicy {
+  private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
+  // Policy used to compact individual stripes.
+  private ExploringCompactionPolicy stripePolicy = null;
+
+  private StripeStoreConfig config;
+
+  public StripeCompactionPolicy(
+      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
+    super(conf, storeConfigInfo);
+    this.config = config;
+    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
+  }
+
+  public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
+      List<StoreFile> filesCompacting) {
+    // We sincerely hope nobody is messing with us with their coprocessors.
+    // If they do, they are very likely to shoot themselves in the foot.
+    // We'll just exclude all the filesCompacting from the list.
+    ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
+    candidateFiles.removeAll(filesCompacting);
+    return candidateFiles;
+  }
+
+  public StripeCompactionRequest createEmptyRequest(
+      StripeInformationProvider si, CompactionRequest request) {
+    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
+    if (si.getStripeCount() > 0) {
+      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
+    }
+    int initialCount = this.config.getInitialCount();
+    long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst();
+    return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs);
+  }
+
+  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
+      List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
+    // TODO: first cut - no parallel compactions. To have more fine-grained control we
+    //       probably need a structure more sophisticated than a list.
+    if (!filesCompacting.isEmpty()) {
+      LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
+      return null;
+    }
+
+    // We are going to do variations of compaction in strict order of preference.
+    // A better/more advanced approach is to use a heuristic to see which one is "more
+    // necessary" at current time.
+
+    // This can happen due to region split. We can skip it later; for now preserve
+    // compact-all-things behavior.
+    Collection<StoreFile> allFiles = si.getStorefiles();
+    if (StoreUtils.hasReferences(allFiles)) {
+      LOG.debug("There are references in the store; compacting all files");
+      long targetKvs = estimateTargetKvs(allFiles, config.getSplitCount()).getFirst();
+      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
+          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
+      request.setMajorRangeFull();
+      return request;
+    }
+
+    int stripeCount = si.getStripeCount();
+    List<StoreFile> l0Files = si.getLevel0Files();
+
+    // See if we need to make new stripes.
+    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
+    if (stripeCount == 0) {
+      if (!shouldCompactL0) return null; // nothing to do.
+      return selectNewStripesCompaction(si);
+    }
+
+    boolean canDropDeletesNoL0 = l0Files.size() == 0;
+    if (shouldCompactL0) {
+      if (!canDropDeletesNoL0) {
+        // If we need to compact L0, see if we can add something to it, and drop deletes.
+        StripeCompactionRequest result = selectSingleStripeCompaction(
+            si, true, canDropDeletesNoL0, isOffpeak);
+        if (result != null) return result;
+      }
+      LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
+      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
+    }
+
+    // Try to delete fully expired stripes
+    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
+    if (result != null) return result;
+
+    // Ok, nothing special here, let's see if we need to do a common compaction.
+    // This will also split the stripes that are too big if needed.
+    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
+  }
+
+  public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
+    // Approximation on whether we need compaction.
+    return filesCompacting.isEmpty()
+        && (StoreUtils.hasReferences(si.getStorefiles())
+          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
+          || needsSingleStripeCompaction(si));
+  }
+
+  @Override
+  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+    return false; // there's never a major compaction!
+  }
+
+  @Override
+  public boolean throttleCompaction(long compactionSize) {
+    return compactionSize > comConf.getThrottlePoint();
+  }
+
+  /**
+   * @param si StoreFileManager.
+   * @return Whether any stripe potentially needs compaction.
+   */
+  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
+    int minFiles = this.config.getStripeCompactMinFiles();
+    for (List<StoreFile> stripe : si.getStripes()) {
+      if (stripe.size() >= minFiles) return true;
+    }
+    return false;
+  }
+
+  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
+      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
+    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
+
+    int bqIndex = -1;
+    List<StoreFile> bqSelection = null;
+    int stripeCount = stripes.size();
+    long bqTotalSize = -1;
+    for (int i = 0; i < stripeCount; ++i) {
+      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
+      // So, pass includeL0 as 2nd parameter to indicate that.
+      List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
+          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
+      if (selection.isEmpty()) continue;
+      long size = 0;
+      for (StoreFile sf : selection) {
+        size += sf.getReader().length();
+      }
+      if (bqSelection == null || selection.size() > bqSelection.size() ||
+          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
+        bqSelection = selection;
+        bqIndex = i;
+        bqTotalSize = size;
+      }
+    }
+    if (bqSelection == null) {
+      LOG.debug("No good compaction is possible in any stripe");
+      return null;
+    }
+    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
+    // See if we can, and need to, split this stripe.
+    int targetCount = 1;
+    long targetKvs = Long.MAX_VALUE;
+    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
+    String splitString = "";
+    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
+      if (includeL0) {
+        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
+        // So, if we might split, don't compact the stripe with L0.
+        return null;
+      }
+      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
+      targetKvs = kvsAndCount.getFirst();
+      targetCount = kvsAndCount.getSecond();
+      splitString = "; the stripe will be split into at most "
+          + targetCount + " stripes with " + targetKvs + " target KVs";
+    }
+
+    LOG.debug("Found compaction in a stripe with end key ["
+        + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
+        + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
+
+    // See if we can drop deletes.
+    StripeCompactionRequest req;
+    if (includeL0) {
+      assert hasAllFiles;
+      List<StoreFile> l0Files = si.getLevel0Files();
+      LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
+      ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
+      sfs.addSublist(filesToCompact);
+      sfs.addSublist(l0Files);
+      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
+    } else {
+      req = new SplitStripeCompactionRequest(
+          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
+    }
+    if (canDropDeletesWithoutL0 || includeL0) {
+      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
+    }
+    req.getRequest().setOffPeak(isOffpeak);
+    return req;
+  }
+
+  /**
+   * Selects the compaction of a single stripe using default policy.
+   * @param sfs Files.
+   * @param allFilesOnly Whether a compaction of all-or-none files is needed.
+   * @return The resulting selection.
+   */
+  private List<StoreFile> selectSimpleCompaction(
+      List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
+    int minFilesLocal = Math.max(
+        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
+    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
+    return stripePolicy.applyCompactionPolicy(sfs, isOffpeak, false, minFilesLocal, maxFilesLocal);
+  }
+
+  /**
+   * Selects the compaction that compacts all files (to be removed later).
+   * @param si StoreFileManager.
+   * @param targetStripeCount Target stripe count.
+   * @param targetSize Target stripe size.
+   * @return The compaction.
+   */
+  private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
+      int targetStripeCount, long targetSize) {
+    Collection<StoreFile> allFiles = si.getStorefiles();
+    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
+        allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
+    request.setMajorRangeFull();
+    LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
+    return request;
+  }
+
+  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
+    List<StoreFile> l0Files = si.getLevel0Files();
+    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
+    LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
+        + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
+    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
+        si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
+    request.setMajorRangeFull(); // L0 only, can drop deletes.
+    return request;
+  }
+
+  private StripeCompactionRequest selectExpiredMergeCompaction(
+      StripeInformationProvider si, boolean canDropDeletesNoL0) {
+    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+    if (cfTtl == Long.MAX_VALUE) {
+      return null; // minversion might be set, cannot delete old files
+    }
+    long timestampCutoff = EnvironmentEdgeManager.currentTimeMillis() - cfTtl;
+    // Merge the longest sequence of stripes where all files have expired, if any.
+    int start = -1, bestStart = -1, length = 0, bestLength = 0;
+    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
+    OUTER: for (int i = 0; i < stripes.size(); ++i) {
+      for (StoreFile storeFile : stripes.get(i)) {
+        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
+        // Found non-expired file, this stripe has to stay.
+        if (length > bestLength) {
+          bestStart = start;
+          bestLength = length;
+        }
+        start = -1;
+        length = 0;
+        continue OUTER;
+      }
+      if (start == -1) {
+        start = i;
+      }
+      ++length;
+    }
+    if (length > bestLength) {
+      bestStart = start;
+      bestLength = length;
+    }
+    if (bestLength == 0) return null;
+    if (bestLength == 1) {
+      // This is currently inefficient. If only one stripe expired, we will rewrite an
+      // entire stripe just to delete some expired files, because we rely on metadata and it
+      // cannot simply be updated in an old file. When we either determine stripes dynamically
+      // or move metadata to the manifest, we can just drop the "expired stripes".
+      if (bestStart == (stripes.size() - 1)) return null;
+      ++bestLength;
+    }
+    LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
+    int endIndex = bestStart + bestLength - 1;
+    ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
+    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
+    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
+        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
+    if (canDropDeletesNoL0) {
+      result.setMajorRangeFull();
+    }
+    return result;
+  }
+
+  private static long getTotalKvCount(final Collection<StoreFile> candidates) {
+    long totalCount = 0;
+    for (StoreFile storeFile : candidates) {
+      totalCount += storeFile.getReader().getEntries();
+    }
+    return totalCount;
+  }
+
+  private static long getTotalFileSize(final Collection<StoreFile> candidates) {
+    long totalSize = 0;
+    for (StoreFile storeFile : candidates) {
+      totalSize += storeFile.getReader().length();
+    }
+    return totalSize;
+  }
+
+  private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
+    // If the size is larger than what we target, we don't want to split into proportionally
+    // larger parts and then have to split again very soon. So, we will increase the multiplier
+    // by one until we get small enough parts. E.g. a 5GB stripe that should have been split
+    // into 2 parts when it was 3GB will be split into 3x1.67GB parts, rather than 2x2.5GB parts.
+    long totalSize = getTotalFileSize(files);
+    long targetPartSize = config.getSplitPartSize();
+    assert targetPartSize > 0 && splitCount > 0;
+    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
+    while (ratio > 1.0) {
+      // Ratio of real to desired size if we increase the multiplier.
+      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
+      if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further than the last one.
+      ratio = newRatio;
+      splitCount += 1.0;
+    }
+    long kvCount = (long)(getTotalKvCount(files) / splitCount);
+    return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
+  }
+
+  /** Stripe compaction request wrapper. */
+  public abstract static class StripeCompactionRequest {
+    protected CompactionRequest request;
+    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
+
+    /**
+     * Executes the request against the compactor (essentially, just calls the correct
+     * overload of the compact method), to simulate more dynamic dispatch.
+     * @param compactor Compactor.
+     * @return result of compact(...)
+     */
+    public abstract List<Path> execute(StripeCompactor compactor);
+
+    public StripeCompactionRequest(CompactionRequest request) {
+      this.request = request;
+    }
+
+    /**
+     * Sets compaction "major range". Major range is the key range for which all
+     * the files are included, so they can be treated like major-compacted files.
+     * @param startRow Left boundary, inclusive.
+     * @param endRow Right boundary, exclusive.
+     */
+    public void setMajorRange(byte[] startRow, byte[] endRow) {
+      this.majorRangeFromRow = startRow;
+      this.majorRangeToRow = endRow;
+    }
+
+    public CompactionRequest getRequest() {
+      return this.request;
+    }
+
+    public void setRequest(CompactionRequest request) {
+      assert request != null;
+      this.request = request;
+      this.majorRangeFromRow = this.majorRangeToRow = null;
+    }
+  }
+
+  /**
+   * Request for the stripe compactor that will cause it to split the source files into several
+   * separate files at the provided boundaries.
+   */
+  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
+    private final List<byte[]> targetBoundaries;
+
+    /**
+     * @param request Original request.
+     * @param targetBoundaries New files should be written with these boundaries.
+     */
+    public BoundaryStripeCompactionRequest(CompactionRequest request,
+        List<byte[]> targetBoundaries) {
+      super(request);
+      this.targetBoundaries = targetBoundaries;
+    }
+
+    public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
+        List<byte[]> targetBoundaries) {
+      this(new CompactionRequest(files), targetBoundaries);
+    }
+
+    @Override
+    public List<Path> execute(StripeCompactor compactor) {
+      return compactor.compact(
+          this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
+    }
+  }
+
+  /**
+   * Request for the stripe compactor that will cause it to split the source files into
+   * several separate files, based on key-value count as well as a file count limit.
+   * Most of the files will be roughly the same size. The last file may be smaller or larger
+   * depending on the interplay of the amount of data and maximum number of files allowed.
+   */
+  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
+    private final byte[] startRow, endRow;
+    private final int targetCount;
+    private final long targetKvs;
+
+    /**
+     * @param request Original request.
+     * @param startRow Left boundary of the range to compact, inclusive.
+     * @param endRow Right boundary of the range to compact, exclusive.
+     * @param targetCount The maximum number of stripes to compact into.
+     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
+     *                  the total number of KVs, all the overflow data goes into the last stripe.
+     */
+    public SplitStripeCompactionRequest(CompactionRequest request,
+        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
+      super(request);
+      this.startRow = startRow;
+      this.endRow = endRow;
+      this.targetCount = targetCount;
+      this.targetKvs = targetKvs;
+    }
+
+    public SplitStripeCompactionRequest(
+        CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
+      this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
+    }
+
+    public SplitStripeCompactionRequest(
+        Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
+      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
+    }
+
+    public SplitStripeCompactionRequest(Collection<StoreFile> files,
+        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
+      this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
+    }
+
+    @Override
+    public List<Path> execute(StripeCompactor compactor) {
+      return compactor.compact(this.request, this.targetCount, this.targetKvs,
+          this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
+    }
+
+    /** Sets the major range of the compaction to the entire compaction range.
+     * See {@link #setMajorRange(byte[], byte[])}. */
+    public void setMajorRangeFull() {
+      setMajorRange(this.startRow, this.endRow);
+    }
+  }
+
+  /** Helper class used to calculate per-stripe size statistics. */
+  private static class StripeSizes {
+    public final ArrayList<Long> kvCounts;
+    public final ArrayList<Long> fileSizes;
+    public double avgKvCount = 0;
+    public long minKvCount = Long.MAX_VALUE, maxKvCount = Long.MIN_VALUE;
+    public int minIndex = -1, maxIndex = -1;
+
+    public StripeSizes(List<ImmutableList<StoreFile>> stripes) {
+      assert !stripes.isEmpty();
+      kvCounts = new ArrayList<Long>(stripes.size());
+      fileSizes = new ArrayList<Long>(stripes.size());
+      for (int i = 0; i < stripes.size(); ++i) {
+        long kvCount = getTotalKvCount(stripes.get(i));
+        fileSizes.add(getTotalFileSize(stripes.get(i)));
+        kvCounts.add(kvCount);
+        avgKvCount += (double)(kvCount - avgKvCount) / (i + 1); // incremental running average
+        if (minKvCount > kvCount) {
+          minIndex = i;
+          minKvCount = kvCount;
+        }
+        if (maxKvCount < kvCount) {
+          maxIndex = i;
+          maxKvCount = kvCount;
+        }
+      }
+    }
+  }
+
+  /** The information about stripes that the policy needs to make compaction decisions. */
+  public static interface StripeInformationProvider {
+    public Collection<StoreFile> getStorefiles();
+
+    /**
+     * Gets the start row for a given stripe.
+     * @param stripeIndex Stripe index.
+     * @return Start row. May be an open key.
+     */
+    public byte[] getStartRow(int stripeIndex);
+
+    /**
+     * Gets the end row for a given stripe.
+     * @param stripeIndex Stripe index.
+     * @return End row. May be an open key.
+     */
+    public byte[] getEndRow(int stripeIndex);
+
+    /**
+     * @return Level 0 files.
+     */
+    public List<StoreFile> getLevel0Files();
+
+    /**
+     * @return All stripe boundaries, including the open ones on both ends.
+     */
+    public List<byte[]> getStripeBoundaries();
+
+    /**
+     * @return The stripes.
+     */
+    public ArrayList<ImmutableList<StoreFile>> getStripes();
+
+    /**
+     * @return Stripe count.
+     */
+    public int getStripeCount();
+  }
+}

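A worked example of the estimateTargetKvs loop above, as a minimal standalone Java
sketch using the figures from its comment (the 5GB stripe and the 1.5GB target part
size are illustrative numbers for this walkthrough, not defaults from this patch):

    // Sizing loop from estimateTargetKvs, with concrete numbers plugged in.
    long totalSize = 5L << 30;                 // 5GB of store file data
    long targetPartSize = 3L << 29;            // 1.5GB desired part size
    double splitCount = 2.0;                   // initial split count
    double ratio = totalSize / (splitCount * targetPartSize); // ~1.67, parts too big
    while (ratio > 1.0) {
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      // Stop once one more part would undershoot (1/newRatio) by more than
      // the current overshoot (ratio).
      if ((1.0 / newRatio) >= ratio) break;
      ratio = newRatio;
      splitCount += 1.0;
    }
    // splitCount ends up as 3.0: three ~1.67GB parts rather than two 2.5GB parts.
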
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * This is a placeholder for the stripe compactor. The implementation,
+ * as well as the proper javadoc, will be added in HBASE-7967.
+ */
+public class StripeCompactor extends Compactor {
+
+  public StripeCompactor(Configuration conf, final Store store) {
+    super(conf, store);
+  }
+
+  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+    throw new NotImplementedException();
+  }
+
+  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
+      byte[] left, byte[] right, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+    throw new NotImplementedException();
+  }
+}

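Both overloads are stubs until HBASE-7967, but the dispatch pattern is already fixed
by the request wrappers in StripeCompactionPolicy above: BoundaryStripeCompactionRequest
drives the boundary-based overload, SplitStripeCompactionRequest the size-based one.
A minimal sketch of the eventual call site, assuming a stripeRequest and a compactor
already in scope (both names are illustrative):

    // The wrapper picks the correct compact(...) overload; the caller never does.
    //   BoundaryStripeCompactionRequest -> compact(request, targetBoundaries, ...)
    //   SplitStripeCompactionRequest    -> compact(request, targetCount, targetKvs, ...)
    List<Path> newFiles = stripeRequest.execute(compactor);
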
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 29 00:37:30 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -644,7 +645,7 @@ public class TestCompaction {
     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
 
     Collection<StoreFile> storeFiles = store.getStorefiles();
-    Compactor tool = store.storeEngine.getCompactor();
+    DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
 
     List<Path> newFiles = tool.compactForTesting(storeFiles, false);
 

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStripeStoreEngine {
+
+  @Test
+  public void testCreateBasedOnConfig() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
+    StripeStoreEngine se = createEngine(conf);
+    assertTrue(se.getCompactionPolicy() instanceof StripeCompactionPolicy);
+  }
+
+  public static class TestStoreEngine extends StripeStoreEngine {
+    public void setCompactorOverride(StripeCompactor compactorOverride) {
+      this.compactor = compactorOverride;
+    }
+  }
+
+  @Test
+  public void testCompactionContextForceSelect() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int targetCount = 2;
+    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, targetCount);
+    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
+    TestStoreEngine se = createEngine(conf);
+    StripeCompactor mockCompactor = mock(StripeCompactor.class);
+    se.setCompactorOverride(mockCompactor);
+    when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(),
+        any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class)))
+        .thenReturn(new ArrayList<Path>());
+
+    // Produce 3 L0 files.
+    StoreFile sf = createFile();
+    ArrayList<StoreFile> compactUs = al(sf, createFile(), createFile());
+    se.getStoreFileManager().loadFiles(compactUs);
+    // Create a compaction that would want to split the stripe.
+    CompactionContext compaction = se.createCompaction();
+    compaction.select(al(), false, false, false);
+    assertEquals(3, compaction.getRequest().getFiles().size());
+    // Override the file list. Granted, overriding this compaction in this manner will
+    // break things in the real world, but we only want to verify the override.
+    compactUs.remove(sf);
+    CompactionRequest req = new CompactionRequest(compactUs);
+    compaction.forceSelect(req);
+    assertEquals(2, compaction.getRequest().getFiles().size());
+    assertFalse(compaction.getRequest().getFiles().contains(sf));
+    // Make sure the correct method is called on the compactor.
+    compaction.compact();
+    verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
+          StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null);
+  }
+
+  private static StoreFile createFile() throws Exception {
+    StoreFile sf = mock(StoreFile.class);
+    when(sf.getMetadataValue(any(byte[].class)))
+      .thenReturn(StripeStoreFileManager.INVALID_KEY);
+    when(sf.getReader()).thenReturn(mock(StoreFile.Reader.class));
+    when(sf.getPath()).thenReturn(new Path("moo"));
+    return sf;
+  }
+
+  private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+    Store store = mock(Store.class);
+    KVComparator kvComparator = mock(KVComparator.class);
+    return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator);
+  }
+
+  private static ArrayList<StoreFile> al(StoreFile... sfs) {
+    return new ArrayList<StoreFile>(Arrays.asList(sfs));
+  }
+}

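The configuration keys this test exercises are the same ones that would select the
stripe engine for a real store; a minimal sketch, assuming only the keys shown above
(any further tuning is out of scope for this patch):

    // Select the stripe store engine instead of the default engine.
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); // start with 2 stripes
    conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);         // compact L0 at 2 files
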
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java Tue Oct 29 00:37:30 2013
@@ -43,6 +43,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 
 @Category(SmallTests.class)
@@ -237,7 +238,7 @@ public class TestStripeStoreFileManager 
 
     Configuration conf = HBaseConfiguration.create();
     if (splitRatioToVerify != 0) {
-      conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, splitRatioToVerify);
+      conf.setFloat(StripeStoreConfig.MAX_REGION_SPLIT_IMBALANCE_KEY, splitRatioToVerify);
     }
     StripeStoreFileManager manager = createManager(al(), conf);
     manager.addCompactionResults(al(), sfs);
@@ -536,6 +537,7 @@ public class TestStripeStoreFileManager 
     verifyGetOrScanScenario(manager, false, null, null, results);
   }
 
+  // TODO: replace with Mockito?
   private static MockStoreFile createFile(
       long size, long seqNum, byte[] startKey, byte[] endKey) throws Exception {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -573,7 +575,9 @@ public class TestStripeStoreFileManager 
 
   private static StripeStoreFileManager createManager(
       ArrayList<StoreFile> sfs, Configuration conf) throws Exception {
-    StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf);
+    StripeStoreConfig config = new StripeStoreConfig(
+        conf, Mockito.mock(StoreConfigInformation.class));
+    StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf, config);
     result.loadFiles(sfs);
     return result;
   }