You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/11/06 02:49:36 UTC

svn commit: r1539211 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regio...

Author: sershe
Date: Wed Nov  6 01:49:36 2013
New Revision: 1539211

URL: http://svn.apache.org/r1539211
Log:
HBASE-8541 implement flush-into-stripes in stripe compactions

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java Wed Nov  6 01:49:36 2013
@@ -68,9 +68,9 @@ class DefaultStoreFileManager implements
   }
 
   @Override
-  public void insertNewFile(StoreFile sf) {
+  public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
-    newFiles.add(sf);
+    newFiles.addAll(sfs);
     sortAndSetStoreFiles(newFiles);
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Wed Nov  6 01:49:36 2013
@@ -53,13 +53,7 @@ public class DefaultStoreFlusher extends
 
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
-    KeyValueScanner memstoreScanner =
-        new CollectionBackedScanner(snapshot, store.getComparator());
-    InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
-    if (scanner == null) {
-      scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
-    }
-    scanner = postCreateCoprocScanner(scanner);
+    InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means skip normal processing
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Nov  6 01:49:36 2013
@@ -611,7 +611,7 @@ public class HStore implements Store {
     // Append the new storefile into the list
     this.lock.writeLock().lock();
     try {
-      this.storeEngine.getStoreFileManager().insertNewFile(sf);
+      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
     } finally {
       // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
@@ -852,9 +852,7 @@ public class HStore implements Store {
       final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
     this.lock.writeLock().lock();
     try {
-      for (StoreFile sf : sfs) {
-        this.storeEngine.getStoreFileManager().insertNewFile(sf);
-      }
+      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
       this.memstore.clearSnapshot(set);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
@@ -1122,15 +1120,15 @@ public class HStore implements Store {
       .append(" to execute.");
     LOG.info(message.toString());
     if (LOG.isTraceEnabled()) {
-     int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
-     long resultSize = 0;
-     for (StoreFile sf : sfs) {
-       resultSize += sf.getReader().length();
-     }
-     String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
-       + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
-         + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
-     LOG.trace(traceMessage);
+      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
+      long resultSize = 0;
+      for (StoreFile sf : sfs) {
+        resultSize += sf.getReader().length();
+      }
+      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
+        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
+          + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
+      LOG.trace(traceMessage);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Wed Nov  6 01:49:36 2013
@@ -47,10 +47,10 @@ public interface StoreFileManager {
   void loadFiles(List<StoreFile> storeFiles);
 
   /**
-   * Adds new file, either for from MemStore flush or bulk insert, into the structure.
+   * Adds new files, either from MemStore flush or bulk insert, into the structure.
    * @param sf New store file.
    */
-  void insertNewFile(StoreFile sf);
+  void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
 
   /**
    * Adds compaction results into the structure.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Wed Nov  6 01:49:36 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValueU
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 /**
  * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -77,31 +78,27 @@ abstract class StoreFlusher {
     writer.close();
   }
 
-  /** Calls coprocessor to create a flush scanner based on memstore scanner */
-  protected InternalScanner preCreateCoprocScanner(
-      KeyValueScanner memstoreScanner) throws IOException {
-    if (store.getCoprocessorHost() != null) {
-      return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
-    }
-    return null;
-  }
-
-  /** Creates the default flush scanner based on memstore scanner */
-  protected InternalScanner createStoreScanner(long smallestReadPoint,
-      KeyValueScanner memstoreScanner) throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
-    return new StoreScanner(store, store.getScanInfo(), scan,
-        Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-        smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
-  }
 
   /**
-   * Calls coprocessor to create a scanner based on default flush scanner
-   * @return new or default scanner; if null, flush should not proceed.
+   * Creates the scanner for flushing snapshot. Also calls coprocessors.
+   * @return The scanner; null if coprocessor is canceling the flush.
    */
-  protected  InternalScanner postCreateCoprocScanner(InternalScanner scanner)
-      throws IOException {
+  protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
+      long smallestReadPoint) throws IOException {
+    KeyValueScanner memstoreScanner =
+        new CollectionBackedScanner(snapshot, store.getComparator());
+    InternalScanner scanner = null;
+    if (store.getCoprocessorHost() != null) {
+      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+    }
+    if (scanner == null) {
+      Scan scan = new Scan();
+      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+      scanner = new StoreScanner(store, store.getScanInfo(), scan,
+          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+    }
+    assert scanner != null;
     if (store.getCoprocessorHost() != null) {
       return store.getCoprocessorHost().preFlush(store, scanner);
     }
@@ -114,7 +111,7 @@ abstract class StoreFlusher {
    * @param sink Sink to write data to. Could be StoreFile.Writer.
    * @param smallestReadPoint Smallest read point used for the flush.
    * @return Bytes flushed.
-s   */
+   */
   protected long performFlush(InternalScanner scanner,
       Compactor.CellSink sink, long smallestReadPoint) throws IOException {
     int compactionKVMax =

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java Wed Nov  6 01:49:36 2013
@@ -46,6 +46,9 @@ public abstract class StripeMultiFileWri
   /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
   protected StoreScanner sourceScanner;
 
+  /** Whether to write stripe metadata */
+  private boolean doWriteStripeMetadata = true;
+
   public interface WriterFactory {
     public StoreFile.Writer createWriter() throws IOException;
   }
@@ -63,18 +66,24 @@ public abstract class StripeMultiFileWri
     this.comparator = comparator;
   }
 
+  public void setNoStripeMetadata() {
+    this.doWriteStripeMetadata = false;
+  }
+
   public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
     assert this.existingWriters != null;
     commitWritersInternal();
     assert this.boundaries.size() == (this.existingWriters.size() + 1);
-    LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers");
-
+    LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
+      + "riting out metadata for " + this.existingWriters.size() + " writers");
     List<Path> paths = new ArrayList<Path>();
     for (int i = 0; i < this.existingWriters.size(); ++i) {
       StoreFile.Writer writer = this.existingWriters.get(i);
       if (writer == null) continue; // writer was skipped due to 0 KVs
-      writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
-      writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
+      if (doWriteStripeMetadata) {
+        writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
+        writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
+      }
       writer.appendMetadata(maxSeqId, isMajor);
       paths.add(writer.getPath());
       writer.close();
@@ -109,8 +118,8 @@ public abstract class StripeMultiFileWri
       byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
     if (StripeStoreFileManager.OPEN_KEY != left &&
         comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
-      String error = "The first row is lower than the left boundary of ["
-        + Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
+      String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
+        + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
       LOG.error(error);
       throw new IOException(error);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java Wed Nov  6 01:49:36 2013
@@ -56,6 +56,9 @@ public class StripeStoreConfig {
    to get this count from the outset and prevent unnecessary splitting. */
   public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
 
+  /** Whether to flush memstore to L0 files, or directly to stripes. */
+  public static final String FLUSH_TO_L0_KEY = "hbase.store.stripe.compaction.flushToL0";
+
   /** When splitting region, the maximum size imbalance to allow in an attempt to split at a
    stripe boundary, so that no files go to both regions. Most users won't need to change that. */
   public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
@@ -70,36 +73,46 @@ public class StripeStoreConfig {
   private final int initialCount;
   private final long sizeToSplitAt;
   private final float splitPartCount;
+  private final boolean flushIntoL0;
   private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
 
-  private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
+  private static final double EPSILON = 0.001; // good enough for this, not a real epsilon.
   public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
     this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
     this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
     this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
     this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
+    this.flushIntoL0 = config.getBoolean(FLUSH_TO_L0_KEY, false);
 
-    this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
+    float splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2f, true);
     if (Math.abs(splitPartCount - 1.0) < EPSILON) {
-      throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
+      LOG.error("Split part count cannot be 1 (" + splitPartCount + "), using the default");
+      splitPartCount = 2f;
     }
-    // TODO: change when no L0.
+    this.splitPartCount = splitPartCount;
     // Arbitrary default split size - 4 times the size of one L0 compaction.
+    // If we flush into L0 there's no split compaction, but for default value it is ok.
     double flushSize = sci.getMemstoreFlushSize();
     if (flushSize == 0) {
       flushSize = 128 * 1024 * 1024;
     }
     long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
     this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
-    this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+    int initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+    if (initialCount == 0) {
+      LOG.error("Initial stripe count is 0, using the default");
+      initialCount = 1;
+    }
+    this.initialCount = initialCount;
     this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
   }
 
   private static float getFloat(
       Configuration config, String key, float defaultValue, boolean moreThanOne) {
     float value = config.getFloat(key, defaultValue);
-    if (value == 0) {
-      LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
+    if (value < EPSILON) {
+      LOG.warn(String.format(
+          "%s is set to 0 or negative; using default value of %f", key, defaultValue));
       value = defaultValue;
     } else if ((value > 1f) != moreThanOne) {
       value = 1f / value;
@@ -123,6 +136,10 @@ public class StripeStoreConfig {
     return stripeCompactMaxFiles;
   }
 
+  public boolean isUsingL0Flush() {
+    return flushIntoL0;
+  }
+
   public long getSplitSize() {
     return sizeToSplitAt;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java Wed Nov  6 01:49:36 2013
@@ -38,7 +38,7 @@ import com.google.common.base.Preconditi
  * The storage engine that implements the stripe-based store/compaction scheme.
  */
 @InterfaceAudience.Private
-public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
+public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
   StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
   static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
   private StripeStoreConfig config;
@@ -58,8 +58,9 @@ public class StripeStoreEngine extends S
       Configuration conf, Store store, KVComparator comparator) throws IOException {
     this.config = new StripeStoreConfig(conf, store);
     this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
-    this.storeFlusher = new DefaultStoreFlusher(conf, store);
     this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+    this.storeFlusher = new StripeStoreFlusher(
+      conf, store, this.compactionPolicy, this.storeFileManager);
     this.compactor = new StripeCompactor(conf, store);
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Wed Nov  6 01:49:36 2013
@@ -24,8 +24,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.util.Conc
 
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 
 /**
  * Stripe implementation of StoreFileManager.
@@ -136,15 +140,10 @@ public class StripeStoreFileManager
   }
 
   @Override
-  public void insertNewFile(StoreFile sf) {
-    LOG.debug("New level 0 file: " + sf);
-    ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(state.level0Files);
-    insertFileIntoStripe(newFiles, sf);
-    ensureLevel0Metadata(sf);
-    this.state.level0Files = ImmutableList.copyOf(newFiles);
-    ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(state.allFilesCached);
-    newAllFiles.add(sf);
-    this.state.allFilesCached = ImmutableList.copyOf(newAllFiles);
+  public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
+    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
+    cmc.mergeResults(null, sfs);
+    debugDumpState("Added new files");
   }
 
   @Override
@@ -304,7 +303,7 @@ public class StripeStoreFileManager
         + " files replaced by " + results.size());
     // In order to be able to fail in the middle of the operation, we'll operate on lazy
     // copies and apply the result at the end.
-    CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy();
+    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
     cmc.mergeResults(compactedFiles, results);
     debugDumpState("Merged compaction results");
   }
@@ -628,11 +627,11 @@ public class StripeStoreFileManager
   }
 
   /**
-   * Non-static helper class for merging compaction results.
-   * Since we want to merge them atomically (more or less), it operates on lazy copies, and
-   * then applies copies to real lists as necessary.
+   * Non-static helper class for merging compaction or flush results.
+   * Since we want to merge them atomically (more or less), it operates on lazy copies,
+   * then creates a new state object and puts it in place.
    */
-  private class CompactionResultsMergeCopy {
+  private class CompactionOrFlushMergeCopy {
     private ArrayList<List<StoreFile>> stripeFiles = null;
     private ArrayList<StoreFile> level0Files = null;
     private ArrayList<byte[]> stripeEndRows = null;
@@ -641,11 +640,13 @@ public class StripeStoreFileManager
     private Collection<StoreFile> results = null;
 
     private List<StoreFile> l0Results = new ArrayList<StoreFile>();
+    private final boolean isFlush;
 
-    public CompactionResultsMergeCopy() {
+    public CompactionOrFlushMergeCopy(boolean isFlush) {
       // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
       this.stripeFiles = new ArrayList<List<StoreFile>>(
           StripeStoreFileManager.this.state.stripeFiles);
+      this.isFlush = isFlush;
     }
 
     public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
@@ -654,8 +655,8 @@ public class StripeStoreFileManager
       this.compactedFiles = compactedFiles;
       this.results = results;
       // Do logical processing.
-      removeCompactedFiles();
-      TreeMap<byte[], StoreFile> newStripes = processCompactionResults();
+      if (!isFlush) removeCompactedFiles();
+      TreeMap<byte[], StoreFile> newStripes = processResults();
       if (newStripes != null) {
         processNewCandidateStripes(newStripes);
       }
@@ -681,7 +682,7 @@ public class StripeStoreFileManager
       }
 
       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
-      newAllFiles.removeAll(compactedFiles);
+      if (!isFlush) newAllFiles.removeAll(compactedFiles);
       newAllFiles.addAll(results);
       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
       return newState;
@@ -689,12 +690,16 @@ public class StripeStoreFileManager
 
     private void updateMetadataMaps() {
       StripeStoreFileManager parent = StripeStoreFileManager.this;
-      for (StoreFile sf : this.compactedFiles) {
-        parent.fileStarts.remove(sf);
-        parent.fileEnds.remove(sf);
+      if (!isFlush) {
+        for (StoreFile sf : this.compactedFiles) {
+          parent.fileStarts.remove(sf);
+          parent.fileEnds.remove(sf);
+        }
       }
-      for (StoreFile sf : this.l0Results) {
-        parent.ensureLevel0Metadata(sf);
+      if (this.l0Results != null) {
+        for (StoreFile sf : this.l0Results) {
+          parent.ensureLevel0Metadata(sf);
+        }
       }
     }
 
@@ -729,12 +734,14 @@ public class StripeStoreFileManager
      * or to the list of new candidate stripes.
      * @return New candidate stripes.
      */
-    private TreeMap<byte[], StoreFile> processCompactionResults() throws IOException {
+    private TreeMap<byte[], StoreFile> processResults() throws IOException {
       TreeMap<byte[], StoreFile> newStripes = null;
       for (StoreFile sf : this.results) {
         byte[] startRow = startOf(sf), endRow = endOf(sf);
         if (isInvalid(endRow) || isInvalid(startRow)) {
-          LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath());
+          if (!isFlush) {
+            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
+          }
           insertFileIntoStripe(getLevel0Copy(), sf);
           this.l0Results.add(sf);
           continue;
@@ -775,7 +782,7 @@ public class StripeStoreFileManager
           int stripeIndex = findStripeIndexByEndRow(oldEndRow);
           if (stripeIndex < 0) {
             throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
-                + " to a known stripe (end row + [" + Bytes.toString(oldEndRow) + "])");
+                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
           }
           source = getStripeCopy(stripeIndex);
         }
@@ -803,6 +810,8 @@ public class StripeStoreFileManager
         throw new IOException("Newly created stripes do not cover the entire key space.");
       }
 
+      boolean canAddNewStripes = true;
+      Collection<StoreFile> filesForL0 = null;
       if (hasStripes) {
         // Determine which stripes will need to be removed because they conflict with new stripes.
         // The new boundaries should match old stripe boundaries, so we should get exact matches.
@@ -815,20 +824,49 @@ public class StripeStoreFileManager
         }
         int removeTo = findStripeIndexByEndRow(lastEndRow);
         if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
-        // Remove old empty stripes.
-        int originalCount = this.stripeFiles.size();
+        // See if there are files in the stripes we are trying to replace.
+        ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
         for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
-          if (!this.stripeFiles.get(removeIndex).isEmpty()) {
-            throw new IOException("Compaction intends to create a new stripe that replaces an"
-                + " existing one, but the latter contains some files.");
+          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
+        }
+        if (!conflictingFiles.isEmpty()) {
+          // This can be caused by two things - concurrent flush into stripes, or a bug.
+          // Unfortunately, we cannot tell them apart without looking at timing or something
+          // like that. We will assume we are dealing with a flush and dump it into L0.
+          if (isFlush) {
+            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
+            LOG.warn("Stripes were created by a flush, but results of size " + newSize
+                + " cannot be added because the stripes have changed");
+            canAddNewStripes = false;
+            filesForL0 = newStripes.values();
+          } else {
+            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
+            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
+                + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
+            filesForL0 = conflictingFiles;
           }
-          if (removeIndex != originalCount - 1) {
-            this.stripeEndRows.remove(removeIndex);
+          if (filesForL0 != null) {
+            for (StoreFile sf : filesForL0) {
+              insertFileIntoStripe(getLevel0Copy(), sf);
+            }
+            l0Results.addAll(filesForL0);
+          }
+        }
+
+        if (canAddNewStripes) {
+          // Remove old empty stripes.
+          int originalCount = this.stripeFiles.size();
+          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
+            if (removeIndex != originalCount - 1) {
+              this.stripeEndRows.remove(removeIndex);
+            }
+            this.stripeFiles.remove(removeIndex);
           }
-          this.stripeFiles.remove(removeIndex);
         }
       }
 
+      if (!canAddNewStripes) return; // Files were already put into L0.
+
       // Now, insert new stripes. The total ranges match, so we can insert where we removed.
       byte[] previousEndRow = null;
       int insertAt = removeFrom;
@@ -838,7 +876,8 @@ public class StripeStoreFileManager
           assert !isOpen(previousEndRow);
           byte[] startRow = startOf(newStripe.getValue());
           if (!rowEquals(previousEndRow, startRow)) {
-            throw new IOException("The new stripes produced by compaction are not contiguous");
+            throw new IOException("The new stripes produced by "
+                + (isFlush ? "flush" : "compaction") + " are not contiguous");
           }
         }
         // Add the new stripe.

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java?rev=1539211&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java Wed Nov  6 01:49:36 2013
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
+ * into separate striped files, avoiding L0.
+ */
+public class StripeStoreFlusher extends StoreFlusher {
+  private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
+  /** Held while a snapshot is written out and the resulting writers are committed. */
+  private final Object flushLock = new Object();
+  /** Selects the flush method (see {@link StripeFlushRequest}) for every flush. */
+  private final StripeCompactionPolicy policy;
+  /** Stripe information handed to the policy when it selects the flush method. */
+  private final StripeCompactionPolicy.StripeInformationProvider stripes;
+
+  public StripeStoreFlusher(Configuration conf, Store store,
+      StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
+    super(conf, store);
+    this.policy = policy;
+    this.stripes = stripes;
+  }
+
+  /**
+   * Writes the snapshot into one or more files, using the writer chosen by the policy.
+   * @param snapshot The KVs to flush.
+   * @param cacheFlushSeqNum Sequence number passed to the multi-writer on commit.
+   * @param tracker Time range tracker set on every file writer created for this flush.
+   * @param flushedSize Set to the value returned by performFlush, or to 0 if the flush fails
+   *                    before performFlush returns; left untouched on early return.
+   * @param status Not used by this implementation.
+   * @return Paths returned by the multi-writer on commit; null if the snapshot is empty or
+   *         createScanner returned no scanner.
+   */
+  @Override
+  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
+      final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
+          throws IOException {
+    List<Path> result = null;
+    int kvCount = snapshot.size();
+    if (kvCount == 0) return result; // don't flush if there are no entries
+
+    long smallestReadPoint = store.getSmallestReadPoint();
+    InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+
+    long flushedBytes = 0;
+    boolean success = false;
+    StripeMultiFileWriter mw = null;
+    try {
+      // Let policy select flush method. This is done inside the try block so that the
+      // scanner is closed even if the selection fails.
+      StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+      mw = req.createWriter(); // Writer according to the policy.
+      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
+      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+      mw.init(storeScanner, factory, store.getComparator());
+
+      synchronized (flushLock) {
+        flushedBytes = performFlush(scanner, mw, smallestReadPoint);
+        result = mw.commitWriters(cacheFlushSeqNum, false);
+        success = true;
+      }
+    } finally {
+      if (!success && (mw != null)) {
+        // result is only assigned by commitWriters, right before success is set, so it may
+        // still be null here; an unguarded clear() would throw an NPE out of this finally
+        // block, hiding the original exception and skipping the cleanup below.
+        if (result != null) {
+          result.clear();
+        }
+        for (Path leftoverFile : mw.abortWriters()) {
+          try {
+            store.getFileSystem().delete(leftoverFile, false);
+          } catch (Exception e) {
+            LOG.error("Failed to delete a file after failed flush: " + leftoverFile, e);
+          }
+        }
+      }
+      flushedSize.set(flushedBytes);
+      try {
+        scanner.close();
+      } catch (IOException ex) {
+        LOG.warn("Failed to close flush scanner, ignoring", ex);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates the factory that the multi-writer uses to open each output file.
+   * @param tracker Time range tracker set on every writer the factory creates.
+   * @param kvCount KV count of the whole snapshot; passed unchanged for every file created.
+   * @return Factory that creates writers via store.createWriterInTmp.
+   */
+  private StripeMultiFileWriter.WriterFactory createWriterFactory(
+      final TimeRangeTracker tracker, final long kvCount) {
+    return new StripeMultiFileWriter.WriterFactory() {
+      @Override
+      public Writer createWriter() throws IOException {
+        StoreFile.Writer writer = store.createWriterInTmp(
+            kvCount, store.getFamily().getCompression(), false, true, true);
+        writer.setTimeRangeTracker(tracker);
+        return writer;
+      }
+    };
+  }
+
+  /** Stripe flush request wrapper that writes a non-striped file. */
+  public static class StripeFlushRequest {
+    /**
+     * @return A size-based multi-writer limited to one file, with Long.MAX_VALUE as the KV
+     *         target and open boundaries, set to write no stripe metadata.
+     */
+    @VisibleForTesting
+    public StripeMultiFileWriter createWriter() throws IOException {
+      StripeMultiFileWriter writer =
+          new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
+      writer.setNoStripeMetadata();
+      return writer;
+    }
+  }
+
+  /** Stripe flush request wrapper based on boundaries. */
+  public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
+    private final List<byte[]> targetBoundaries;
+
+    /** @param targetBoundaries New files should be written with these boundaries. */
+    public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
+      this.targetBoundaries = targetBoundaries;
+    }
+
+    /** @return A boundary-based multi-writer for the target boundaries. */
+    @Override
+    public StripeMultiFileWriter createWriter() throws IOException {
+      return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
+    }
+  }
+
+  /** Stripe flush request wrapper based on size. */
+  public static class SizeStripeFlushRequest extends StripeFlushRequest {
+    private final int targetCount;
+    private final long targetKvs;
+
+    /**
+     * @param targetCount The maximum number of stripes to flush into.
+     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
+     *                  total number of kvs, all the overflow data goes into the last stripe.
+     */
+    public SizeStripeFlushRequest(int targetCount, long targetKvs) {
+      this.targetCount = targetCount;
+      this.targetKvs = targetKvs;
+    }
+
+    /** @return A size-based multi-writer with the configured count and KV target. */
+    @Override
+    public StripeMultiFileWriter createWriter() throws IOException {
+      return new StripeMultiFileWriter.SizeMultiWriter(
+          this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java Wed Nov  6 01:49:36 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -75,9 +76,24 @@ public class StripeCompactionPolicy exte
     if (si.getStripeCount() > 0) {
       return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
     }
-    int initialCount = this.config.getInitialCount();
-    long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst();
-    return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs);
+    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
+        request.getFiles(), this.config.getInitialCount());
+    return new SplitStripeCompactionRequest(
+        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
+  }
+
+  public StripeStoreFlusher.StripeFlushRequest selectFlush(
+      StripeInformationProvider si, int kvCount) {
+    if (this.config.isUsingL0Flush()) {
+      return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
+    }
+    if (si.getStripeCount() == 0) {
+      // No stripes - start with the initial count; KVs per stripe is the integer quotient.
+      int initialCount = this.config.getInitialCount();
+      return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
+    }
+    // There are stripes - do according to the boundaries.
+    return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
   }
 
   public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
@@ -341,7 +357,7 @@ public class StripeCompactionPolicy exte
     return totalSize;
   }
 
-  private static long getTotalFileSize(final Collection<StoreFile> candidates) {
+  public static long getTotalFileSize(final Collection<StoreFile> candidates) {
     long totalSize = 0;
     for (StoreFile storeFile : candidates) {
       totalSize += storeFile.getReader().length();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java Wed Nov  6 01:49:36 2013
@@ -125,7 +125,7 @@ public class TestStripeCompactor {
     StripeCompactor sc = createCompactor(writers, input);
     List<Path> paths =
         sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
-    writers.verifyKvs(output, allFiles);
+    writers.verifyKvs(output, allFiles, true);
     if (allFiles) {
       assertEquals(output.length, paths.size());
       writers.verifyBoundaries(boundaries);
@@ -162,7 +162,7 @@ public class TestStripeCompactor {
     List<Path> paths = sc.compact(
         createDummyRequest(), targetCount, targetSize, left, right, null, null);
     assertEquals(output.length, paths.size());
-    writers.verifyKvs(output, true);
+    writers.verifyKvs(output, true, true);
     List<byte[]> boundaries = new ArrayList<byte[]>();
     boundaries.add(left);
     for (int i = 1; i < output.length; ++i) {
@@ -242,7 +242,8 @@ public class TestStripeCompactor {
   }
 
   // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
-  private static class StoreFileWritersCapture implements Answer<StoreFile.Writer> {
+  public static class StoreFileWritersCapture implements
+    Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
     public static class Writer {
       public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
       public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -251,7 +252,7 @@ public class TestStripeCompactor {
     private List<Writer> writers = new ArrayList<Writer>();
 
     @Override
-    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+    public StoreFile.Writer createWriter() throws IOException {
       final Writer realWriter = new Writer();
       writers.add(realWriter);
       StoreFile.Writer writer = mock(StoreFile.Writer.class);
@@ -267,7 +268,12 @@ public class TestStripeCompactor {
       return writer;
     }
 
-    public void verifyKvs(KeyValue[][] kvss, boolean allFiles) {
+    @Override
+    public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+      return createWriter();
+    }
+
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
       if (allFiles) {
         assertEquals(kvss.length, writers.size());
       }
@@ -276,8 +282,13 @@ public class TestStripeCompactor {
         KeyValue[] kvs = kvss[i];
         if (kvs != null) {
           Writer w = writers.get(i - skippedWriters);
-          assertNotNull(w.data.get(STRIPE_START_KEY));
-          assertNotNull(w.data.get(STRIPE_END_KEY));
+          if (requireMetadata) {
+            assertNotNull(w.data.get(STRIPE_START_KEY));
+            assertNotNull(w.data.get(STRIPE_END_KEY));
+          } else {
+            assertNull(w.data.get(STRIPE_START_KEY));
+            assertNull(w.data.get(STRIPE_END_KEY));
+          }
           assertEquals(kvs.length, w.kvs.size());
           for (int j = 0; j < kvs.length; ++j) {
             assertEquals(kvs[j], w.kvs.get(j));

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java Wed Nov  6 01:49:36 2013
@@ -83,7 +83,7 @@ public class TestStripeStoreFileManager 
   public void testInsertFilesIntoL0() throws Exception {
     StripeStoreFileManager manager = createManager();
     MockStoreFile sf = createFile();
-    manager.insertNewFile(sf);
+    manager.insertNewFiles(al(sf));
     assertEquals(1, manager.getStorefileCount());
     Collection<StoreFile> filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A);
     assertEquals(1, filesForGet.size());
@@ -99,8 +99,8 @@ public class TestStripeStoreFileManager 
   @Test
   public void testClearFiles() throws Exception {
     StripeStoreFileManager manager = createManager();
-    manager.insertNewFile(createFile());
-    manager.insertNewFile(createFile());
+    manager.insertNewFiles(al(createFile()));
+    manager.insertNewFiles(al(createFile()));
     manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B),
         createFile(KEY_B, OPEN_KEY)));
     assertEquals(4, manager.getStorefileCount());
@@ -120,8 +120,8 @@ public class TestStripeStoreFileManager 
   public void testRowKeyBefore() throws Exception {
     StripeStoreFileManager manager = createManager();
     StoreFile l0File = createFile(), l0File2 = createFile();
-    manager.insertNewFile(l0File);
-    manager.insertNewFile(l0File2);
+    manager.insertNewFiles(al(l0File));
+    manager.insertNewFiles(al(l0File2));
     // Get candidate files.
     Iterator<StoreFile> sfs = manager.getCandidateFilesForRowKeyBefore(KV_B);
     sfs.next();
@@ -174,8 +174,8 @@ public class TestStripeStoreFileManager 
     // If there are no stripes, should pick midpoint from the biggest file in L0.
     MockStoreFile sf5 = createFile(5, 0);
     sf5.splitPoint = new byte[1];
-    manager.insertNewFile(sf5);
-    manager.insertNewFile(createFile(1, 0));
+    manager.insertNewFiles(al(sf5));
+    manager.insertNewFiles(al(createFile(1, 0)));
     assertEquals(sf5.splitPoint, manager.getSplitPoint());
 
     // Same if there's one stripe but the biggest file is still in L0.
@@ -259,7 +259,7 @@ public class TestStripeStoreFileManager 
 
     // Populate one L0 file.
     MockStoreFile sf0 = createFile();
-    manager.insertNewFile(sf0);
+    manager.insertNewFiles(al(sf0));
     verifyGetAndScanScenario(manager, null, null,   sf0);
     verifyGetAndScanScenario(manager, null, KEY_C,  sf0);
     verifyGetAndScanScenario(manager, KEY_B, null,  sf0);
@@ -356,14 +356,11 @@ public class TestStripeStoreFileManager 
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testAddingCompactionResults() throws Exception {
     StripeStoreFileManager manager = createManager();
     // First, add some L0 files and "compact" one with new stripe creation.
-    StoreFile sf_L0_0a = createFile();
-    StoreFile sf_L0_0b = createFile();
-    manager.insertNewFile(sf_L0_0a);
-    manager.insertNewFile(sf_L0_0b);
+    StoreFile sf_L0_0a = createFile(), sf_L0_0b = createFile();
+    manager.insertNewFiles(al(sf_L0_0a, sf_L0_0b));
 
     // Try compacting with invalid new branches (gaps, overlaps) - no effect.
     verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B)));
@@ -384,7 +381,7 @@ public class TestStripeStoreFileManager 
     StoreFile sf_L0_1 = createFile();
     StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B);
     StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
-    manager.insertNewFile(sf_L0_1);
+    manager.insertNewFiles(al(sf_L0_1));
     manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
 
@@ -400,27 +397,21 @@ public class TestStripeStoreFileManager 
     manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
     verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
 
-    // Try to rebalance two stripes, but don't take all files from them - no effect.
+    // Rebalance two stripes.
     StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
     StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
-    ArrayList<StoreFile> compacted3 = al();
-    verifyInvalidCompactionScenario(manager, al(sf_B2C_0, sf_C2i_0), al(sf_B2D_4, sf_D2i_4));
-
-    // Rebalance two stripes correctly.
     manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
     verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
 
     // Split the first stripe.
     StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
     StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
-    ArrayList<StoreFile> compacted4 = al(createFile(OPEN_KEY, KEY_A), createFile(KEY_A, KEY_B));
     manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
     verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
 
     // Split the middle stripe.
     StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
-    ArrayList<StoreFile> compacted5 = al(createFile(KEY_B, KEY_C), createFile(KEY_C, KEY_D));
     manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
 
@@ -429,14 +420,6 @@ public class TestStripeStoreFileManager 
     manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
 
-    // Try various range mismatch cases in replaced and new data - no effect.
-    ArrayList<StoreFile> tmp = al(sf_A2C_7, sf_C2D_6); // [A, C)
-    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, KEY_C)));
-    verifyInvalidCompactionScenario(manager, tmp, al(createFile(OPEN_KEY, KEY_D)));
-    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, OPEN_KEY)));
-    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, KEY_B)));
-    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, keyAfter(KEY_B))));
-
     // Merge lower half.
     StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
     manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
@@ -449,13 +432,39 @@ public class TestStripeStoreFileManager 
   }
 
   @Test
+  public void testCompactionAndFlushConflict() throws Exception {
+    // Flush two files with stripe boundaries into an empty manager; they become two stripes.
+    StripeStoreFileManager sfm = createManager();
+    assertEquals(0, sfm.getStripeCount());
+    StoreFile sf_i2c = createFile(OPEN_KEY, KEY_C), sf_c2i = createFile(KEY_C, OPEN_KEY);
+    sfm.insertNewFiles(al(sf_i2c, sf_c2i));
+    assertEquals(2, sfm.getStripeCount());
+    // Now add a flush whose boundaries conflict with the stripes - the files must land in L0.
+    StoreFile sf_i2d = createFile(OPEN_KEY, KEY_D), sf_d2i = createFile(KEY_D, OPEN_KEY);
+    sfm.insertNewFiles(al(sf_i2d, sf_d2i));
+    assertEquals(2, sfm.getStripeCount());
+    assertEquals(2, sfm.getLevel0Files().size());
+    verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
+    // Remove these files.
+    sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+    assertEquals(0, sfm.getLevel0Files().size());
+    // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
+    // presumably flushed during compaction, should go to L0.
+    StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
+    sfm.insertNewFiles(al(sf_i2c_2));
+    sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+    assertEquals(1, sfm.getLevel0Files().size());
+    verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
+  }
+
+  @Test
   public void testEmptyResultsForStripes() throws Exception {
     // Test that we can compact L0 into a subset of stripes.
     StripeStoreFileManager manager = createManager();
     StoreFile sf0a = createFile();
     StoreFile sf0b = createFile();
-    manager.insertNewFile(sf0a);
-    manager.insertNewFile(sf0b);
+    manager.insertNewFiles(al(sf0a));
+    manager.insertNewFiles(al(sf0b));
     ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
         createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0a), compacted);
@@ -491,7 +500,7 @@ public class TestStripeStoreFileManager 
     conf.setInt("hbase.hstore.blockingStoreFiles", limit);
     StripeStoreFileManager sfm = createManager(al(), conf);
     for (int i = 0; i < l0Files; ++i) {
-      sfm.insertNewFile(createFile());
+      sfm.insertNewFiles(al(createFile()));
     }
     for (int i = 0; i < filesInStripe; ++i) {
       ArrayList<StoreFile> stripe = new ArrayList<StoreFile>();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java Wed Nov  6 01:49:36 2013
@@ -38,16 +38,20 @@ import org.apache.commons.lang.NotImplem
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
+import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentMatcher;
@@ -62,6 +66,12 @@ public class TestStripeCompactionPolicy 
   private static final byte[] KEY_C = Bytes.toBytes("ccc");
   private static final byte[] KEY_D = Bytes.toBytes("ddd");
   private static final byte[] KEY_E = Bytes.toBytes("eee");
+  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
+  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
+  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
+  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
+  private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
+
 
   private static long defaultSplitSize = 18;
   private static float defaultSplitCount = 1.8F;
@@ -69,6 +79,39 @@ public class TestStripeCompactionPolicy 
   private static long defaultTtl = 1000 * 1000;
 
   @Test
+  public void testNoStripesFromFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
+    StripeCompactionPolicy policy = createPolicy(conf);
+    StripeInformationProvider si = createStripesL0Only(0, 0);
+
+    KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
+    KeyValue[][] expected = new KeyValue[][] { input };
+    verifyFlush(policy, si, input, expected, null);
+  }
+
+  @Test
+  public void testOldStripesFromFlush() throws Exception {
+    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
+    StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
+
+    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
+    KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
+        new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
+    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
+  }
+
+  @Test
+  public void testNewStripesFromFlush() throws Exception {
+    StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
+    StripeInformationProvider si = createStripesL0Only(0, 0);
+    KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
+    // Starts with one stripe; unlike a flush into L0, the file must have stripe metadata.
+    KeyValue[][] expected = new KeyValue[][] { input };
+    verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
+  }
+
+  @Test
   public void testSingleStripeCompaction() throws Exception {
     // Create a special policy that only compacts single stripes, using standard methods.
     Configuration conf = HBaseConfiguration.create();
@@ -424,6 +467,25 @@ public class TestStripeCompactionPolicy 
         dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
   }
 
+  /** Verify arbitrary flush; null boundaries mean the output must have no stripe metadata. */
+  protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
+      KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
+    StripeMultiFileWriter mw = req.createWriter();
+    mw.init(null, writers, new KeyValue.KVComparator());
+    for (KeyValue kv : input) {
+      mw.append(kv);
+    }
+    boolean hasMetadata = boundaries != null;
+    mw.commitWriters(0, false);
+    writers.verifyKvs(expected, true, hasMetadata);
+    if (hasMetadata) {
+      writers.verifyBoundaries(boundaries);
+    }
+  }
+
+
   private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
     return dropDeletes == null ? any(byte[].class)
             : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));