You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/02/11 19:39:11 UTC

[2/2] hbase git commit: HBASE-14919 Refactoring for in-memory flush and compaction

HBASE-14919 Refactoring for in-memory flush and compaction

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25dfc112
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25dfc112
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25dfc112

Branch: refs/heads/master
Commit: 25dfc112dd76134a9a3ce1f2e88c4075aef76557
Parents: a975408
Author: eshcar <es...@yahoo-inc.com>
Authored: Mon Feb 8 23:35:02 2016 +0200
Committer: stack <st...@apache.org>
Committed: Thu Feb 11 10:39:01 2016 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    | 497 +++++++++++
 .../hadoop/hbase/regionserver/CellSet.java      | 183 ++++
 .../hbase/regionserver/CellSkipListSet.java     | 185 ----
 .../hbase/regionserver/DefaultMemStore.java     | 859 +------------------
 .../hadoop/hbase/regionserver/HStore.java       |  22 +-
 .../hbase/regionserver/ImmutableSegment.java    |  72 ++
 .../regionserver/ImmutableSegmentAdapter.java   | 107 +++
 .../hadoop/hbase/regionserver/MemStore.java     |  16 +-
 .../hbase/regionserver/MemStoreScanner.java     | 348 ++++++++
 .../hbase/regionserver/MemStoreSnapshot.java    |  13 +-
 .../regionserver/MutableCellSetSegment.java     | 153 ++++
 .../MutableCellSetSegmentScanner.java           | 258 ++++++
 .../hbase/regionserver/MutableSegment.java      |  57 ++
 .../hadoop/hbase/regionserver/Segment.java      | 218 +++++
 .../hbase/regionserver/SegmentFactory.java      |  89 ++
 .../hbase/regionserver/SegmentScanner.java      | 152 ++++
 .../hbase/regionserver/StoreFlushContext.java   |   2 +-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  49 +-
 .../hbase/regionserver/TestCellSkipListSet.java |  13 +-
 .../hbase/regionserver/TestDefaultMemStore.java | 133 +--
 .../hbase/regionserver/TestHMobStore.java       |  29 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 150 ++--
 .../regionserver/TestMemStoreChunkPool.java     |  29 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  11 +-
 24 files changed, 2380 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
