Posted to commits@hbase.apache.org by se...@apache.org on 2013/10/29 01:34:13 UTC

svn commit: r1536567 [1/2] - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-common/src/test/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/ha...

Author: sershe
Date: Tue Oct 29 00:34:12 2013
New Revision: 1536567

URL: http://svn.apache.org/r1536567
Log:
HBASE-7679 implement store file management for stripe compactions

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcatenatedLists.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestConcatenatedLists.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1536567&r1=1536566&r2=1536567&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 29 00:34:12 2013
@@ -2345,10 +2345,25 @@ public class KeyValue implements Cell, H
     private boolean matchingRows(final KeyValue left, final short lrowlength,
         final KeyValue right, final short rrowlength) {
       return lrowlength == rrowlength &&
-          Bytes.equals(left.getBuffer(), left.getRowOffset(), lrowlength,
+          matchingRows(left.getBuffer(), left.getRowOffset(), lrowlength,
               right.getBuffer(), right.getRowOffset(), rrowlength);
     }
 
+    /**
+     * Compares rows. Simply delegates to Bytes.equals, but it is useful to have this encapsulated.
+     * @param left Left row array.
+     * @param loffset Left row offset.
+     * @param llength Left row length.
+     * @param right Right row array.
+     * @param roffset Right row offset.
+     * @param rlength Right row length.
+     * @return Whether the rows are the same.
+     */
+    public boolean matchingRows(final byte [] left, final int loffset, final int llength,
+        final byte [] right, final int roffset, final int rlength) {
+      return Bytes.equals(left, loffset, llength, right, roffset, rlength);
+    }
+
     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
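
A minimal usage sketch of the new overload (not part of the diff; hypothetical row
buffers, assuming the comparator is KeyValue's KVComparator, as used by
StripeStoreFileManager below):

    byte[] left = Bytes.toBytes("row1");
    byte[] right = Bytes.toBytes("row1");
    // Compare the full row portion of each buffer for equality.
    boolean same = KeyValue.COMPARATOR.matchingRows(
        left, 0, left.length, right, 0, right.length); // true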

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcatenatedLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcatenatedLists.java?rev=1536567&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcatenatedLists.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcatenatedLists.java Tue Oct 29 00:34:12 2013
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A collection class that contains multiple sub-lists, which allows us to avoid copying lists.
+ * This class does not support modification. The derived classes that add modifications are
+ * not thread-safe.
+ * NOTE: This doesn't implement List, as that is not necessary for current usage; feel free to add it.
+ */
+public class ConcatenatedLists<T> implements Collection<T> {
+  protected final ArrayList<List<T>> components = new ArrayList<List<T>>();
+  protected int size = 0;
+
+  public void addAllSublists(List<? extends List<T>> items) {
+    for (List<T> list : items) {
+      addSublist(list);
+    }
+  }
+
+  public void addSublist(List<T> items) {
+    if (!items.isEmpty()) {
+      this.components.add(items);
+      this.size += items.size();
+    }
+  }
+
+  @Override
+  public int size() {
+    return this.size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.size == 0;
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    for (List<T> component : this.components) {
+      if (component.contains(o)) return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    for (Object o : c) {
+      if (!contains(o)) return false;
+    }
+    return true;
+  }
+
+  @Override
+  public Object[] toArray() {
+    return toArray((Object[])Array.newInstance(Object.class, this.size));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <U> U[] toArray(U[] a) {
+    U[] result = (a.length == this.size()) ? a
+        : (U[])Array.newInstance(a.getClass().getComponentType(), this.size);
+    int i = 0;
+    for (List<T> component : this.components) {
+      for (T t : component) {
+        result[i] = (U)t;
+        ++i;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public boolean add(T e) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public java.util.Iterator<T> iterator() {
+    return new Iterator();
+  }
+
+  public class Iterator implements java.util.Iterator<T> {
+    protected int currentComponent = 0;
+    protected int indexWithinComponent = -1;
+    protected boolean nextWasCalled = false;
+
+    @Override
+    public boolean hasNext() {
+      return (currentComponent + 1) < components.size()
+          || ((currentComponent + 1) == components.size()
+              && ((indexWithinComponent + 1) < components.get(currentComponent).size()));
+    }
+
+    @Override
+    public T next() {
+      if (!components.isEmpty()) {
+        this.nextWasCalled = true;
+        List<T> src = components.get(currentComponent);
+        if (++indexWithinComponent < src.size()) return src.get(indexWithinComponent);
+        if (++currentComponent < components.size()) {
+          indexWithinComponent = 0;
+          src = components.get(currentComponent);
+          assert src.size() > 0;
+          return src.get(indexWithinComponent);
+        }
+      }
+      this.nextWasCalled = false;
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
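
A minimal usage sketch (hypothetical data), showing that sub-lists are referenced
rather than copied:

    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
    c.addSublist(Arrays.asList(0L, 1L));
    c.addAllSublists(Arrays.asList(Arrays.asList(2L), Arrays.asList(3L, 4L)));
    for (Long v : c) {
      System.out.println(v); // 0, 1, 2, 3, 4 - iterates the sub-lists in insertion order
    }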

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestConcatenatedLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestConcatenatedLists.java?rev=1536567&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestConcatenatedLists.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestConcatenatedLists.java Tue Oct 29 00:34:12 2013
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(SmallTests.class)
+public class TestConcatenatedLists {
+  @Test
+  public void testUnsupportedOps() {
+    // If adding support, add tests.
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L, 1L));
+    try {
+      c.add(2L);
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.addAll(Arrays.asList(2L, 3L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.remove(0L);
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.removeAll(Arrays.asList(0L, 1L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.clear();
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.retainAll(Arrays.asList(0L, 1L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    Iterator<Long> iter = c.iterator();
+    iter.next();
+    try {
+      iter.remove();
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+  }
+
+  @Test
+  public void testEmpty() {
+    verify(new ConcatenatedLists<Long>(), -1);
+  }
+
+  @Test
+  public void testOneOne() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L));
+    verify(c, 0);
+  }
+
+  @Test
+  public void testOneMany() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L, 1L, 2L));
+    verify(c, 2);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testManyOne() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L));
+    c.addAllSublists(Arrays.asList(Arrays.asList(1L), Arrays.asList(2L)));
+    verify(c, 2);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testManyMany() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addAllSublists(Arrays.asList(Arrays.asList(0L, 1L)));
+    c.addSublist(Arrays.asList(2L, 3L, 4L));
+    c.addAllSublists(Arrays.asList(Arrays.asList(5L), Arrays.asList(6L, 7L)));
+    verify(c, 7);
+  }
+
+  private void verify(ConcatenatedLists<Long> c, int last) {
+    assertEquals((last == -1), c.isEmpty());
+    assertEquals(last + 1, c.size());
+    assertTrue(c.containsAll(c));
+    Long[] array = c.toArray(new Long[0]);
+    List<Long> all = new ArrayList<Long>();
+    Iterator<Long> iter = c.iterator();
+    for (Long i = 0L; i <= last; ++i) {
+      assertTrue(iter.hasNext());
+      assertEquals(i, iter.next());
+      assertEquals(i, array[i.intValue()]);
+      assertTrue(c.contains(i));
+      assertTrue(c.containsAll(Arrays.asList(i)));
+      all.add(i);
+    }
+    assertTrue(c.containsAll(all));
+    assertFalse(iter.hasNext());
+    try {
+      iter.next();
+      fail("Should have thrown");
+    } catch (NoSuchElementException nsee) {
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1536567&r1=1536566&r2=1536567&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 29 00:34:12 2013
@@ -285,6 +285,10 @@ public class StoreFile {
     return modificationTimeStamp;
   }
 
+  byte[] getMetadataValue(byte[] key) {
+    return metadataMap.get(key);
+  }
+
   /**
    * Return the largest memstoreTS found across all storefiles in
    * the given list. Store files that were created by a mapreduce
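
A minimal sketch of the intended use of the new accessor (from within the same
package, since it is package-private); the stripe metadata keys are defined in
StripeStoreFileManager, added below in this commit:

    byte[] startRow = sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY);
    if (startRow == null) {
      // no stripe metadata - the file will be treated as level 0
    }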

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1536567&r1=1536566&r2=1536567&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Tue Oct 29 00:34:12 2013
@@ -58,8 +58,7 @@ public interface StoreFileManager {
    * @param results The resulting files for the compaction.
    */
   void addCompactionResults(
-    Collection<StoreFile> compactedFiles, Collection<StoreFile> results
-  );
+      Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
 
   /**
    * Clears all the files currently in use and returns them.

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java?rev=1536567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java Tue Oct 29 00:34:12 2013
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Configuration class for stripe store and compactions.
+ * See {@link StripeStoreFileManager} for general documentation.
+ * See getters for the description of each setting.
+ */
+@InterfaceAudience.Private
+public class StripeStoreConfig {
+  public static final String MAX_SPLIT_IMBALANCE = "hbase.store.stripe.split.max.imbalance";
+  private float maxSplitImbalance;
+
+  public StripeStoreConfig(Configuration config) {
+    maxSplitImbalance = config.getFloat(MAX_SPLIT_IMBALANCE, 1.5f);
+    if (maxSplitImbalance == 0) {
+      maxSplitImbalance = 1.5f;
+    }
+    if (maxSplitImbalance < 1f) {
+      maxSplitImbalance = 1f / maxSplitImbalance;
+    }
+  }
+
+  /**
+   * @return the maximum imbalance to tolerate between the sides when splitting the region
+   * at a stripe boundary. If the ratio of the larger to the smaller side of a split at
+   * the stripe boundary would exceed this, some stripe will be split instead.
+   */
+  public float getMaxSplitImbalance() {
+    return this.maxSplitImbalance;
+  }
+}
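
A minimal configuration sketch (hypothetical values); note that values below 1 are
inverted, so 0.5f also yields an effective imbalance of 2.0f:

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, 2.0f);
    StripeStoreConfig config = new StripeStoreConfig(conf);
    assert config.getMaxSplitImbalance() == 2.0f;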

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1536567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Tue Oct 29 00:34:12 2013
@@ -0,0 +1,862 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcatenatedLists;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Stripe implementation of StoreFileManager.
+ * Not thread-safe - relies on external locking (in HStore). Collections that this class
+ * returns are immutable or unique to the call, so they should be safe.
+ * The stripe store splits the key space of the region into non-overlapping stripes, plus
+ * some recent files that may contain any keys (level 0). Each stripe contains a set of files.
+ * When L0 is compacted, it's split into files corresponding to existing stripe boundaries,
+ * which can thus be added to the stripes.
+ * When a scan or get happens, it only has to read the files from the corresponding stripes.
+ * See StripeCompactionPolicy for how the stripes are determined; this class doesn't care.
+ *
+ * This class should work together with StripeCompactionPolicy and StripeCompactor.
+ * With regard to how they work, we make at least the following (reasonable) assumptions:
+ *  - Compaction produces one file per new stripe (if any); that is easy to change.
+ *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
+ */
+@InterfaceAudience.Private
+class StripeStoreFileManager implements StoreFileManager {
+  static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
+
+  /**
+   * The file metadata fields that contain the stripe information.
+   */
+  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
+  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
+
+  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
+
+  /**
+   * The key value used for a range boundary, indicating that the boundary is open (i.e. +/-inf).
+   */
+  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
+  final static byte[] INVALID_KEY = null;
+
+  /**
+   * The state class. Used solely to replace results atomically during
+   * compactions and avoid complicated error handling.
+   */
+  private static class State {
+    /**
+     * The end keys of each stripe. The last stripe end is always open-ended, so it's not stored
+     * here. It is invariant that the start key of the stripe is the end key of the previous one
+     * (and is an open boundary for the first one).
+     */
+    public byte[][] stripeEndRows = new byte[0][];
+
+    /**
+     * Files by stripe. Each element of the list corresponds to the stripeEndRows entry with
+     * the same index, except the last one. Inside each list, the files are in reverse order
+     * by seqNum. Note that the length of this list is one greater than that of stripeEndRows.
+     */
+    public ArrayList<ImmutableList<StoreFile>> stripeFiles
+      = new ArrayList<ImmutableList<StoreFile>>();
+    /** Level 0. The files are in reverse order by seqNum. */
+    public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
+
+    /** Cached list of all files in the structure, to return from some calls */
+    public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
+  }
+  private State state = null;
+
+  /** Cached file metadata (or overrides as the case may be) */
+  private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
+  private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
+
+  private final KVComparator kvComparator;
+  private StripeStoreConfig config;
+
+  private final int blockingFileCount;
+
+  public StripeStoreFileManager(KVComparator kvComparator, Configuration conf) throws Exception {
+    this.kvComparator = kvComparator;
+    // TODO: create this in a shared manner in StoreEngine when there's one
+    this.config = new StripeStoreConfig(conf);
+    this.blockingFileCount = conf.getInt(
+        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
+  }
+
+  @Override
+  public void loadFiles(List<StoreFile> storeFiles) {
+    loadUnclassifiedStoreFiles(storeFiles);
+  }
+
+  @Override
+  public Collection<StoreFile> getStorefiles() {
+    return state.allFilesCached;
+  }
+
+  @Override
+  public void insertNewFile(StoreFile sf) {
+    LOG.debug("New level 0 file: " + sf);
+    ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(state.level0Files);
+    insertFileIntoStripe(newFiles, sf);
+    ensureLevel0Metadata(sf);
+    this.state.level0Files = ImmutableList.copyOf(newFiles);
+    ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(state.allFilesCached);
+    newAllFiles.add(sf);
+    this.state.allFilesCached = ImmutableList.copyOf(newAllFiles);
+  }
+
+  @Override
+  public ImmutableCollection<StoreFile> clearFiles() {
+    ImmutableCollection<StoreFile> result = state.allFilesCached;
+    this.state = new State();
+    this.fileStarts.clear();
+    this.fileEnds.clear();
+    return result;
+  }
+
+  @Override
+  public int getStorefileCount() {
+    return state.allFilesCached.size();
+  }
+
+  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
+   * for details on this method. */
+  @Override
+  public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
+    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
+    // Order matters for this call.
+    result.addSublist(state.level0Files);
+    if (!state.stripeFiles.isEmpty()) {
+      int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
+      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
+        result.addSublist(state.stripeFiles.get(stripeIndex));
+      }
+    }
+    return result.iterator();
+  }
+
+  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
+   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, KeyValue)}
+   * for details on these methods. */
+  @Override
+  public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
+      Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
+    KeyBeforeConcatenatedLists.Iterator original =
+        (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
+    assert original != null;
+    ArrayList<List<StoreFile>> components = original.getComponents();
+    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
+      StoreFile sf = components.get(firstIrrelevant).get(0);
+      byte[] endKey = endOf(sf);
+      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
+      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
+      // first one that cannot possibly have better candidates.
+      if (!isInvalid(endKey) && !isOpen(endKey)
+          && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
+        original.removeComponents(firstIrrelevant);
+        break;
+      }
+    }
+    return original;
+  }
+
+  /**
+   * Override of getSplitPoint that determines the split point as the boundary between two
+   * stripes, unless that would cause significant imbalance between the split sides' sizes.
+   * In that case, the split boundary is chosen from the middle of one of the stripes to
+   * minimize the imbalance.
+   * @return The split point, or null if no split is possible.
+   */
+  @Override
+  public byte[] getSplitPoint() throws IOException {
+    if (this.getStorefileCount() == 0) return null;
+    if (state.stripeFiles.size() <= 1) {
+      return getSplitPointFromAllFiles();
+    }
+    int leftIndex = -1, rightIndex = state.stripeFiles.size();
+    long leftSize = 0, rightSize = 0;
+    long lastLeftSize = 0, lastRightSize = 0;
+    while (rightIndex - 1 != leftIndex) {
+      if (leftSize >= rightSize) {
+        --rightIndex;
+        lastRightSize = getStripeFilesSize(rightIndex);
+        rightSize += lastRightSize;
+      } else {
+        ++leftIndex;
+        lastLeftSize = getStripeFilesSize(leftIndex);
+        leftSize += lastLeftSize;
+      }
+    }
+    if (leftSize == 0 || rightSize == 0) {
+      String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
+          + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
+      debugDumpState(errMsg);
+      LOG.warn(errMsg);
+      return getSplitPointFromAllFiles();
+    }
+    double ratio = (double)rightSize / leftSize;
+    if (ratio < 1) {
+      ratio = 1 / ratio;
+    }
+    if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
+
+    // If the difference between the sides is too large, we could get the proportional key on
+    // a stripe to equalize the difference, but there's no proportional key method at the
+    // moment, and it's not extremely important.
+    // See if we can achieve better ratio if we split the bigger side in half.
+    boolean isRightLarger = rightSize >= leftSize;
+    double newRatio = isRightLarger
+        ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
+        : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
+    if (newRatio < 1) {
+      newRatio = 1 / newRatio;
+    }
+    if (newRatio >= ratio)  return state.stripeEndRows[leftIndex];
+    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
+        + newRatio + " configured ratio " + config.getMaxSplitImbalance());
+    // Ok, we may get better ratio, get it.
+    return StoreUtils.getLargestFile(state.stripeFiles.get(
+        isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
+  }
+
+  private byte[] getSplitPointFromAllFiles() throws IOException {
+    ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
+    sfs.addSublist(state.level0Files);
+    sfs.addAllSublists(state.stripeFiles);
+    if (sfs.isEmpty()) return null;
+    return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
+  }
+
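+  /**
+   * Computes the ratio that would result from splitting the larger side's last stripe in the
+   * middle: the larger side gives up half of its last stripe's size to the smaller side.
+   */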
+  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
+    return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
+  }
+
+  @Override
+  public Collection<StoreFile> getFilesForScanOrGet(
+      boolean isGet, byte[] startRow, byte[] stopRow) {
+    if (state.stripeFiles.isEmpty()) {
+      return state.level0Files; // There's just L0.
+    }
+
+    int firstStripe = findStripeForRow(startRow, true);
+    int lastStripe = findStripeForRow(stopRow, false);
+    assert firstStripe <= lastStripe;
+    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
+      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
+    }
+    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
+      return state.allFilesCached; // We need to read all files.
+    }
+
+    ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
+    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
+    result.addSublist(state.level0Files);
+    return result;
+  }
+
+  @Override
+  public void addCompactionResults(
+    Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
+    // See class comment for the assumptions we make here.
+    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
+        + " files replaced by " + results.size());
+    // In order to be able to fail in the middle of the operation, we'll operate on lazy
+    // copies and apply the result at the end.
+    CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy();
+    cmc.mergeResults(compactedFiles, results);
+    debugDumpState("Merged compaction results");
+  }
+
+  @Override
+  public int getStoreCompactionPriority() {
+    // If there's only L0, do what the default store does.
+    // If we are in critical priority, do the same - we don't want to trump all stores all
+    // the time due to how many files we have.
+    int fc = getStorefileCount();
+    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
+      return this.blockingFileCount - fc;
+    }
+    // If we are in good shape, we don't want to be trumped by all other stores due to how
+    // many files we have, so do an approximate mapping to normal priority range; L0 counts
+    // for all stripes.
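+    // For example (hypothetical numbers): blockingFileCount=10, fc=6, l0=2, sc=4 gives
+    // ceil((10 - 6 + 2) / 4.0 - 2) = ceil(-0.5) = 0, which is at most PRIORITY_USER and is
+    // therefore bumped to PRIORITY_USER + 1 below.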
+    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
+    int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
+    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
+  }
+
+  /**
+   * Gets the total size of all files in the stripe.
+   * @param stripeIndex Stripe index.
+   * @return Size.
+   */
+  private long getStripeFilesSize(int stripeIndex) {
+    long result = 0;
+    for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
+      result += sf.getReader().length();
+    }
+    return result;
+  }
+
+  /**
+   * Loads initial store files that were picked up from some physical location pertaining to
+   * this store (presumably). Unlike adding files after compaction, assumes empty initial
+   * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
+   * go to level 0.
+   * @param storeFiles Store files to add.
+   */
+  private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
+    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
+    TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
+        new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
+    ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
+    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
+    // If needed, we could dynamically determine the stripes in the future.
+    for (StoreFile sf : storeFiles) {
+      byte[] startRow = startOf(sf), endRow = endOf(sf);
+      // Validate the range and put the files into place.
+      if (isInvalid(startRow) || isInvalid(endRow)) {
+        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
+        ensureLevel0Metadata(sf);
+      } else if (!isOpen(startRow) && !isOpen(endRow) &&
+          nonOpenRowCompare(startRow, endRow) >= 0) {
+        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
+          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
+        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
+        ensureLevel0Metadata(sf);
+      } else {
+        ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
+        if (stripe == null) {
+          stripe = new ArrayList<StoreFile>();
+          candidateStripes.put(endRow, stripe);
+        }
+        insertFileIntoStripe(stripe, sf);
+      }
+    }
+    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
+    // instead create a single, open-ended stripe with all the files.
+
+    boolean hasOverlaps = false;
+    byte[] expectedStartRow = null; // first stripe can start wherever
+    Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
+        candidateStripes.entrySet().iterator();
+    while (entryIter.hasNext()) {
+      Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
+      ArrayList<StoreFile> files = entry.getValue();
+      // Validate the file start keys, and demote the bad ones to level 0.
+      for (int i = 0; i < files.size(); ++i) {
+        StoreFile sf = files.get(i);
+        byte[] startRow = startOf(sf);
+        if (expectedStartRow == null) {
+          expectedStartRow = startRow; // ensure that first stripe is still consistent
+        } else if (!rowEquals(expectedStartRow, startRow)) {
+          hasOverlaps = true;
+          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
+              + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
+              + "], to L0 it goes");
+          StoreFile badSf = files.remove(i);
+          insertFileIntoStripe(level0Files, badSf);
+          ensureLevel0Metadata(badSf);
+          --i;
+        }
+      }
+      // Check if any files from the candidate stripe are valid. If so, add a stripe.
+      byte[] endRow = entry.getKey();
+      if (!files.isEmpty()) {
+        expectedStartRow = endRow; // Next stripe must start exactly at that key.
+      } else {
+        entryIter.remove();
+      }
+    }
+
+    // In the end, there must be open ends on both sides. If not, and there were no errors,
+    // i.e. the files are consistent, they might be coming from a split. We will treat the
+    // boundaries as open keys anyway, and log a message.
+    // If there were errors, we'll play it safe and dump everything into L0.
+    if (!candidateStripes.isEmpty()) {
+      StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
+      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
+      if (!isOpen) {
+        LOG.warn("The range of the loaded files does not cover full key space: from ["
+            + Bytes.toString(startOf(firstFile)) + "], to ["
+            + Bytes.toString(candidateStripes.lastKey()) + "]");
+        if (!hasOverlaps) {
+          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
+          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
+        } else {
+          LOG.warn("Inconsistent files, everything goes to L0.");
+          for (ArrayList<StoreFile> files : candidateStripes.values()) {
+            for (StoreFile sf : files) {
+              insertFileIntoStripe(level0Files, sf);
+              ensureLevel0Metadata(sf);
+            }
+          }
+          candidateStripes.clear();
+        }
+      }
+    }
+
+    // Copy the results into the fields.
+    State state = new State();
+    state.level0Files = ImmutableList.copyOf(level0Files);
+    state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
+    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
+    ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
+    int i = candidateStripes.size() - 1;
+    for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
+      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
+      newAllFiles.addAll(entry.getValue());
+      if (i > 0) {
+        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
+      }
+      --i;
+    }
+    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
+    this.state = state;
+    debugDumpState("Files loaded");
+  }
+
+  private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
+    HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
+    for (StoreFile sf : stripe) {
+      targetMap.put(sf, OPEN_KEY);
+    }
+  }
+
+  private void ensureLevel0Metadata(StoreFile sf) {
+    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, null);
+    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, null);
+  }
+
+  /**
+   * For testing.
+   */
+  List<StoreFile> getLevel0Files() {
+    return state.level0Files;
+  }
+
+  private void debugDumpState(String string) {
+    if (!LOG.isDebugEnabled()) return;
+    StringBuilder sb = new StringBuilder();
+    sb.append("\n" + string + "; current stripe state is as such:");
+    sb.append("\n level 0 with ").append(state.level0Files.size()).append(" files;");
+    for (int i = 0; i < state.stripeFiles.size(); ++i) {
+      String endRow = (i == state.stripeEndRows.length)
+          ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
+      sb.append("\n stripe ending in ").append(endRow).append(" with ")
+        .append(state.stripeFiles.get(i).size()).append(" files;");
+    }
+    sb.append("\n").append(getStorefileCount()).append(" files total.");
+    LOG.debug(sb.toString());
+  }
+
+  /**
+   * Checks whether the key indicates an open interval boundary (i.e. infinity).
+   */
+  private static final boolean isOpen(byte[] key) {
+    return key != null && key.length == 0;
+  }
+
+  /**
+   * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
+   */
+  private static final boolean isInvalid(byte[] key) {
+    return key == INVALID_KEY;
+  }
+
+  /**
+   * Compare two keys for equality.
+   */
+  private final boolean rowEquals(byte[] k1, byte[] k2) {
+    return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
+  }
+
+  /**
+   * Compare two keys. Keys must not be open (isOpen(row) == false).
+   */
+  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
+    assert !isOpen(k1) && !isOpen(k2);
+    return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
+  }
+
+  /**
+   * Finds the stripe index by end key.
+   */
+  private final int findStripeIndexByEndRow(byte[] endRow) {
+    assert !isInvalid(endRow);
+    if (isOpen(endRow)) return state.stripeEndRows.length;
+    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
+  }
+
+  /**
+   * Finds the stripe index for the stripe containing a key provided externally for get/scan.
+   */
+  private final int findStripeForRow(byte[] row, boolean isStart) {
+    if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
+    if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
+    // If there's an exact match below, a stripe ends at "row". The stripe's right boundary is
+    // exclusive, so that means the row is in the next stripe; thus, we need to add one to the
+    // index. If there's no match, binarySearch returns (-(insertion point) - 1), where the
+    // insertion point is the index of the next greater element, or the list size if there is
+    // none. The insertion point is exactly what we need, and adding one before taking the
+    // absolute value recovers it.
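+    // For example (hypothetical data): with stripeEndRows = { "b", "d" }, row "a" yields
+    // abs(-1 + 1) = 0, row "b" yields abs(0 + 1) = 1 (the right boundary is exclusive),
+    // and row "c" yields abs(-2 + 1) = 1.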
+    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
+  }
+
+  /**
+   * Gets the start key for a given stripe.
+   * @param stripeIndex Stripe index.
+   * @return Start key. May be an open key.
+   */
+  public final byte[] getStartRow(int stripeIndex) {
+    return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
+  }
+
+  /**
+   * Gets the end key for a given stripe.
+   * @param stripeIndex Stripe index.
+   * @return End key. May be an open key.
+   */
+  public final byte[] getEndRow(int stripeIndex) {
+    return (stripeIndex == state.stripeEndRows.length
+        ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
+  }
+
+  private byte[] startOf(StoreFile sf) {
+    byte[] result = this.fileStarts.get(sf);
+    return result != null ? result : sf.getMetadataValue(STRIPE_START_KEY);
+  }
+
+  private byte[] endOf(StoreFile sf) {
+    byte[] result = this.fileEnds.get(sf);
+    return result != null ? result : sf.getMetadataValue(STRIPE_END_KEY);
+  }
+
+  /**
+   * Inserts a file in the correct place (by seqnum) in a stripe copy.
+   * @param stripe Stripe copy to insert into.
+   * @param sf File to insert.
+   */
+  private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
+    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
+    // we will store the files in reverse order by seqNum from the outset.
+    for (int insertBefore = 0; ; ++insertBefore) {
+      if (insertBefore == stripe.size()
+          || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
+        stripe.add(insertBefore, sf);
+        break;
+      }
+    }
+  }
+
+  /**
+   * An extension of ConcatenatedLists that has several peculiar properties.
+   * First, one can cut the tail of the logical list by removing the last several sub-lists.
+   * Second, items can be removed through the iterator.
+   * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
+   * On average, the KeyBefore operation will start with half the stripes as potential
+   * candidates, but will quickly cut down on them as it finds something in the more likely
+   * ones; thus, the above allows us to avoid unnecessarily copying a bunch of lists.
+   */
+  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
+    @Override
+    public java.util.Iterator<StoreFile> iterator() {
+      return new Iterator();
+    }
+
+    public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
+      public ArrayList<List<StoreFile>> getComponents() {
+        return components;
+      }
+
+      public void removeComponents(int startIndex) {
+        List<List<StoreFile>> subList = components.subList(startIndex, components.size());
+        for (List<StoreFile> entry : subList) {
+          size -= entry.size();
+        }
+        assert size >= 0;
+        subList.clear();
+      }
+
+      @Override
+      public void remove() {
+        if (!this.nextWasCalled) {
+          throw new IllegalStateException("No element to remove");
+        }
+        this.nextWasCalled = false;
+        List<StoreFile> src = components.get(currentComponent);
+        if (src instanceof ImmutableList<?>) {
+          src = new ArrayList<StoreFile>(src);
+          components.set(currentComponent, src);
+        }
+        src.remove(indexWithinComponent);
+        --size;
+        --indexWithinComponent;
+        if (src.isEmpty()) {
+          components.remove(currentComponent); // indexWithinComponent is already -1 here.
+        }
+      }
+    }
+  }
+
+  /**
+   * Non-static helper class for merging compaction results.
+   * Since we want to merge them atomically (more or less), it operates on lazy copies, and
+   * then applies the copies to the real lists as necessary.
+   */
+  private class CompactionResultsMergeCopy {
+    private ArrayList<List<StoreFile>> stripeFiles = null;
+    private ArrayList<StoreFile> level0Files = null;
+    private ArrayList<byte[]> stripeEndRows = null;
+
+    private Collection<StoreFile> compactedFiles = null;
+    private Collection<StoreFile> results = null;
+
+    private List<StoreFile> l0Results = new ArrayList<StoreFile>();
+
+    public CompactionResultsMergeCopy() {
+      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
+      this.stripeFiles = new ArrayList<List<StoreFile>>(
+          StripeStoreFileManager.this.state.stripeFiles);
+    }
+
+    public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
+        throws IOException {
+      assert this.compactedFiles == null && this.results == null;
+      this.compactedFiles = compactedFiles;
+      this.results = results;
+      // Do logical processing.
+      removeCompactedFiles();
+      TreeMap<byte[], StoreFile> newStripes = processCompactionResults();
+      if (newStripes != null) {
+        processNewCandidateStripes(newStripes);
+      }
+      // Create new state and update parent.
+      State state = createNewState();
+      StripeStoreFileManager.this.state = state;
+      updateMetadataMaps();
+    }
+
+    private State createNewState() {
+      State oldState = StripeStoreFileManager.this.state;
+      // Stripe count should be the same unless the end rows changed.
+      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
+      State newState = new State();
+      newState.level0Files = (this.level0Files == null) ? oldState.level0Files
+          : ImmutableList.copyOf(this.level0Files);
+      newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
+          : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
+      newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
+      for (List<StoreFile> newStripe : this.stripeFiles) {
+        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
+            ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
+      }
+
+      List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
+      newAllFiles.removeAll(compactedFiles);
+      newAllFiles.addAll(results);
+      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
+      return newState;
+    }
+
+    private void updateMetadataMaps() {
+      StripeStoreFileManager parent = StripeStoreFileManager.this;
+      for (StoreFile sf : this.compactedFiles) {
+        parent.fileStarts.remove(sf);
+        parent.fileEnds.remove(sf);
+      }
+      for (StoreFile sf : this.l0Results) {
+        parent.ensureLevel0Metadata(sf);
+      }
+    }
+
+    /**
+     * @param index Index of the stripe we need.
+     * @return A lazy stripe copy from current stripes.
+     */
+    private final ArrayList<StoreFile> getStripeCopy(int index) {
+      List<StoreFile> stripeCopy = this.stripeFiles.get(index);
+      ArrayList<StoreFile> result = null;
+      if (stripeCopy instanceof ImmutableList<?>) {
+        result = new ArrayList<StoreFile>(stripeCopy);
+        this.stripeFiles.set(index, result);
+      } else {
+        result = (ArrayList<StoreFile>)stripeCopy;
+      }
+      return result;
+    }
+
+    /**
+     * @return A lazy L0 copy from current state.
+     */
+    private final ArrayList<StoreFile> getLevel0Copy() {
+      if (this.level0Files == null) {
+        this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
+      }
+      return this.level0Files;
+    }
+
+    /**
+     * Process new files, and add them either to the structure of existing stripes,
+     * or to the list of new candidate stripes.
+     * @return New candidate stripes.
+     */
+    private TreeMap<byte[], StoreFile> processCompactionResults() throws IOException {
+      TreeMap<byte[], StoreFile> newStripes = null;
+      for (StoreFile sf : this.results) {
+        byte[] startRow = startOf(sf), endRow = endOf(sf);
+        if (isInvalid(endRow) || isInvalid(startRow)) {
+          LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath());
+          insertFileIntoStripe(getLevel0Copy(), sf);
+          this.l0Results.add(sf);
+          continue;
+        }
+        if (!this.stripeFiles.isEmpty()) {
+          int stripeIndex = findStripeIndexByEndRow(endRow);
+          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
+            // Simple/common case - add file to an existing stripe.
+            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
+            continue;
+          }
+        }
+
+        // Make a new candidate stripe.
+        if (newStripes == null) {
+          newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
+        }
+        StoreFile oldSf = newStripes.put(endRow, sf);
+        if (oldSf != null) {
+          throw new IOException("Compactor has produced multiple files for the stripe ending in ["
+              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
+        }
+      }
+      return newStripes;
+    }
+
+    /**
+     * Removes the compacted files from the lazy copies of the stripe and L0 lists.
+     */
+    private void removeCompactedFiles() throws IOException {
+      for (StoreFile oldFile : this.compactedFiles) {
+        byte[] oldEndRow = endOf(oldFile);
+        List<StoreFile> source = null;
+        if (isInvalid(oldEndRow)) {
+          source = getLevel0Copy();
+        } else {
+          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
+          if (stripeIndex < 0) {
+            throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
+                + " to a known stripe (end row + [" + Bytes.toString(oldEndRow) + "])");
+          }
+          source = getStripeCopy(stripeIndex);
+        }
+        if (!source.remove(oldFile)) {
+          throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
+        }
+      }
+    }
+
+    /**
+     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
+     * new candidate stripes/removes old stripes; produces new set of stripe end keys.
+     * @param newStripes  New stripes - files by end key.
+     */
+    private void processNewCandidateStripes(
+        TreeMap<byte[], StoreFile> newStripes) throws IOException {
+      // Validate that the removed and added aggregate ranges still make for a full key space.
+      boolean hasStripes = !this.stripeFiles.isEmpty();
+      this.stripeEndRows = new ArrayList<byte[]>(
+          Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
+      int removeFrom = 0;
+      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
+      byte[] lastEndRow = newStripes.lastKey();
+      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
+        throw new IOException("Newly created stripes do not cover the entire key space.");
+      }
+
+      if (hasStripes) {
+        // Determine which stripes will need to be removed because they conflict with new stripes.
+        // The new boundaries should match old stripe boundaries, so we should get exact matches.
+        if (isOpen(firstStartRow)) {
+          removeFrom = 0;
+        } else {
+          removeFrom = findStripeIndexByEndRow(firstStartRow);
+          if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
+          ++removeFrom;
+        }
+        int removeTo = findStripeIndexByEndRow(lastEndRow);
+        if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
+        // Remove old empty stripes.
+        int originalCount = this.stripeFiles.size();
+        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
+          if (!this.stripeFiles.get(removeIndex).isEmpty()) {
+            throw new IOException("Compaction intends to create a new stripe that replaces an"
+                + " existing one, but the latter contains some files.");
+          }
+          if (removeIndex != originalCount - 1) {
+            this.stripeEndRows.remove(removeIndex);
+          }
+          this.stripeFiles.remove(removeIndex);
+        }
+      }
+
+      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
+      byte[] previousEndRow = null;
+      int insertAt = removeFrom;
+      for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
+        if (previousEndRow != null) {
+          // Validate that the ranges are contiguous.
+          assert !isOpen(previousEndRow);
+          byte[] startRow = startOf(newStripe.getValue());
+          if (!rowEquals(previousEndRow, startRow)) {
+            throw new IOException("The new stripes produced by compaction are not contiguous");
+          }
+        }
+        // Add the new stripe.
+        ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
+        tmp.add(newStripe.getValue());
+        stripeFiles.add(insertAt, tmp);
+        previousEndRow = newStripe.getKey();
+        if (!isOpen(previousEndRow)) {
+          stripeEndRows.add(insertAt, previousEndRow);
+        }
+        ++insertAt;
+      }
+    }
+  }
+}
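
A minimal sketch of how the manager might be driven (hypothetical setup and variable
names; the class is package-private, and the constructor declares throws Exception):

    StripeStoreFileManager mgr = new StripeStoreFileManager(KeyValue.COMPARATOR, conf);
    mgr.loadFiles(initialFiles);                  // classifies files into L0 and stripes
    Collection<StoreFile> files = mgr.getFilesForScanOrGet(false, startRow, stopRow);
    mgr.addCompactionResults(compacted, results); // swaps in the new state atomically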

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java?rev=1536567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java Tue Oct 29 00:34:12 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** A mock used so our tests don't deal with actual StoreFiles */
+public class MockStoreFile extends StoreFile {
+  long length = 0;
+  boolean isRef = false;
+  long ageInDisk;
+  long sequenceid;
+  private Map<byte[], byte[]> metadata = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+  byte[] splitPoint = null;
+
+  MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
+      long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
+    super(testUtil.getTestFileSystem(), testPath, testUtil.getConfiguration(),
+          new CacheConfig(testUtil.getConfiguration()), BloomType.NONE,
+          NoOpDataBlockEncoder.INSTANCE);
+    this.length = length;
+    this.isRef = isRef;
+    this.ageInDisk = ageInDisk;
+    this.sequenceid = sequenceid;
+  }
+
+  void setLength(long newLen) {
+    this.length = newLen;
+  }
+
+  @Override
+  byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
+    return this.splitPoint;
+  }
+
+  @Override
+  public long getMaxSequenceId() {
+    return sequenceid;
+  }
+
+  @Override
+  public boolean isMajorCompaction() {
+    return false;
+  }
+
+  @Override
+  public boolean isReference() {
+    return this.isRef;
+  }
+
+  @Override
+  boolean isBulkLoadResult() {
+    return false;
+  }
+
+  @Override
+  public byte[] getMetadataValue(byte[] key) {
+    return this.metadata.get(key);
+  }
+
+  public void setMetadataValue(byte[] key, byte[] value) {
+    this.metadata.put(key, value);
+  }
+
+  @Override
+  public StoreFile.Reader getReader() {
+    final long len = this.length;
+    return new StoreFile.Reader() {
+      @Override
+      public long length() {
+        return len;
+      }
+    };
+  }
+}
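
A minimal sketch of using the mock to simulate a stripe-aware file in tests
(hypothetical values; TEST_UTIL and TEST_FILE as in TestDefaultCompactSelection):

    MockStoreFile msf = new MockStoreFile(TEST_UTIL, TEST_FILE, 1024, 0, false, 1L);
    msf.setMetadataValue(StripeStoreFileManager.STRIPE_START_KEY, Bytes.toBytes("a"));
    msf.setMetadataValue(StripeStoreFileManager.STRIPE_END_KEY, Bytes.toBytes("b"));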

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java?rev=1536567&r1=1536566&r2=1536567&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java Tue Oct 29 00:34:12 2013
@@ -125,64 +125,6 @@ public class TestDefaultCompactSelection
     }
   }
 
-  // used so our tests don't deal with actual StoreFiles
-  static class MockStoreFile extends StoreFile {
-    long length = 0;
-    boolean isRef = false;
-    long ageInDisk;
-    long sequenceid;
-
-    MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
-      super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
-            new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
-            NoOpDataBlockEncoder.INSTANCE);
-      this.length = length;
-      this.isRef = isRef;
-      this.ageInDisk = ageInDisk;
-      this.sequenceid = sequenceid;
-    }
-
-    void setLength(long newLen) {
-      this.length = newLen;
-    }
-
-    @Override
-    public long getMaxSequenceId() {
-      return sequenceid;
-    }
-
-    @Override
-    public boolean isMajorCompaction() {
-      return false;
-    }
-
-    @Override
-    public boolean isReference() {
-      return this.isRef;
-    }
-
-    @Override
-    public StoreFile.Reader getReader() {
-      final long len = this.length;
-      return new StoreFile.Reader() {
-        @Override
-        public long length() {
-          return len;
-        }
-      };
-    }
-
-    @Override
-    public String toString() {
-      return "MockStoreFile{" +
-          "length=" + length +
-          ", isRef=" + isRef +
-          ", ageInDisk=" + ageInDisk +
-          ", sequenceid=" + sequenceid +
-          '}';
-    }
-  }
-
   ArrayList<Long> toArrayList(long... numbers) {
     ArrayList<Long> result = new ArrayList<Long>();
     for (long i : numbers) {
@@ -216,7 +158,8 @@ public class TestDefaultCompactSelection
       throws IOException {
     List<StoreFile> ret = Lists.newArrayList();
     for (int i = 0; i < sizes.size(); i++) {
-      ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
+      ret.add(new MockStoreFile(TEST_UTIL, TEST_FILE,
+          sizes.get(i), ageInDisk.get(i), isReference, i));
     }
     return ret;
   }