new file mode 100644
index 0000000..18d2f8a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -0,0 +1,497 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * An abstract class, which implements the behaviour shared by all concrete memstore instances.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMemStore implements MemStore {
+
+  private static final long NO_SNAPSHOT_ID = -1;
+
+  private final Configuration conf;
+  private final CellComparator comparator;
+
+  // active segment absorbs write operations
+  private volatile MutableSegment active;
+  // Snapshot of memstore.  Made for flusher.
+  private volatile ImmutableSegment snapshot;
+  protected volatile long snapshotId;
+  // Used to track when to flush
+  private volatile long timeOfOldestEdit;
+  // 4 references (conf, comparator, active, snapshot) + 2 longs (snapshotId, timeOfOldestEdit)
+  public final static long FIXED_OVERHEAD = ClassSize.align(
+      ClassSize.OBJECT +
+          (4 * ClassSize.REFERENCE) +
+          (2 * Bytes.SIZEOF_LONG));
+
+  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+      2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
+      ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
+
+  // NOTE(review): calls the overridable resetCellSet(); overrides must not use subclass state.
+  protected AbstractMemStore(final Configuration conf, final CellComparator c) {
+    this.conf = conf;
+    this.comparator = c;
+    resetCellSet();
+    this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0);
+    this.snapshotId = NO_SNAPSHOT_ID;
+  }
+
+  protected void resetCellSet() {
+    // Replace the active segment with a fresh, empty one whose size starts at DEEP_OVERHEAD
+    this.active = SegmentFactory.instance().createMutableSegment(
+        conf, comparator, DEEP_OVERHEAD);
+    this.timeOfOldestEdit = Long.MAX_VALUE;
+  }
+
+  /**
+   * Calculates how the MemStore heap size changes when a cell is added or removed; includes the
+   * per-entry overhead of the backing ConcurrentSkipListMap.
+   * @param cell the cell whose estimated heap size is accounted
+   * @param notPresent true if the cell was NOT already present in the set
+   * @return the size delta, or 0 when {@code notPresent} is false
+   */
+  static long heapSizeChange(final Cell cell, final boolean notPresent) {
+    return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
+        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+  }
+
+  /**
+   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
+   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
+   *                      only if it is greater than the previous sequence id
+   */
+  public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent);
+
+  /**
+   * Write an update
+   * @param cell the cell to be added
+   * @return approximate size of the passed cell & newly added cell which may be different than the
+   *         passed-in cell
+   */
+  @Override
+  public long add(Cell cell) {
+    Cell toAdd = maybeCloneWithAllocator(cell);
+    return internalAdd(toAdd);
+  }
+
+  /**
+   * Update or insert the specified Cells.
+   * <p>
+   * For each Cell, insert into MemStore.  This will atomically upsert the
+   * value for that row/family/qualifier.  If a Cell did already exist,
+   * it will then be removed.
+   * <p>
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+   * be immediately visible.  May want to change this so it is atomic across
+   * all Cells.
+   * <p>
+   * This is called under row lock, so Get operations will still see updates
+   * atomically.  Scans will only see each Cell update as atomic.
+   *
+   * @param cells the cells to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in memstore size
+   */
+  @Override
+  public long upsert(Iterable<Cell> cells, long readpoint) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += upsert(cell, readpoint);
+    }
+    return size;
+  }
+
+  /**
+   * @return Oldest timestamp of all the Cells in the MemStore
+   */
+  @Override
+  public long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+
+  /**
+   * Write a delete
+   * @param deleteCell the cell to be deleted
+   * @return approximate size of the passed key and value.
+   */
+  @Override
+  public long delete(Cell deleteCell) {
+    Cell toAdd = maybeCloneWithAllocator(deleteCell);
+    long s = internalAdd(toAdd);
+    return s;
+  }
+
+  /**
+   * An override on snapshot so the no arg version of the method implies zero seq num,
+   * like for cases without wal
+   */
+  public MemStoreSnapshot snapshot() {
+    return snapshot(0);
+  }
+
+  /**
+   * The passed snapshot was successfully persisted; it can be let go.
+   * @param id Id of the snapshot to clean out.
+   * @see MemStore#snapshot(long)
+   */
+  @Override
+  public void clearSnapshot(long id) throws UnexpectedStateException {
+    if (this.snapshotId != id) {
+      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+          + id);
+    }
+    // OK. Passed in snapshot is same as current snapshot. If not-empty,
+    // create a new snapshot and let the old one go.
+    Segment oldSnapshot = this.snapshot;
+    if (!this.snapshot.isEmpty()) {
+      this.snapshot = SegmentFactory.instance().createImmutableSegment(
+          getComparator(), 0);
+    }
+    this.snapshotId = NO_SNAPSHOT_ID;
+    oldSnapshot.close();
+  }
+
+  /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot.
+   */
+  @Override
+  public long heapSize() {
+    return getActive().getSize();
+  }
+
+  /**
+   * On flush, how much memory we will clear from the active cell set.
+   *
+   * @return size of data that is going to be flushed from active set
+   */
+  @Override
+  public long getFlushableSize() {
+    long snapshotSize = getSnapshot().getSize();
+    return snapshotSize > 0 ? snapshotSize : keySize();
+  }
+
+
+  /**
+   * @return a list containing a single memstore scanner.
+   */
+  @Override
+  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt));
+  }
+
+  @Override
+  public long getSnapshotSize() {
+    return getSnapshot().getSize();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    int i = 1;
+    try {
+      for (Segment segment : getListOfSegments()) {
+        buf.append("Segment (").append(i).append(") ").append(segment).append("; ");
+        i++;
+      }
+    } catch (IOException e) {
+      return e.toString();
+    }
+    return buf.toString();
+  }
+
+  protected void rollbackInSnapshot(Cell cell) {
+    // If the key is in the snapshot, delete it. We should not update
+    // this.size, because that tracks the size of only the memstore and
+    // not the snapshot. The flush of this snapshot to disk has not
+    // yet started because Store.flush() waits for all rwcc transactions to
+    // commit before starting the flush to disk.
+    snapshot.rollback(cell);
+  }
+
+  protected void rollbackInActive(Cell cell) {
+    // If the key is in the active segment, delete it.
+    long sz = active.rollback(cell);
+    if (sz != 0) {
+      setOldestEditTimeToNow();
+    }
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  protected void dump(Log log) {
+    active.dump(log);
+    snapshot.dump(log);
+  }
+
+
+  /**
+   * Inserts the specified Cell into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified Cell.
+   * <p>
+   * First, the specified Cell is inserted into the Memstore.
+   * <p>
+   * If there are any existing Cell in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param cell the cell to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in size of MemStore
+   */
+  private long upsert(Cell cell, long readpoint) {
+    // Add the Cell to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    long addedSize = internalAdd(cell);
+
+    // Get the Cells for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    Cell firstCell = KeyValueUtil.createFirstOnRow(
+        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    SortedSet<Cell> ss = active.tailSet(firstCell);
+    Iterator<Cell> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while (it.hasNext()) {
+      Cell cur = it.next();
+
+      if (cell == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+            cur.getSequenceId() <= readpoint) {
+          if (versionsVisible >= 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // heapSizeChange(cur, true) is the full footprint of the entry being removed.
+            long delta = heapSizeChange(cur, true);
+            addedSize -= delta;
+            active.incSize(-delta);
+            it.remove();
+            setOldestEditTimeToNow();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
+    return addedSize;
+  }
+
+  /**
+   * @param a a cell, may be null
+   * @param b a cell, may be null
+   * @return whichever of a or b has the lower row (a on ties), or null if both are null
+   */
+  protected Cell getLowest(final Cell a, final Cell b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return comparator.compareRows(a, b) <= 0 ? a : b;
+  }
+
+  /**
+   * @param key Find row that follows this one.  If null, the first cell in set is returned.
+   * @param set Set to look in for a row beyond <code>key</code>.
+   * @return First cell of the next row, or null if none found.  The returned cell is the
+   * instance held in <code>set</code>, not a copy.
+   */
+  protected Cell getNextRow(final Cell key,
+      final NavigableSet<Cell> set) {
+    Cell result = null;
+    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
+    // Skip cells on rows <= key; with a null key the very first cell qualifies.
+    for (Cell cell: tail) {
+      if (key != null && comparator.compareRows(cell, key) <= 0) {
+        continue;
+      }
+      // Note: Not suppressing deletes or expired cells.  Needs to be handled
+      // by higher up functions.
+      result = cell;
+      break;
+    }
+    return result;
+  }
+
+  /**
+   * Given the specs of a column, update it, first by inserting a new record,
+   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
+   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
+   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
+   * get the new value, or the old value and all readers will eventually only see the new
+   * value after the old was removed.
+   */
+  @VisibleForTesting
+  @Override
+  public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long newValue, long now) {
+    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
+    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+    Cell snc = snapshot.getFirstAfter(firstCell);
+    if(snc != null) {
+      // is there a matching Cell in the snapshot?
+      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
+        if (snc.getTimestamp() == now) {
+          now += 1;
+        }
+      }
+    }
+    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+    // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+    // so we can't add the new Cell w/o knowing what's there already. Older versions are
+    // cleaned up afterwards by upsert(), which removes Puts no scanner can still see.
+
+    SortedSet<Cell> ss = getActive().tailSet(firstCell);
+    for (Cell cell : ss) {
+      // if this isnt the row we are interested in, then bail:
+      if (!CellUtil.matchingColumn(cell, family, qualifier)
+          || !CellUtil.matchingRow(cell, firstCell)) {
+        break; // rows dont match, bail.
+      }
+
+      // a Put on this column with a later timestamp than 'now' pushes 'now' forward.
+      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
+          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
+        now = cell.getTimestamp();
+      }
+    }
+
+    // create or update (upsert) a new Cell with
+    // 'now' and a 0 memstoreTS == immediately visible
+    List<Cell> cells = new ArrayList<Cell>(1);
+    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
+    return upsert(cells, 1L);
+  }
+
+  private Cell maybeCloneWithAllocator(Cell cell) {
+    return active.maybeCloneWithAllocator(cell);
+  }
+
+  /**
+   * Internal version of add() that doesn't clone Cells with the
+   * allocator, and doesn't take the lock.
+   *
+   * Callers should ensure they already have the read lock taken
+   */
+  private long internalAdd(final Cell toAdd) {
+    long s = active.add(toAdd);
+    setOldestEditTimeToNow();
+    checkActiveSize();
+    return s;
+  }
+
+  private void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  protected long keySize() {
+    return heapSize() - DEEP_OVERHEAD;
+  }
+
+  protected CellComparator getComparator() {
+    return comparator;
+  }
+
+  protected MutableSegment getActive() {
+    return active;
+  }
+
+  protected ImmutableSegment getSnapshot() {
+    return snapshot;
+  }
+
+  protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
+    this.snapshot = snapshot;
+    return this;
+  }
+
+  protected void setSnapshotSize(long snapshotSize) {
+    getSnapshot().setSize(snapshotSize);
+  }
+
+  /**
+   * Check whether anything need to be done based on the current active set size
+   */
+  protected abstract void checkActiveSize();
+
+  /**
+   * Returns a list of Store segment scanners, one per each store segment
+   * @param readPt the version number required to initialize the scanners
+   * @return a list of Store segment scanners, one per each store segment
+   */
+  protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException;
+
+  /**
+   * Returns an ordered list of segments from most recent to oldest in memstore
+   * @return an ordered list of segments from most recent to oldest in memstore
+   */
+  protected abstract List<Segment> getListOfSegments() throws IOException;
+
+  public long getActiveSize() {
+    return getActive().getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
new file mode 100644
index 0000000..4433302
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if it already
+ * exists in the set.  The call to add returns true if there was no value in the backing map, or
+ * false if there was an entry with the same key (though the value may be different).
+ * The implementation is tolerant of concurrent get and set and won't throw
+ * ConcurrentModificationException when iterating.
+ */
+@InterfaceAudience.Private
+public class CellSet implements NavigableSet<Cell> {
+  // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}.
+  // Differs from ConcurrentSkipListSet (CSLS) in one respect: where CSLS "Adds the specified
+  // element to this set if it is not already present", this implementation "Adds the specified
+  // element to this set EVEN if it is already present, overwriting what was there previously".
+  // Otherwise, has the same attributes as ConcurrentSkipListSet.
+  private final ConcurrentNavigableMap<Cell, Cell> delegatee;
+
+  CellSet(final CellComparator c) {
+    this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
+  }
+
+  CellSet(final ConcurrentNavigableMap<Cell, Cell> m) {
+    this.delegatee = m;
+  }
+
+  public Cell ceiling(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Iterator<Cell> descendingIterator() {
+    return this.delegatee.descendingMap().values().iterator();
+  }
+
+  public NavigableSet<Cell> descendingSet() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Cell floor(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public SortedSet<Cell> headSet(final Cell toElement) {
+    return headSet(toElement, false);
+  }
+
+  public NavigableSet<Cell> headSet(final Cell toElement,
+      boolean inclusive) {
+    return new CellSet(this.delegatee.headMap(toElement, inclusive));
+  }
+
+  public Cell higher(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Iterator<Cell> iterator() {
+    return this.delegatee.values().iterator();
+  }
+
+  public Cell lower(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Cell pollFirst() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Cell pollLast() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public NavigableSet<Cell> subSet(Cell fromElement,
+      boolean fromInclusive, Cell toElement, boolean toInclusive) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public SortedSet<Cell> tailSet(Cell fromElement) {
+    return tailSet(fromElement, true);
+  }
+
+  public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
+    return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
+  }
+
+  public Comparator<? super Cell> comparator() {
+    return this.delegatee.comparator();
+  }
+  // One-step read (no firstKey()+get() race); Iterator.next() throws NoSuchElement if empty.
+  public Cell first() {
+    return this.delegatee.values().iterator().next();
+  }
+  // Same single-step read as first(), taken from the tail end of the map.
+  public Cell last() {
+    return this.delegatee.descendingMap().values().iterator().next();
+  }
+
+  public boolean add(Cell e) {
+    return this.delegatee.put(e, e) == null;
+  }
+
+  public boolean addAll(Collection<? extends Cell> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public void clear() {
+    this.delegatee.clear();
+  }
+
+  public boolean contains(Object o) {
+    //noinspection SuspiciousMethodCalls
+    return this.delegatee.containsKey(o);
+  }
+
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public boolean isEmpty() {
+    return this.delegatee.isEmpty();
+  }
+
+  public boolean remove(Object o) {
+    return this.delegatee.remove(o) != null;
+  }
+
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public Cell get(Cell kv) {
+    return this.delegatee.get(kv);
+  }
+
+  public int size() {
+    return this.delegatee.size();
+  }
+
+  public Object[] toArray() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
deleted file mode 100644
index e9941b3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A {@link java.util.Set} of {@link Cell}s implemented on top of a
- * {@link java.util.concurrent.ConcurrentSkipListMap}.  Works like a
- * {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard:
- * An add will overwrite if already an entry for the added key.  In other words,
- * where CSLS does "Adds the specified element to this set if it is not already
- * present.", this implementation "Adds the specified element to this set EVEN
- * if it is already present overwriting what was there previous".  The call to
- * add returns true if no value in the backing map or false if there was an
- * entry with same key (though value may be different).
- * <p>Otherwise,
- * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
- * get and set and won't throw ConcurrentModificationException when iterating.
- */
-@InterfaceAudience.Private
-public class CellSkipListSet implements NavigableSet<Cell> {
-  private final ConcurrentNavigableMap<Cell, Cell> delegatee;
-
-  CellSkipListSet(final CellComparator c) {
-    this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
-  }
-
-  CellSkipListSet(final ConcurrentNavigableMap<Cell, Cell> m) {
-    this.delegatee = m;
-  }
-
-  public Cell ceiling(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Iterator<Cell> descendingIterator() {
-    return this.delegatee.descendingMap().values().iterator();
-  }
-
-  public NavigableSet<Cell> descendingSet() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell floor(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> headSet(final Cell toElement) {
-    return headSet(toElement, false);
-  }
-
-  public NavigableSet<Cell> headSet(final Cell toElement,
-      boolean inclusive) {
-    return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive));
-  }
-
-  public Cell higher(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Iterator<Cell> iterator() {
-    return this.delegatee.values().iterator();
-  }
-
-  public Cell lower(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell pollFirst() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell pollLast() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public NavigableSet<Cell> subSet(Cell fromElement,
-      boolean fromInclusive, Cell toElement, boolean toInclusive) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> tailSet(Cell fromElement) {
-    return tailSet(fromElement, true);
-  }
-
-  public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
-    return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive));
-  }
-
-  public Comparator<? super Cell> comparator() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell first() {
-    return this.delegatee.get(this.delegatee.firstKey());
-  }
-
-  public Cell last() {
-    return this.delegatee.get(this.delegatee.lastKey());
-  }
-
-  public boolean add(Cell e) {
-    return this.delegatee.put(e, e) == null;
-  }
-
-  public boolean addAll(Collection<? extends Cell> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public void clear() {
-    this.delegatee.clear();
-  }
-
-  public boolean contains(Object o) {
-    //noinspection SuspiciousMethodCalls
-    return this.delegatee.containsKey(o);
-  }
-
-  public boolean containsAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public boolean isEmpty() {
-    return this.delegatee.isEmpty();
-  }
-
-  public boolean remove(Object o) {
-    return this.delegatee.remove(o) != null;
-  }
-
-  public boolean removeAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public boolean retainAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell get(Cell kv) {
-    return this.delegatee.get(kv);
-  }
-
-  public int size() {
-    return this.delegatee.size();
-  }
-
-  public Object[] toArray() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public <T> T[] toArray(T[] a) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index f61d871..82d40b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -19,35 +19,22 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.htrace.Trace;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -66,40 +53,8 @@ import org.apache.htrace.Trace;
  * in KV size.
  */
 @InterfaceAudience.Private
-public class DefaultMemStore implements MemStore {
+public class DefaultMemStore extends AbstractMemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
-  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
-  private static final boolean USEMSLAB_DEFAULT = true;
-  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
-
-  private Configuration conf;
-
-  // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
-  // better semantics.  The Map will overwrite if passed a key it already had
-  // whereas the Set will not add new Cell if key is same though value might be
-  // different.  Value is not important -- just make sure always same
-  // reference passed.
-  volatile CellSkipListSet cellSet;
-
-  // Snapshot of memstore.  Made for flusher.
-  volatile CellSkipListSet snapshot;
-
-  final CellComparator comparator;
-
-  // Used to track own heapSize
-  final AtomicLong size;
-  private volatile long snapshotSize;
-
-  // Used to track when to flush
-  volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
-  TimeRangeTracker timeRangeTracker;
-  TimeRangeTracker snapshotTimeRangeTracker;
-
-  volatile MemStoreLAB allocator;
-  volatile MemStoreLAB snapshotAllocator;
-  volatile long snapshotId;
-  volatile boolean tagsPresent;
 
   /**
    * Default constructor. Used for tests.
@@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore {
    * Constructor.
    * @param c Comparator
    */
-  public DefaultMemStore(final Configuration conf,
-                  final CellComparator c) {
-    this.conf = conf;
-    this.comparator = c;
-    this.cellSet = new CellSkipListSet(c);
-    this.snapshot = new CellSkipListSet(c);
-    timeRangeTracker = new TimeRangeTracker();
-    snapshotTimeRangeTracker = new TimeRangeTracker();
-    this.size = new AtomicLong(DEEP_OVERHEAD);
-    this.snapshotSize = 0;
-    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-          new Class[] { Configuration.class }, new Object[] { conf });
-    } else {
-      this.allocator = null;
-    }
+  public DefaultMemStore(final Configuration conf, final CellComparator c) {
+    super(conf, c);
   }
 
   void dump() {
-    for (Cell cell: this.cellSet) {
-      LOG.info(cell);
-    }
-    for (Cell cell: this.snapshot) {
-      LOG.info(cell);
-    }
+    super.dump(LOG);
   }
 
   /**
    * Creates a snapshot of the current memstore.
    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
+   * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal
    */
   @Override
-  public MemStoreSnapshot snapshot() {
+  public MemStoreSnapshot snapshot(long flushOpSeqId) {
     // If snapshot currently has entries, then flusher failed or didn't call
     // cleanup.  Log a warning.
-    if (!this.snapshot.isEmpty()) {
+    if (!getSnapshot().isEmpty()) {
       LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
-      this.snapshotSize = keySize();
-      if (!this.cellSet.isEmpty()) {
-        this.snapshot = this.cellSet;
-        this.cellSet = new CellSkipListSet(this.comparator);
-        this.snapshotTimeRangeTracker = this.timeRangeTracker;
-        this.timeRangeTracker = new TimeRangeTracker();
-        // Reset heap to not include any keys
-        this.size.set(DEEP_OVERHEAD);
-        this.snapshotAllocator = this.allocator;
-        // Reset allocator so we get a fresh buffer for the new memstore
-        if (allocator != null) {
-          String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-          this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-              new Class[] { Configuration.class }, new Object[] { conf });
-        }
-        timeOfOldestEdit = Long.MAX_VALUE;
+      if (!getActive().isEmpty()) {
+        ImmutableSegment immutableSegment = SegmentFactory.instance().
+            createImmutableSegment(getConfiguration(), getActive());
+        setSnapshot(immutableSegment);
+        setSnapshotSize(keySize());
+        resetCellSet();
       }
     }
-    MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
-        this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
-        this.tagsPresent);
-    this.tagsPresent = false;
-    return memStoreSnapshot;
-  }
-
-  /**
-   * The passed snapshot was successfully persisted; it can be let go.
-   * @param id Id of the snapshot to clean out.
-   * @throws UnexpectedStateException
-   * @see #snapshot()
-   */
-  @Override
-  public void clearSnapshot(long id) throws UnexpectedStateException {
-    MemStoreLAB tmpAllocator = null;
-    if (this.snapshotId != id) {
-      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
-          + id);
-    }
-    // OK. Passed in snapshot is same as current snapshot. If not-empty,
-    // create a new snapshot and let the old one go.
-    if (!this.snapshot.isEmpty()) {
-      this.snapshot = new CellSkipListSet(this.comparator);
-      this.snapshotTimeRangeTracker = new TimeRangeTracker();
-    }
-    this.snapshotSize = 0;
-    this.snapshotId = -1;
-    if (this.snapshotAllocator != null) {
-      tmpAllocator = this.snapshotAllocator;
-      this.snapshotAllocator = null;
-    }
-    if (tmpAllocator != null) {
-      tmpAllocator.close();
-    }
-  }
-
-  @Override
-  public long getFlushableSize() {
-    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
-  }
+    return new MemStoreSnapshot(this.snapshotId, getSnapshot());
 
-  @Override
-  public long getSnapshotSize() {
-    return this.snapshotSize;
   }
 
-  /**
-   * Write an update
-   * @param cell
-   * @return approximate size of the passed Cell.
-   */
   @Override
-  public long add(Cell cell) {
-    Cell toAdd = maybeCloneWithAllocator(cell);
-    return internalAdd(toAdd);
+  protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
+    List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
+    list.add(0, getActive().getSegmentScanner(readPt));
+    list.add(1, getSnapshot().getSegmentScanner(readPt));
+    return list;
   }
 
   @Override
-  public long timeOfOldestEdit() {
-    return timeOfOldestEdit;
-  }
-
-  private boolean addToCellSet(Cell e) {
-    boolean b = this.cellSet.add(e);
-    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
-    // When we use ACL CP or Visibility CP which deals with Tags during
-    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
-    // parse the byte[] to identify the tags length.
-    if(e.getTagsLength() > 0) {
-      tagsPresent = true;
-    }
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  private boolean removeFromCellSet(Cell e) {
-    boolean b = this.cellSet.remove(e);
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  void setOldestEditTimeToNow() {
-    if (timeOfOldestEdit == Long.MAX_VALUE) {
-      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
-    }
-  }
-
-  /**
-   * Internal version of add() that doesn't clone Cells with the
-   * allocator, and doesn't take the lock.
-   *
-   * Callers should ensure they already have the read lock taken
-   */
-  private long internalAdd(final Cell toAdd) {
-    long s = heapSizeChange(toAdd, addToCellSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
-  }
-
-  private Cell maybeCloneWithAllocator(Cell cell) {
-    if (allocator == null) {
-      return cell;
-    }
-
-    int len = KeyValueUtil.length(cell);
-    ByteRange alloc = allocator.allocateBytes(len);
-    if (alloc == null) {
-      // The allocation was too large, allocator decided
-      // not to do anything with it.
-      return cell;
-    }
-    assert alloc.getBytes() != null;
-    KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
-    KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
-    newKv.setSequenceId(cell.getSequenceId());
-    return newKv;
+  protected List<Segment> getListOfSegments() throws IOException {
+    List<Segment> list = new ArrayList<Segment>(2);
+    list.add(0, getActive());
+    list.add(1, getSnapshot());
+    return list;
   }
 
   /**
@@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public void rollback(Cell cell) {
-    // If the key is in the snapshot, delete it. We should not update
-    // this.size, because that tracks the size of only the memstore and
-    // not the snapshot. The flush of this snapshot to disk has not
-    // yet started because Store.flush() waits for all rwcc transactions to
-    // commit before starting the flush to disk.
-    Cell found = this.snapshot.get(cell);
-    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      this.snapshot.remove(cell);
-      long sz = heapSizeChange(cell, true);
-      this.snapshotSize -= sz;
-    }
-    // If the key is in the memstore, delete it. Update this.size.
-    found = this.cellSet.get(cell);
-    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      removeFromCellSet(cell);
-      long s = heapSizeChange(cell, true);
-      this.size.addAndGet(-s);
-    }
-  }
-
-  /**
-   * Write a delete
-   * @param deleteCell
-   * @return approximate size of the passed key and value.
-   */
-  @Override
-  public long delete(Cell deleteCell) {
-    long s = 0;
-    Cell toAdd = maybeCloneWithAllocator(deleteCell);
-    s += heapSizeChange(toAdd, addToCellSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
+    rollbackInSnapshot(cell);
+    rollbackInActive(cell);
   }
 
   /**
@@ -342,604 +137,29 @@ public class DefaultMemStore implements MemStore {
    * @return Next row or null if none found.
    */
   Cell getNextRow(final Cell cell) {
-    return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
-  }
-
-  /*
-   * @param a
-   * @param b
-   * @return Return lowest of a or b or null if both a and b are null
-   */
-  private Cell getLowest(final Cell a, final Cell b) {
-    if (a == null) {
-      return b;
-    }
-    if (b == null) {
-      return a;
-    }
-    return comparator.compareRows(a, b) <= 0? a: b;
+    return getLowest(
+        getNextRow(cell, getActive().getCellSet()),
+        getNextRow(cell, getSnapshot().getCellSet()));
   }
 
-  /*
-   * @param key Find row that follows this one.  If null, return first.
-   * @param map Set to look in for a row beyond <code>row</code>.
-   * @return Next row or null if none found.  If one found, will be a new
-   * KeyValue -- can be destroyed by subsequent calls to this method.
-   */
-  private Cell getNextRow(final Cell key,
-      final NavigableSet<Cell> set) {
-    Cell result = null;
-    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
-    // Iterate until we fall into the next row; i.e. move off current row
-    for (Cell cell: tail) {
-      if (comparator.compareRows(cell, key) <= 0)
-        continue;
-      // Note: Not suppressing deletes or expired cells.  Needs to be handled
-      // by higher up functions.
-      result = cell;
-      break;
-    }
-    return result;
+  @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) {
   }
 
   /**
-   * Only used by tests. TODO: Remove
-   *
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   *
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param newValue
-   * @param now
-   * @return  Timestamp
+   * @return Total memory occupied by this MemStore.
    */
   @Override
-  public long updateColumnValue(byte[] row,
-                                byte[] family,
-                                byte[] qualifier,
-                                long newValue,
-                                long now) {
-    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
-    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
-    if (!snSs.isEmpty()) {
-      Cell snc = snSs.first();
-      // is there a matching Cell in the snapshot?
-      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
-        if (snc.getTimestamp() == now) {
-          // poop,
-          now += 1;
-        }
-      }
-    }
-
-    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-    // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
-    // so we cant add the new Cell w/o knowing what's there already, but we also
-    // want to take this chance to delete some cells. So two loops (sad)
-
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
-    for (Cell cell : ss) {
-      // if this isnt the row we are interested in, then bail:
-      if (!CellUtil.matchingColumn(cell, family, qualifier)
-          || !CellUtil.matchingRow(cell, firstCell)) {
-        break; // rows dont match, bail.
-      }
-
-      // if the qualifier matches and it's a put, just RM it out of the cellSet.
-      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
-          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
-        now = cell.getTimestamp();
-      }
-    }
-
-    // create or update (upsert) a new Cell with
-    // 'now' and a 0 memstoreTS == immediately visible
-    List<Cell> cells = new ArrayList<Cell>(1);
-    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
-  }
-
-  /**
-   * Update or insert the specified KeyValues.
-   * <p>
-   * For each KeyValue, insert into MemStore.  This will atomically upsert the
-   * value for that row/family/qualifier.  If a KeyValue did already exist,
-   * it will then be removed.
-   * <p>
-   * This is called under row lock, so Get operations will still see updates
-   * atomically.  Scans will only see each KeyValue update as atomic.
-   *
-   * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in memstore size
-   */
-  @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) {
-    long size = 0;
-    for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
-    }
-    return size;
-  }
-
-  /**
-   * Inserts the specified KeyValue into MemStore and deletes any existing
-   * versions of the same row/family/qualifier as the specified KeyValue.
-   * <p>
-   * First, the specified KeyValue is inserted into the Memstore.
-   * <p>
-   * If there are any existing KeyValues in this MemStore with the same row,
-   * family, and qualifier, they are removed.
-   * <p>
-   * Callers must hold the read lock.
-   * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells.
-   * @return change in size of MemStore
-   */
-  private long upsert(Cell cell, long readpoint) {
-    // Add the Cell to the MemStore
-    // Use the internalAdd method here since we (a) already have a lock
-    // and (b) cannot safely use the MSLAB here without potentially
-    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
-    // test that triggers the pathological case if we don't avoid MSLAB
-    // here.
-    long addedSize = internalAdd(cell);
-
-    // Get the Cells for the row/family/qualifier regardless of timestamp.
-    // For this case we want to clean up any other puts
-    Cell firstCell = KeyValueUtil.createFirstOnRow(
-        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
-    Iterator<Cell> it = ss.iterator();
-    // Versions visible to oldest scanner.
-    int versionsVisible = 0;
-    while ( it.hasNext() ) {
-      Cell cur = it.next();
-
-      if (cell == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
-            cur.getSequenceId() <= readpoint) {
-          if (versionsVisible >= 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            this.size.addAndGet(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
-          }
-        }
-      } else {
-        // past the row or column, done
-        break;
-      }
-    }
-    return addedSize;
-  }
-
-  /**
-   * @return scanner on memstore and snapshot in this order.
-   */
-  @Override
-  public List<KeyValueScanner> getScanners(long readPt) {
-    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
-  }
-
-  /**
-   * Check if this memstore may contain the required keys
-   * @param scan scan
-   * @param store holds reference to cf
-   * @param oldestUnexpiredTS
-   * @return False if the key definitely does not exist in this Memstore
-   */
-  public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
-    byte[] cf = store.getFamily().getName();
-    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
-    if (timeRange == null) {
-      timeRange = scan.getTimeRange();
-    }
-    return (timeRangeTracker.includesTimeRange(timeRange) ||
-        snapshotTimeRangeTracker.includesTimeRange(timeRange))
-        && (Math.max(timeRangeTracker.getMaximumTimestamp(),
-                     snapshotTimeRangeTracker.getMaximumTimestamp()) >=
-            oldestUnexpiredTS);
-  }
-
-  /*
-   * MemStoreScanner implements the KeyValueScanner.
-   * It lets the caller scan the contents of a memstore -- both current
-   * map and snapshot.
-   * This behaves as if it were a real scanner but does not maintain position.
-   */
-  protected class MemStoreScanner extends NonLazyKeyValueScanner {
-    // Next row information for either cellSet or snapshot
-    private Cell cellSetNextRow = null;
-    private Cell snapshotNextRow = null;
-
-    // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
-    private Cell cellSetItRow = null;
-    private Cell snapshotItRow = null;
-
-    // iterator based scanning.
-    private Iterator<Cell> cellSetIt;
-    private Iterator<Cell> snapshotIt;
-
-    // The cellSet and snapshot at the time of creating this scanner
-    private CellSkipListSet cellSetAtCreation;
-    private CellSkipListSet snapshotAtCreation;
-
-    // the pre-calculated Cell to be returned by peek() or next()
-    private Cell theNext;
-
-    // The allocator and snapshot allocator at the time of creating this scanner
-    volatile MemStoreLAB allocatorAtCreation;
-    volatile MemStoreLAB snapshotAllocatorAtCreation;
-
-    // A flag represents whether could stop skipping Cells for MVCC
-    // if have encountered the next row. Only used for reversed scan
-    private boolean stopSkippingCellsIfNextRow = false;
-
-    private long readPoint;
-
-    /*
-    Some notes...
-
-     So memstorescanner is fixed at creation time. this includes pointers/iterators into
-    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
-    snapshot is moved.  since kvset is null there is no point on reseeking on both,
-      we can save us the trouble. During the snapshot->hfile transition, the memstore
-      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
-      potentially do something smarter by adjusting the existing memstore scanner.
-
-      But there is a greater problem here, that being once a scanner has progressed
-      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
-      if a scan lasts a little while, there is a chance for new entries in kvset to
-      become available but we will never see them.  This needs to be handled at the
-      StoreScanner level with coordination with MemStoreScanner.
-
-      Currently, this problem is only partly managed: during the small amount of time
-      when the StoreScanner has not yet created a new MemStoreScanner, we will miss
-      the adds to kvset in the MemStoreScanner.
-    */
-
-    MemStoreScanner(long readPoint) {
-      super();
-
-      this.readPoint = readPoint;
-      cellSetAtCreation = cellSet;
-      snapshotAtCreation = snapshot;
-      if (allocator != null) {
-        this.allocatorAtCreation = allocator;
-        this.allocatorAtCreation.incScannerCount();
-      }
-      if (snapshotAllocator != null) {
-        this.snapshotAllocatorAtCreation = snapshotAllocator;
-        this.snapshotAllocatorAtCreation.incScannerCount();
-      }
-      if (Trace.isTracing() && Trace.currentSpan() != null) {
-        Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
-      }
-    }
-
-    /**
-     * Lock on 'this' must be held by caller.
-     * @param it
-     * @return Next Cell
-     */
-    private Cell getNext(Iterator<Cell> it) {
-      Cell startCell = theNext;
-      Cell v = null;
-      try {
-        while (it.hasNext()) {
-          v = it.next();
-          if (v.getSequenceId() <= this.readPoint) {
-            return v;
-          }
-          if (stopSkippingCellsIfNextRow && startCell != null
-              && comparator.compareRows(v, startCell) > 0) {
-            return null;
-          }
-        }
-
-        return null;
-      } finally {
-        if (v != null) {
-          // in all cases, remember the last Cell iterated to
-          if (it == snapshotIt) {
-            snapshotItRow = v;
-          } else {
-            cellSetItRow = v;
-          }
-        }
-      }
-    }
-
-    /**
-     *  Set the scanner at the seek key.
-     *  Must be called only once: there is no thread safety between the scanner
-     *   and the memStore.
-     * @param key seek value
-     * @return false if the key is null or if there is no data
-     */
-    @Override
-    public synchronized boolean seek(Cell key) {
-      if (key == null) {
-        close();
-        return false;
-      }
-      // kvset and snapshot will never be null.
-      // if tailSet can't find anything, SortedSet is empty (not null).
-      cellSetIt = cellSetAtCreation.tailSet(key).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
-      cellSetItRow = null;
-      snapshotItRow = null;
-
-      return seekInSubLists(key);
-    }
-
-
-    /**
-     * (Re)initialize the iterators after a seek or a reseek.
-     */
-    private synchronized boolean seekInSubLists(Cell key){
-      cellSetNextRow = getNext(cellSetIt);
-      snapshotNextRow = getNext(snapshotIt);
-
-      // Calculate the next value
-      theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
-      // has data
-      return (theNext != null);
-    }
-
-
-    /**
-     * Move forward on the sub-lists set previously by seek.
-     * @param key seek value (should be non-null)
-     * @return true if there is at least one KV to read, false otherwise
-     */
-    @Override
-    public synchronized boolean reseek(Cell key) {
-      /*
-      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
-      This code is executed concurrently with flush and puts, without locks.
-      Two points must be known when working on this code:
-      1) It's not possible to use the 'kvTail' and 'snapshot'
-       variables, as they are modified during a flush.
-      2) The ideal implementation for performance would use the sub skip list
-       implicitly pointed by the iterators 'kvsetIt' and
-       'snapshotIt'. Unfortunately the Java API does not offer a method to
-       get it. So we remember the last keys we iterated to and restore
-       the reseeked set to at least that point.
-       */
-      cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
-      return seekInSubLists(key);
-    }
-
-
-    @Override
-    public synchronized Cell peek() {
-      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
-      return theNext;
-    }
-
-    @Override
-    public synchronized Cell next() {
-      if (theNext == null) {
-          return null;
-      }
-
-      final Cell ret = theNext;
-
-      // Advance one of the iterators
-      if (theNext == cellSetNextRow) {
-        cellSetNextRow = getNext(cellSetIt);
-      } else {
-        snapshotNextRow = getNext(snapshotIt);
-      }
-
-      // Calculate the next value
-      theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
-      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
-      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
-      //    getLowest() + " threadpoint=" + readpoint);
-      return ret;
-    }
-
-    /*
-     * Returns the lower of the two key values, or null if they are both null.
-     * This uses comparator.compare() to compare the KeyValue using the memstore
-     * comparator.
-     */
-    private Cell getLowest(Cell first, Cell second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare <= 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    /*
-     * Returns the higher of the two cells, or null if they are both null.
-     * This uses comparator.compare() to compare the Cell using the memstore
-     * comparator.
-     */
-    private Cell getHighest(Cell first, Cell second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare > 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    public synchronized void close() {
-      this.cellSetNextRow = null;
-      this.snapshotNextRow = null;
-
-      this.cellSetIt = null;
-      this.snapshotIt = null;
-
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
-      }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
-      }
-
-      this.cellSetItRow = null;
-      this.snapshotItRow = null;
-    }
-
-    /**
-     * MemStoreScanner returns max value as sequence id because it will
-     * always have the latest data among all files.
-     */
-    @Override
-    public long getSequenceID() {
-      return Long.MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
-      return shouldSeek(scan, store, oldestUnexpiredTS);
-    }
-
-    /**
-     * Seek scanner to the given key first. If it returns false(means
-     * peek()==null) or scanner's peek row is bigger than row of given key, seek
-     * the scanner to the previous row of given key
-     */
-    @Override
-    public synchronized boolean backwardSeek(Cell key) {
-      seek(key);
-      if (peek() == null || comparator.compareRows(peek(), key) > 0) {
-        return seekToPreviousRow(key);
-      }
-      return true;
-    }
-
-    /**
-     * Separately get the KeyValue before the specified key from kvset and
-     * snapshotset, and use the row of higher one as the previous row of
-     * specified key, then seek to the first KeyValue of previous row
-     */
-    @Override
-    public synchronized boolean seekToPreviousRow(Cell originalKey) {
-      boolean keepSeeking = false;
-      Cell key = originalKey;
-      do {
-        Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
-        SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
-        Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
-        SortedSet<Cell> snapshotHead = snapshotAtCreation
-            .headSet(firstKeyOnRow);
-        Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
-            .last();
-        Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
-        if (lastCellBeforeRow == null) {
-          theNext = null;
-          return false;
-        }
-        Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
-        this.stopSkippingCellsIfNextRow = true;
-        seek(firstKeyOnPreviousRow);
-        this.stopSkippingCellsIfNextRow = false;
-        if (peek() == null
-            || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
-          keepSeeking = true;
-          key = firstKeyOnPreviousRow;
-          continue;
-        } else {
-          keepSeeking = false;
-        }
-      } while (keepSeeking);
-      return true;
-    }
-
-    @Override
-    public synchronized boolean seekToLastRow() {
-      Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
-          .last();
-      Cell second = snapshotAtCreation.isEmpty() ? null
-          : snapshotAtCreation.last();
-      Cell higherCell = getHighest(first, second);
-      if (higherCell == null) {
-        return false;
-      }
-      Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
-      if (seek(firstCellOnLastRow)) {
-        return true;
-      } else {
-        return seekToPreviousRow(higherCell);
-      }
-
-    }
-  }
-
-  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-      + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
-
-  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
-      (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
-  /*
-   * Calculate how the MemStore size has changed.  Includes overhead of the
-   * backing Map.
-   * @param cell
-   * @param notpresent True if the cell was NOT present in the set.
-   * @return Size
-   */
-  static long heapSizeChange(final Cell cell, final boolean notpresent) {
-    return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
-        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
-  }
-
-  private long keySize() {
-    return heapSize() - DEEP_OVERHEAD;
+  public long size() {
+    return heapSize();
   }
 
   /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
+   * Check whether anything need to be done based on the current active set size
+   * Nothing need to be done for the DefaultMemStore
    */
   @Override
-  public long heapSize() {
-    return size.get();
-  }
-
-  @Override
-  public long size() {
-    return heapSize();
+  protected void checkActiveSize() {
+    return;
   }
 
   /**
@@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore {
     LOG.info("memstore2 estimated size=" + size);
     final int seconds = 30;
     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
-    for (int i = 0; i < seconds; i++) {
-      // Thread.sleep(1000);
-    }
     LOG.info("Exiting.");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c65326a..5c29fb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -91,13 +98,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * A Store holds a column family in a Region.  Its a memstore and a set of zero
  * or more StoreFiles, which stretch backwards over time.
@@ -1636,7 +1636,7 @@ public class HStore implements Store {
       this.lock.readLock().unlock();
     }
 
-    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
+    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
         + (request.isAllFiles() ? " (all files)" : ""));
     this.region.reportCompactionRequestStart(request.isMajor());
@@ -1990,8 +1990,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Used in tests. TODO: Remove
-   *
    * Updates the value for the given row/family/qualifier. This function will always be seen as
    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
    * control necessary.
@@ -2002,6 +2000,7 @@ public class HStore implements Store {
    * @return memstore size delta
    * @throws IOException
    */
+  @VisibleForTesting
   public long updateColumnValue(byte [] row, byte [] f,
                                 byte [] qualifier, long newValue)
       throws IOException {
@@ -2055,7 +2054,8 @@ public class HStore implements Store {
      */
     @Override
     public void prepare() {
-      this.snapshot = memstore.snapshot();
+      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
+      this.snapshot = memstore.snapshot(cacheFlushSeqNum);
       this.cacheFlushCount = snapshot.getCellsCount();
       this.cacheFlushSize = snapshot.getSize();
       committedFiles = new ArrayList<Path>(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
new file mode 100644
index 0000000..cfcd81e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link Segment} that extends the Segment API with operations needed only by immutable
+ * segments, not by a {@link MutableSegment}. Specifically,
+ * {@link #getKeyValueScanner()} builds a special scanner for the {@link MemStoreSnapshot}
+ * object. In addition, this class overrides methods that an immutable segment is unlikely to
+ * support, or could support only very inefficiently: by default
+ * {@link #rollback(Cell)} is a no-op returning 0 and {@link #getCellSet()} throws
+ * {@link NotImplementedException}. Subclasses may override both.
+ */
+@InterfaceAudience.Private
+public abstract class ImmutableSegment extends Segment {
+
+  public ImmutableSegment(Segment segment) {
+    super(segment);
+  }
+
+  /**
+   * Default rollback for immutable segments: a no-op that ignores {@code cell} and returns 0.
+   * Immutable segments are not expected to support removing cells; subclasses that can, such as
+   * {@link ImmutableSegmentAdapter}, override this (e.g. for use by tests).
+   * @return always 0 in this default implementation
+   */
+  @Override
+  public long rollback(Cell cell) {
+    return 0;
+  }
+
+  /**
+   * Returns a set of all the cells in the segment.
+   * Some immutable segments do not maintain a cell set, so producing one could be very
+   * inefficient; this default implementation therefore does not support the operation.
+   * Subclasses that do keep a cell set, such as {@link ImmutableSegmentAdapter}, override it.
+   * @throws NotImplementedException always, in this default implementation
+   */
+  @Override
+  public CellSet getCellSet() {
+    throw new NotImplementedException("Immutable Segment does not support this operation by " +
+        "default");
+  }
+
+  /**
+   * Builds the scanner used for a {@link MemStoreSnapshot}; it may differ from the general
+   * segment scanner returned by {@link #getSegmentScanner(long)}.
+   * @return a special scanner for the MemStoreSnapshot object
+   */
+  public abstract KeyValueScanner getKeyValueScanner();
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
new file mode 100644
index 0000000..058865a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+
+/**
+ * Adapts a {@link MutableSegment} into an {@link ImmutableSegment} by delegating to it; unlike
+ * the base class defaults, rollback and getCellSet operate on the wrapped segment. Used when a
+ * mutable segment becomes a snapshot or is pushed into a compaction pipeline, which holds only
+ * immutable segments. The compaction may generate other types of immutable segment.
+ */
+@InterfaceAudience.Private
+public class ImmutableSegmentAdapter extends ImmutableSegment {
+
+  final private MutableSegment adaptee; // the wrapped segment that calls are forwarded to
+
+  public ImmutableSegmentAdapter(MutableSegment segment) {
+    super(segment);
+    this.adaptee = segment;
+  }
+
+  @Override
+  public KeyValueScanner getKeyValueScanner() { // scans the adaptee's cell set directly
+    return new CollectionBackedScanner(adaptee.getCellSet(), adaptee.getComparator());
+  }
+
+  @Override
+  public SegmentScanner getSegmentScanner(long readPoint) {
+    return adaptee.getSegmentScanner(readPoint);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return adaptee.isEmpty();
+  }
+
+  @Override
+  public int getCellsCount() {
+    return adaptee.getCellsCount();
+  }
+
+  @Override
+  public long add(Cell cell) { // NOTE(review): mutates the adaptee despite the immutable type
+    return adaptee.add(cell);
+  }
+
+  @Override
+  public Cell getFirstAfter(Cell cell) {
+    return adaptee.getFirstAfter(cell);
+  }
+
+  @Override
+  public void close() {
+    adaptee.close();
+  }
+
+  @Override
+  public Cell maybeCloneWithAllocator(Cell cell) {
+    return adaptee.maybeCloneWithAllocator(cell);
+  }
+
+  @Override
+  public Segment setSize(long size) { // resizes the adaptee, but returns this adapter
+    adaptee.setSize(size);
+    return this;
+  }
+
+  @Override
+  public long getSize() {
+    return adaptee.getSize();
+  }
+
+  @Override
+  public long rollback(Cell cell) {
+    return adaptee.rollback(cell);
+  }
+
+  @Override
+  public CellSet getCellSet() {
+    return adaptee.getCellSet();
+  }
+
+  @Override
+  public void dump(Log log) {
+    adaptee.dump(log);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index e9f8103..a10ccd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -17,10 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
 
 /**
@@ -41,10 +42,19 @@ public interface MemStore extends HeapSize {
   MemStoreSnapshot snapshot();
 
   /**
+   * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+   * {@link #clearSnapshot(long)}.
+   * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed
+   *                     segment
+   * @return {@link MemStoreSnapshot}
+   */
+  MemStoreSnapshot snapshot(long flushOpSeqId);
+
+  /**
    * Clears the current snapshot of the Memstore.
    * @param id
    * @throws UnexpectedStateException
-   * @see #snapshot()
+   * @see #snapshot(long)
    */
   void clearSnapshot(long id) throws UnexpectedStateException;
 
@@ -128,7 +138,7 @@ public interface MemStore extends HeapSize {
    * @return scanner over the memstore. This might include scanner over the snapshot when one is
    * present.
    */
-  List<KeyValueScanner> getScanners(long readPt);
+  List<KeyValueScanner> getScanners(long readPt) throws IOException;
 
   /**
    * @return Total memory occupied by this MemStore.