You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hbase.apache.org by st...@apache.org on 2016/02/11 19:39:10 UTC

[1/2] hbase git commit: HBASE-14919 Refactoring for in-memory flush and compaction

Repository: hbase
Updated Branches:
  refs/heads/master a975408b7 -> 25dfc112d


http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
new file mode 100644
index 0000000..dfcec25
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.htrace.Trace;
+
+/**
+ * This is the scanner for any MemStore implementation, derived from {@link AbstractMemStore}.
+ * The MemStoreScanner combines the {@link SegmentScanner}s of the different Segments and
+ * merges them through either a forward {@link KeyValueHeap} or a {@link ReversedKeyValueHeap}.
+ * A single instance traverses in one direction only (no zigzagging in between): the direction
+ * is fixed by the {@link Type} given to the constructor or, for {@link Type#UNDEFINED},
+ * by the first positioning call (forward by default, backward on the first backward call).
+ */
+@InterfaceAudience.Private
+public class MemStoreScanner extends NonLazyKeyValueScanner {
+  /**
+   * Types of cell MemStoreScanner.
+   * UNDEFINED starts with a forward heap and may be switched to backward once;
+   * COMPACT_FORWARD and USER_SCAN_FORWARD are forward-only; USER_SCAN_BACKWARD is backward-only.
+   */
+  static public enum Type {
+    UNDEFINED,
+    COMPACT_FORWARD,
+    USER_SCAN_FORWARD,
+    USER_SCAN_BACKWARD
+  }
+
+  // heap of scanners used for traversing forward
+  private KeyValueHeap forwardHeap;
+  // reversed scanners heap for traversing backward
+  private ReversedKeyValueHeap backwardHeap;
+
+  // The type of the scan is defined by constructor
+  // or according to the first usage
+  private Type type = Type.UNDEFINED;
+
+  private long readPoint;
+  // remember the initial version of the scanners list
+  List<SegmentScanner> scanners;
+  // pointer back to the relevant MemStore
+  // is needed for shouldSeek() method
+  private AbstractMemStore backwardReferenceToMemStore;
+
+  /**
+   * Constructor.
+   * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
+   * After constructor only one heap is going to be initialized for entire lifespan
+   * of the MemStoreScanner. A specific scanner can only be one directional!
+   *
+   * @param ms        Pointer back to the MemStore
+   * @param readPoint Read point below which we can safely remove duplicate KVs
+   * @param type      The scan type COMPACT_FORWARD should be used for compaction
+   */
+  public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException {
+    this(ms, ms.getListOfScanners(readPoint), readPoint, type);
+  }
+
+  /**
+   * Constructor used only when the scan usage is unknown and needs to be defined
+   * according to the first move.
+   *
+   * @param ms     Pointer back to the MemStore
+   * @param readPt Read point passed to the segment scanners
+   */
+  public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException {
+    this(ms, readPt, Type.UNDEFINED);
+  }
+
+  /**
+   * Constructor over an explicit list of segment scanners.
+   *
+   * @param ms        Pointer back to the MemStore (its comparator orders the heap)
+   * @param scanners  The segment scanners to merge
+   * @param readPoint Read point of this scanner
+   * @param type      The scan type; decides which heap is built
+   * @throws IllegalArgumentException if the type is unknown
+   */
+  public MemStoreScanner(AbstractMemStore ms, List<SegmentScanner> scanners, long readPoint,
+      Type type) throws IOException {
+    super();
+    this.readPoint = readPoint;
+    this.type = type;
+    switch (type) {
+      case UNDEFINED:
+      case USER_SCAN_FORWARD:
+      case COMPACT_FORWARD:
+        this.forwardHeap = new KeyValueHeap(scanners, ms.getComparator());
+        break;
+      case USER_SCAN_BACKWARD:
+        this.backwardHeap = new ReversedKeyValueHeap(scanners, ms.getComparator());
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner");
+    }
+    this.backwardReferenceToMemStore = ms;
+    this.scanners = scanners;
+    if (Trace.isTracing() && Trace.currentSpan() != null) {
+      Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
+    }
+  }
+
+  /**
+   * Returns the cell from the top-most scanner without advancing the iterator.
+   * The backward heap is used only when the type is USER_SCAN_BACKWARD.
+   */
+  @Override
+  public synchronized Cell peek() {
+    if (type == Type.USER_SCAN_BACKWARD) {
+      return backwardHeap.peek();
+    }
+    return forwardHeap.peek();
+  }
+
+  /**
+   * Gets the next cell from the top-most scanner, advancing the heap. The backward heap is used
+   * when the type is USER_SCAN_BACKWARD, the forward heap otherwise.
+   *
+   * @return the next cell, or null when the heap is exhausted
+   */
+  @Override
+  public synchronized Cell next() throws IOException {
+    KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap;
+    // All the logic of presenting cells is inside the internal SegmentScanners located inside
+    // the heap, so the heap's next cell (or null) is returned as is.
+    return heap.next();
+  }
+
+  /**
+   * Set the scanner at the seek key. Assumed forward scanning.
+   * Must be called only once: there is no thread safety between the scanner
+   * and the memStore.
+   *
+   * @param cell seek value; a null value closes this scanner
+   * @return false if the key is null or if there is no data
+   */
+  @Override
+  public synchronized boolean seek(Cell cell) throws IOException {
+    assertForward();
+
+    if (cell == null) {
+      close();
+      return false;
+    }
+
+    return forwardHeap.seek(cell);
+  }
+
+  /**
+   * Move forward on the sub-lists set previously by seek. Assumed forward scanning.
+   *
+   * @param cell seek value (should be non-null)
+   * @return true if there is at least one KV to read, false otherwise
+   */
+  @Override
+  public synchronized boolean reseek(Cell cell) throws IOException {
+    /*
+    * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+    * This code is executed concurrently with flush and puts, without locks.
+    * Two points must be known when working on this code:
+    * 1) It's not possible to use the 'kvTail' and 'snapshot'
+    *  variables, as they are modified during a flush.
+    * 2) The ideal implementation for performance would use the sub skip list
+    *  implicitly pointed by the iterators 'kvsetIt' and
+    *  'snapshotIt'. Unfortunately the Java API does not offer a method to
+    *  get it. So we remember the last keys we iterated to and restore
+    *  the reseeked set to at least that point.
+    *
+    *  TODO: The above comment copied from the original MemStoreScanner
+    */
+    assertForward();
+    return forwardHeap.reseek(cell);
+  }
+
+  /**
+   * MemStoreScanner returns max value as sequence id because it will
+   * always have the latest data among all files.
+   */
+  @Override
+  public synchronized long getSequenceID() {
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Closes whichever heap(s) are currently open and drops the references to them.
+   */
+  @Override
+  public synchronized void close() {
+
+    if (forwardHeap != null) {
+      assert ((type == Type.USER_SCAN_FORWARD) ||
+          (type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED));
+      forwardHeap.close();
+      forwardHeap = null;
+      if (backwardHeap != null) {
+        backwardHeap.close();
+        backwardHeap = null;
+      }
+    } else if (backwardHeap != null) {
+      assert (type == Type.USER_SCAN_BACKWARD);
+      backwardHeap.close();
+      backwardHeap = null;
+    }
+  }
+
+  /**
+   * Set the scanner at the seek key. Assumed backward scanning.
+   *
+   * @param cell seek value
+   * @return false if the key is null or if there is no data
+   * @throws IllegalStateException if this is a forward scanner
+   */
+  @Override
+  public synchronized boolean backwardSeek(Cell cell) throws IOException {
+    initBackwardHeapIfNeeded(cell, false);
+    return backwardHeap.backwardSeek(cell);
+  }
+
+  /**
+   * Assumed backward scanning. If the backward heap is exhausted it is rebuilt first.
+   *
+   * @param cell seek value
+   * @return false if the key is null or if there is no data
+   * @throws IllegalStateException if this is a forward scanner
+   */
+  @Override
+  public synchronized boolean seekToPreviousRow(Cell cell) throws IOException {
+    initBackwardHeapIfNeeded(cell, false);
+    if (backwardHeap.peek() == null) {
+      restartBackwardHeap(cell);
+    }
+    return backwardHeap.seekToPreviousRow(cell);
+  }
+
+  /**
+   * Positions every segment scanner at its last row and builds the backward heap.
+   * Only allowed while the type is still UNDEFINED.
+   *
+   * @throws IllegalStateException if the type is already defined
+   */
+  @Override
+  public synchronized boolean seekToLastRow() throws IOException {
+    // TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't
+    // implement seekToLastRow() method :(
+    // however seekToLastRow() was implemented in internal MemStoreScanner
+    // so I wonder whether we need to come with our own workaround, or to update
+    // ReversedKeyValueHeap
+    return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true);
+  }
+
+  /**
+   * Check if this memstore may contain the required keys.
+   * Compaction scanners always return true.
+   * @return False if the key definitely does not exist in this Memstore
+   */
+  @Override
+  public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+
+    if (type == Type.COMPACT_FORWARD) {
+      return true;
+    }
+
+    for (SegmentScanner sc : scanners) {
+      if (sc.shouldSeek(scan, oldestUnexpiredTS)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // debug method
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    int i = 1;
+    for (SegmentScanner scanner : scanners) {
+      buf.append("scanner (").append(i).append(") ").append(scanner.toString()).append(" ||| ");
+      i++;
+    }
+    return buf.toString();
+  }
+  /****************** Private methods ******************/
+  /**
+   * Restructure the ended backward heap after rerunning a seekToPreviousRow()
+   * on each scanner
+   * @return false if given Cell does not exist in any scanner
+   */
+  private boolean restartBackwardHeap(Cell cell) throws IOException {
+    boolean res = false;
+    for (SegmentScanner scan : scanners) {
+      res |= scan.seekToPreviousRow(cell);
+    }
+    this.backwardHeap =
+        new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
+    return res;
+  }
+
+  /**
+   * Checks whether the type of the scan suits the assumption of moving backward.
+   * For an UNDEFINED scanner that still holds its forward heap, the forward heap is closed,
+   * each segment scanner is positioned (at its last row if {@code toLast}, otherwise with
+   * backwardSeek(cell)), a backward heap is built and the type becomes USER_SCAN_BACKWARD.
+   *
+   * @param cell   the key to backward-seek the segment scanners to (ignored if toLast)
+   * @param toLast whether to position the segment scanners at their last rows
+   * @return true if some segment scanner was positioned on a cell during the switch,
+   *         false otherwise (always false when no switch happened)
+   * @throws IllegalStateException if toLast is requested on a defined type, or if this is a
+   *         forward (USER_SCAN_FORWARD or COMPACT_FORWARD) scanner
+   */
+  private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException {
+    boolean res = false;
+    if (toLast && (type != Type.UNDEFINED)) {
+      throw new IllegalStateException(
+          "Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString());
+    }
+    if (type == Type.UNDEFINED) {
+      // In case we started from peek, release the forward heap
+      // and build backward. Set the correct type. Thus this turn
+      // can happen only once
+      if ((backwardHeap == null) && (forwardHeap != null)) {
+        forwardHeap.close();
+        forwardHeap = null;
+        // before building the heap seek for the relevant key on the scanners,
+        // for the heap to be built from the scanners correctly
+        for (SegmentScanner scan : scanners) {
+          if (toLast) {
+            res |= scan.seekToLastRow();
+          } else {
+            res |= scan.backwardSeek(cell);
+          }
+        }
+        this.backwardHeap =
+            new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
+        type = Type.USER_SCAN_BACKWARD;
+      }
+    }
+
+    // Both forward types only ever own a forward heap; without this check a COMPACT_FORWARD
+    // scanner would reach the callers with backwardHeap == null and fail with an NPE.
+    if (type == Type.USER_SCAN_FORWARD || type == Type.COMPACT_FORWARD) {
+      throw new IllegalStateException("Traversing backward with forward scan");
+    }
+    return res;
+  }
+
+  /**
+   * Checks whether the type of the scan suits the assumption of moving forward.
+   * An UNDEFINED scanner becomes USER_SCAN_FORWARD.
+   *
+   * @throws IllegalStateException if this is a backward scanner
+   */
+  private void assertForward() throws IllegalStateException {
+    if (type == Type.UNDEFINED) {
+      type = Type.USER_SCAN_FORWARD;
+    }
+
+    if (type == Type.USER_SCAN_BACKWARD) {
+      throw new IllegalStateException("Traversing forward with backward scan");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index be853c5..28ab693 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -34,14 +34,13 @@ public class MemStoreSnapshot {
   private final KeyValueScanner scanner;
   private final boolean tagsPresent;
 
-  public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker,
-      KeyValueScanner scanner, boolean tagsPresent) {
+  /**
+   * Creates a snapshot descriptor whose cell count, size, time range tracker, scanner and
+   * tags-present flag are all taken from the given immutable segment at construction time.
+   *
+   * @param id       the id stored for this snapshot
+   * @param snapshot the immutable segment this snapshot describes
+   */
+  public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
-    this.cellsCount = cellsCount;
-    this.size = size;
-    this.timeRangeTracker = timeRangeTracker;
-    this.scanner = scanner;
-    this.tagsPresent = tagsPresent;
+    this.cellsCount = snapshot.getCellsCount();
+    this.size = snapshot.getSize();
+    this.timeRangeTracker = snapshot.getTimeRangeTracker();
+    this.scanner = snapshot.getKeyValueScanner();
+    this.tagsPresent = snapshot.isTagsPresent();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java
new file mode 100644
index 0000000..743416c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Iterator;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This mutable store segment encapsulates a mutable cell set and its respective memory allocation
+ * buffers (MSLAB).
+ */
+@InterfaceAudience.Private
+final class MutableCellSetSegment extends MutableSegment {
+
+  private volatile CellSet cellSet;
+  private final CellComparator comparator;
+
+  // Instantiate objects only using factory
+  MutableCellSetSegment(CellSet cellSet, MemStoreLAB memStoreLAB, long size,
+      CellComparator comparator) {
+    super(memStoreLAB, size);
+    this.cellSet = cellSet;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public SegmentScanner getSegmentScanner(long readPoint) {
+    return new MutableCellSetSegmentScanner(this, readPoint);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return getCellSet().isEmpty();
+  }
+
+  @Override
+  public int getCellsCount() {
+    return getCellSet().size();
+  }
+
+  @Override
+  public long add(Cell cell) {
+    boolean succ = getCellSet().add(cell);
+    long s = AbstractMemStore.heapSizeChange(cell, succ);
+    updateMetaInfo(cell, s);
+    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
+    // When we use ACL CP or Visibility CP which deals with Tags during
+    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
+    // parse the byte[] to identify the tags length.
+    if(cell.getTagsLength() > 0) {
+      tagsPresent = true;
+    }
+    return s;
+  }
+
+  @Override
+  public long rollback(Cell cell) {
+    Cell found = get(cell);
+    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
+      long sz = AbstractMemStore.heapSizeChange(cell, true);
+      remove(cell);
+      incSize(-sz);
+      return sz;
+    }
+    return 0;
+  }
+
+  @Override
+  public Cell getFirstAfter(Cell cell) {
+    SortedSet<Cell> snTailSet = tailSet(cell);
+    if (!snTailSet.isEmpty()) {
+      return snTailSet.first();
+    }
+    return null;
+  }
+
+  @Override
+  public void dump(Log log) {
+    for (Cell cell: getCellSet()) {
+      log.debug(cell);
+    }
+  }
+
+  @Override
+  public SortedSet<Cell> tailSet(Cell firstCell) {
+    return getCellSet().tailSet(firstCell);
+  }
+  @Override
+  public CellSet getCellSet() {
+    return cellSet;
+  }
+  @Override
+  public CellComparator getComparator() {
+    return comparator;
+  }
+
+  //*** Methods for MemStoreSegmentsScanner
+  public Cell last() {
+    return getCellSet().last();
+  }
+
+  public Iterator<Cell> iterator() {
+    return getCellSet().iterator();
+  }
+
+  public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
+    return getCellSet().headSet(firstKeyOnRow);
+  }
+
+  public int compare(Cell left, Cell right) {
+    return getComparator().compare(left, right);
+  }
+
+  public int compareRows(Cell left, Cell right) {
+    return getComparator().compareRows(left, right);
+  }
+
+  private Cell get(Cell cell) {
+    return getCellSet().get(cell);
+  }
+
+  private boolean remove(Cell e) {
+    return getCellSet().remove(e);
+  }
+
+  // methods for tests
+  @Override
+  Cell first() {
+    return this.getCellSet().first();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java
new file mode 100644
index 0000000..17791ff
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A scanner of a single cells segment {@link MutableCellSetSegment}.
+ * Cells whose sequence id is greater than the scanner's read point are skipped.
+ */
+@InterfaceAudience.Private
+class MutableCellSetSegmentScanner extends SegmentScanner {
+
+  // the observed structure
+  private final MutableCellSetSegment segment;
+  // the highest relevant MVCC; fixed for the lifetime of the scanner
+  private final long readPoint;
+  // the current iterator that can be reinitialized by
+  // seek(), backwardSeek(), or reseek()
+  private Iterator<Cell> iter;
+  // the pre-calculated cell to be returned by peek() or next()
+  private Cell current = null;
+  // A flag represents whether could stop skipping KeyValues for MVCC
+  // if have encountered the next row. Only used for reversed scan
+  private boolean stopSkippingKVsIfNextRow = false;
+  // last iterated KVs by seek (to restore the iterator state after reseek)
+  private Cell last = null;
+
+  public MutableCellSetSegmentScanner(MutableCellSetSegment segment, long readPoint) {
+    super();
+    this.segment = segment;
+    this.readPoint = readPoint;
+    iter = segment.iterator();
+    // the initialization of the current is required for working with heap of SegmentScanners
+    current = getNext();
+    //increase the reference count so the underlying structure will not be de-allocated
+    this.segment.incScannerCount();
+  }
+
+  /**
+   * Look at the next Cell in this scanner, but do not iterate the scanner
+   * @return the currently observed Cell
+   * @throws IllegalStateException if the current cell is above the read point
+   */
+  @Override
+  public Cell peek() {
+    // sanity check, the current should be always valid
+    if (current != null && current.getSequenceId() > readPoint) {
+      throw new IllegalStateException("current is invalid: read point is " + readPoint + ", " +
+          "while current sequence id is " + current.getSequenceId());
+    }
+
+    return current;
+  }
+
+  /**
+   * Return the next Cell in this scanner, iterating the scanner
+   * @return the next Cell or null if end of scanner
+   */
+  @Override
+  public Cell next() throws IOException {
+    Cell oldCurrent = current;
+    current = getNext();                  // update the currently observed Cell
+    return oldCurrent;
+  }
+
+  /**
+   * Seek the scanner at or after the specified Cell.
+   * @param cell seek value; a null value closes the scanner
+   * @return true if scanner has values left, false if end of scanner
+   */
+  @Override
+  public boolean seek(Cell cell) throws IOException {
+    if (cell == null) {
+      close();
+      return false;
+    }
+    // restart the iterator from new key
+    iter = segment.tailSet(cell).iterator();
+    // last is going to be reinitialized in the next getNext() call
+    last = null;
+    current = getNext();
+    return (current != null);
+  }
+
+  /**
+   * Reseek the scanner at or after the specified KeyValue.
+   * This method is guaranteed to seek at or after the required key only if the
+   * key comes after the current position of the scanner. Should not be used
+   * to seek to a key which may come before the current position.
+   *
+   * @param cell seek value (should be non-null)
+   * @return true if scanner has values left, false if end of scanner
+   */
+  @Override
+  public boolean reseek(Cell cell) throws IOException {
+
+    /*
+    See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+    This code is executed concurrently with flush and puts, without locks.
+    The ideal implementation for performance would use the sub skip list implicitly
+    pointed by the iterator. Unfortunately the Java API does not offer a method to
+    get it. So we remember the last keys we iterated to and restore
+    the reseeked set to at least that point.
+    */
+    iter = segment.tailSet(getHighest(cell, last)).iterator();
+    current = getNext();
+    return (current != null);
+  }
+
+  /**
+   * Seek the scanner at or before the row of specified Cell, it firstly
+   * tries to seek the scanner at or after the specified Cell, return if
+   * peek KeyValue of scanner has the same row with specified Cell,
+   * otherwise seek the scanner at the first Cell of the row which is the
+   * previous row of specified KeyValue
+   *
+   * @param key seek Cell
+   * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
+   */
+  @Override
+  public boolean backwardSeek(Cell key) throws IOException {
+    seek(key);    // seek forward then go backward
+    if (peek() == null || segment.compareRows(peek(), key) > 0) {
+      return seekToPreviousRow(key);
+    }
+    return true;
+  }
+
+  /**
+   * Seek the scanner at the first Cell of the row which is the previous row
+   * of specified key. Rows whose cells are all above the read point are skipped by
+   * moving further back.
+   *
+   * @param cell seek value
+   * @return true if the scanner at the first valid Cell of previous row,
+   *     false if not existing such Cell
+   */
+  @Override
+  public boolean seekToPreviousRow(Cell cell) throws IOException {
+    Cell key = cell;
+
+    while (true) {
+      Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
+      SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow);
+      Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
+      if (lastCellBeforeRow == null) {
+        current = null;
+        return false;
+      }
+      Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
+      // the flag must never outlive this seek, even if the seek fails
+      this.stopSkippingKVsIfNextRow = true;
+      try {
+        seek(firstKeyOnPreviousRow);
+      } finally {
+        this.stopSkippingKVsIfNextRow = false;
+      }
+      if (peek() != null
+          && segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) <= 0) {
+        return true;
+      }
+      // no visible cell on the previous row: keep moving backward
+      key = firstKeyOnPreviousRow;
+    }
+  }
+
+  /**
+   * Seek the scanner at the first KeyValue of last row
+   *
+   * @return true if scanner has values left, false if the underlying data is empty
+   */
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    Cell higherCell = segment.isEmpty() ? null : segment.last();
+    if (higherCell == null) {
+      return false;
+    }
+
+    Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
+
+    if (seek(firstCellOnLastRow)) {
+      return true;
+    } else {
+      return seekToPreviousRow(higherCell);
+    }
+  }
+
+  @Override protected Segment getSegment() {
+    return segment;
+  }
+
+  /********************* Private Methods **********************/
+
+  /**
+   * Private internal method for iterating over the segment,
+   * skipping the cells with irrelevant MVCC. Always records the last cell taken from the
+   * iterator in {@code last}, which reseek() uses.
+   */
+  private Cell getNext() {
+    Cell startKV = current;
+    Cell next = null;
+
+    try {
+      while (iter.hasNext()) {
+        next = iter.next();
+        if (next.getSequenceId() <= this.readPoint) {
+          return next;                    // skip irrelevant versions
+        }
+        if (stopSkippingKVsIfNextRow &&   // for backwardSeek() stay in the
+            startKV != null &&        // boundaries of a single row
+            segment.compareRows(next, startKV) > 0) {
+          return null;
+        }
+      } // end of while
+
+      return null; // nothing found
+    } finally {
+      if (next != null) {
+        // in all cases, remember the last KV we iterated to, needed for reseek()
+        last = next;
+      }
+    }
+  }
+
+  /**
+   * Private internal method that returns the higher of the two key values, or null
+   * if they are both null
+   */
+  private Cell getHighest(Cell first, Cell second) {
+    if (first == null && second == null) {
+      return null;
+    }
+    if (first != null && second != null) {
+      int compare = segment.compare(first, second);
+      return (compare > 0 ? first : second);
+    }
+    return (first != null ? first : second);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
new file mode 100644
index 0000000..fcaddb0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for a memstore segment that can still be modified, i.e. the active segment.
+ */
+@InterfaceAudience.Private
+public abstract class MutableSegment extends Segment {
+
+  /**
+   * @param memStoreLAB the memory allocation buffers of this segment
+   * @param size        the initial size of the segment
+   */
+  protected MutableSegment(MemStoreLAB memStoreLAB, long size) {
+    super(memStoreLAB, size);
+  }
+
+  /**
+   * @return the comparator that determines the order of this segment's cells
+   */
+  public abstract CellComparator getComparator();
+
+  /**
+   * Provides the portion of this segment's cell set that begins with the given cell.
+   *
+   * @param firstCell a cell of the segment marking where the view begins
+   * @return the sorted view of the cell set starting with {@code firstCell}
+   */
+  public abstract SortedSet<Cell> tailSet(Cell firstCell);
+
+  //methods for test
+
+  /**
+   * Test-only accessor.
+   *
+   * @return the cell that comes first in this segment's order
+   */
+  abstract Cell first();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
new file mode 100644
index 0000000..7891809
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -0,0 +1,293 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.ByteRange;
+
+/**
+ * This is an abstraction of a segment maintained in a memstore, e.g., the active
+ * cell set or its snapshot.
+ *
+ * This abstraction facilitates the management of the compaction pipeline and the shifts of these
+ * segments from active set to snapshot set in the default implementation.
+ */
+@InterfaceAudience.Private
+public abstract class Segment {
+
+  private volatile MemStoreLAB memStoreLAB;
+  private final AtomicLong size;
+  private final TimeRangeTracker timeRangeTracker;
+  protected volatile boolean tagsPresent;
+
+  /**
+   * @param memStoreLAB the allocator cells are cloned into, or null to keep cells as given
+   * @param size the initial heap size of the segment
+   */
+  protected Segment(MemStoreLAB memStoreLAB, long size) {
+    this.memStoreLAB = memStoreLAB;
+    this.size = new AtomicLong(size);
+    this.timeRangeTracker = new TimeRangeTracker();
+    this.tagsPresent = false;
+  }
+
+  /**
+   * Creates a segment that shares the allocator and the time range tracker of the given
+   * segment, and starts from a copy of its current heap size and tags flag.
+   * @param segment the segment whose state is taken over
+   */
+  protected Segment(Segment segment) {
+    this.memStoreLAB = segment.getMemStoreLAB();
+    this.size = new AtomicLong(segment.getSize());
+    this.timeRangeTracker = segment.getTimeRangeTracker();
+    this.tagsPresent = segment.isTagsPresent();
+  }
+
+  /**
+   * Creates the scanner that is able to scan the concrete segment
+   * @param readPoint the read point the scanner works with
+   * @return a scanner for the given read point
+   */
+  public abstract SegmentScanner getSegmentScanner(long readPoint);
+
+  /**
+   * Returns whether the segment holds no cells
+   * @return true if the segment has no cells, false otherwise
+   */
+  public abstract boolean isEmpty();
+
+  /**
+   * Returns number of cells in segment
+   * @return number of cells in segment
+   */
+  public abstract int getCellsCount();
+
+  /**
+   * Adds the given cell into the segment
+   * @param cell the cell to add
+   * @return the change in the heap size
+   */
+  public abstract long add(Cell cell);
+
+  /**
+   * Removes the given cell from the segment
+   * @param cell the cell to remove
+   * @return the change in the heap size
+   */
+  public abstract long rollback(Cell cell);
+
+  /**
+   * Returns the first cell in the segment that has equal or greater key than the given cell
+   * @param cell the cell to start the search from
+   * @return the first cell in the segment that has equal or greater key than the given cell
+   */
+  public abstract Cell getFirstAfter(Cell cell);
+
+  /**
+   * Returns a set of all cells in the segment
+   * @return a set of all cells in the segment
+   */
+  public abstract CellSet getCellSet();
+
+  /**
+   * Closes the segment before it is discarded; this closes its MemStoreLAB, if any.
+   */
+  public void close() {
+    MemStoreLAB mslab = getMemStoreLAB();
+    if (mslab != null) {
+      mslab.close();
+    }
+    // do not set MSLab to null as scanners may still be reading the data here and need to decrease
+    // the counter when they finish
+  }
+
+  /**
+   * If the segment has a memory allocator the cell is cloned into this space and the clone is
+   * returned; otherwise (or if the allocator declines the allocation) the given cell is returned
+   * @param cell the cell to clone
+   * @return either the given cell or its clone
+   */
+  public Cell maybeCloneWithAllocator(Cell cell) {
+    // read the allocator once instead of once per use
+    MemStoreLAB mslab = getMemStoreLAB();
+    if (mslab == null) {
+      return cell;
+    }
+
+    int len = KeyValueUtil.length(cell);
+    ByteRange alloc = mslab.allocateBytes(len);
+    if (alloc == null) {
+      // The allocation was too large, allocator decided
+      // not to do anything with it.
+      return cell;
+    }
+    assert alloc.getBytes() != null;
+    KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
+    KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
+    // carry the sequence id of the original cell over to the clone
+    newKv.setSequenceId(cell.getSequenceId());
+    return newKv;
+  }
+
+  /**
+   * Returns whether the given scan should seek in this segment, judged by the scan-wide time
+   * range only. Use {@link #shouldSeek(Scan, Store, long)} to also honor a time range set on the
+   * scan for a specific column family.
+   * @param scan the scan to check
+   * @param oldestUnexpiredTS the oldest timestamp that is not expired yet
+   * @return whether the given scan should seek in this segment
+   */
+  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+    return shouldSeekInTimeRange(scan.getTimeRange(), oldestUnexpiredTS);
+  }
+
+  /**
+   * Returns whether the given scan should seek in this segment. A time range set on the scan for
+   * the store's column family (see {@code Scan#setColumnFamilyTimeRange}) takes precedence over
+   * the scan-wide time range.
+   * @param scan the scan to check
+   * @param store the store this segment belongs to; if null, the scan-wide time range is used
+   * @param oldestUnexpiredTS the oldest timestamp that is not expired yet
+   * @return whether the given scan should seek in this segment
+   */
+  public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
+    TimeRange timeRange = null;
+    if (store != null) {
+      timeRange = scan.getColumnFamilyTimeRange().get(store.getFamily().getName());
+    }
+    if (timeRange == null) {
+      timeRange = scan.getTimeRange();
+    }
+    return shouldSeekInTimeRange(timeRange, oldestUnexpiredTS);
+  }
+
+  /** Time range / TTL check shared by both {@code shouldSeek} variants. */
+  private boolean shouldSeekInTimeRange(TimeRange timeRange, long oldestUnexpiredTS) {
+    TimeRangeTracker tracker = getTimeRangeTracker();
+    return tracker.includesTimeRange(timeRange)
+        && tracker.getMaximumTimestamp() >= oldestUnexpiredTS;
+  }
+
+  /**
+   * @return the minimum timestamp recorded by this segment's time range tracker
+   */
+  public long getMinTimestamp() {
+    return getTimeRangeTracker().getMinimumTimestamp();
+  }
+
+  /**
+   * @return the current value of the {@code tagsPresent} flag maintained by subclasses
+   */
+  public boolean isTagsPresent() {
+    return tagsPresent;
+  }
+
+  /**
+   * Increments the scanner counter of the segment's MemStoreLAB, if it has one.
+   */
+  public void incScannerCount() {
+    MemStoreLAB mslab = getMemStoreLAB();
+    if (mslab != null) {
+      mslab.incScannerCount();
+    }
+  }
+
+  /**
+   * Decrements the scanner counter of the segment's MemStoreLAB, if it has one.
+   */
+  public void decScannerCount() {
+    MemStoreLAB mslab = getMemStoreLAB();
+    if (mslab != null) {
+      mslab.decScannerCount();
+    }
+  }
+
+  /**
+   * Setting the heap size of the segment - used to account for different class overheads
+   * @param size the new heap size
+   * @return this object
+   */
+  public Segment setSize(long size) {
+    this.size.set(size);
+    return this;
+  }
+
+  /**
+   * Returns the heap size of the segment
+   * @return the heap size of the segment
+   */
+  public long getSize() {
+    return size.get();
+  }
+
+  /**
+   * Increases the heap size counter of the segment by the given delta
+   * @param delta the amount to add; a negative value decreases the size
+   */
+  public void incSize(long delta) {
+    size.addAndGet(delta);
+  }
+
+  /**
+   * @return the time range tracker of this segment
+   */
+  public TimeRangeTracker getTimeRangeTracker() {
+    return timeRangeTracker;
+  }
+
+  /**
+   * Includes the timestamp of an added cell in the time range tracker and adds s to the size.
+   * @param toAdd the cell that was added
+   * @param s the heap size delta caused by the addition
+   */
+  protected void updateMetaInfo(Cell toAdd, long s) {
+    getTimeRangeTracker().includeTimestamp(toAdd);
+    size.addAndGet(s);
+  }
+
+  private MemStoreLAB getMemStoreLAB() {
+    return memStoreLAB;
+  }
+
+  // Debug methods
+  /**
+   * Dumps all cells of the segment into the given log
+   * @param log the log to write to
+   */
+  public abstract void dump(Log log);
+
+  @Override
+  public String toString() {
+    String res = "Store segment of type "+this.getClass().getName()+"; ";
+    res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
+    res += "cellCount "+getCellsCount()+"; ";
+    res += "size "+getSize()+"; ";
+    res += "Min ts "+getMinTimestamp()+"; ";
+    return res;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
new file mode 100644
index 0000000..ccb11df
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A singleton store segment factory.
+ * Generate concrete store segments.
+ */
+@InterfaceAudience.Private
+public final class SegmentFactory {
+
+  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
+  static final boolean USEMSLAB_DEFAULT = true;
+  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
+
+  // final: the singleton must never be replaced once published
+  private static final SegmentFactory INSTANCE = new SegmentFactory();
+
+  private SegmentFactory() {}
+
+  /**
+   * @return the single shared instance of this factory
+   */
+  public static SegmentFactory instance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates an immutable segment on top of a new, empty mutable segment whose MemStoreLAB is
+   * chosen from the configuration.
+   * @param conf configuration deciding whether, and which, MemStoreLAB is used
+   * @param comparator comparator ordering the cells of the segment
+   * @param size initial heap size of the segment
+   * @return the new immutable segment
+   */
+  public ImmutableSegment createImmutableSegment(final Configuration conf,
+      final CellComparator comparator, long size) {
+    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+    MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size);
+    return createImmutableSegment(conf, segment);
+  }
+
+  /**
+   * Creates an immutable segment on top of a new, empty mutable segment without a MemStoreLAB.
+   * @param comparator comparator ordering the cells of the segment
+   * @param size initial heap size of the segment
+   * @return the new immutable segment
+   */
+  public ImmutableSegment createImmutableSegment(CellComparator comparator,
+      long size) {
+    MutableSegment segment = generateMutableSegment(null, comparator, null, size);
+    return createImmutableSegment(null, segment);
+  }
+
+  /**
+   * Wraps the given mutable segment in an immutable segment.
+   * @param conf configuration; not yet consulted when choosing the segment type
+   * @param segment the mutable segment to wrap
+   * @return the immutable segment wrapping {@code segment}
+   */
+  public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) {
+    return generateImmutableSegment(conf, segment);
+  }
+
+  /**
+   * Creates a new, empty mutable segment whose MemStoreLAB is chosen from the configuration.
+   * @param conf configuration deciding whether, and which, MemStoreLAB is used
+   * @param comparator comparator ordering the cells of the segment
+   * @param size initial heap size of the segment
+   * @return the new mutable segment
+   */
+  public MutableSegment createMutableSegment(final Configuration conf,
+      CellComparator comparator, long size) {
+    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+    return generateMutableSegment(conf, comparator, memStoreLAB, size);
+  }
+
+  //****** private methods to instantiate concrete store segments **********//
+
+  private ImmutableSegment generateImmutableSegment(final Configuration conf,
+      MutableSegment segment) {
+    // TBD use configuration to set type of segment
+    return new ImmutableSegmentAdapter(segment);
+  }
+
+  private MutableSegment generateMutableSegment(
+      final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) {
+    // TBD use configuration to set type of segment
+    CellSet set = new CellSet(comparator);
+    return new MutableCellSetSegment(set, memStoreLAB, size, comparator);
+  }
+
+  /**
+   * Instantiates the MemStoreLAB class named by {@link #MSLAB_CLASS_NAME} (default
+   * {@link HeapMemStoreLAB}) when {@link #USEMSLAB_KEY} is enabled.
+   * @param conf configuration to read the MSLAB settings from; also passed to the constructor
+   * @return the new MemStoreLAB, or null when MSLAB is disabled
+   */
+  private MemStoreLAB getMemStoreLAB(Configuration conf) {
+    if (!conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+      return null;
+    }
+    String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+    return ReflectionUtils.instantiateWithCustomCtor(className,
+        new Class[] { Configuration.class }, new Object[] { conf });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
new file mode 100644
index 0000000..8852d5c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * An abstraction for store segment scanner.
+ */
+@InterfaceAudience.Private
+public abstract class SegmentScanner implements KeyValueScanner {
+
+  private long sequenceID = Long.MAX_VALUE;
+
+  /**
+   * @return the segment this scanner reads from
+   */
+  protected abstract Segment getSegment();
+
+  /**
+   * Get the sequence id associated with this KeyValueScanner. This is required
+   * for comparing multiple files (or memstore segments) scanners to find out
+   * which one has the latest data.
+   * It is {@link Long#MAX_VALUE} until {@link #setSequenceID(long)} is called.
+   */
+  @Override
+  public long getSequenceID() {
+    return sequenceID;
+  }
+
+  /**
+   * Close the KeyValue scanner; decrements the scanner count of the underlying segment.
+   */
+  @Override
+  public void close() {
+    getSegment().decScannerCount();
+  }
+
+  /**
+   * Always returns true: whether a segment is relevant for a scan is meant to be decided at the
+   * higher level, by MemStoreScanner.
+   */
+  @Override
+  public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+    return true;
+  }
+
+  /**
+   * This scanner is working solely on the in-memory MemStore therefore this
+   * interface is not relevant.
+   * @throws IllegalStateException always
+   */
+  @Override
+  public boolean requestSeek(Cell c, boolean forward, boolean useBloom)
+      throws IOException {
+    // name the concrete scanner class: this base class is shared by every segment scanner
+    throw new IllegalStateException(
+        "requestSeek cannot be called on " + getClass().getSimpleName());
+  }
+
+  /**
+   * This scanner is working solely on the in-memory MemStore and doesn't work on
+   * store files, a segment scanner always does the seek,
+   * therefore always returning true.
+   */
+  @Override
+  public boolean realSeekDone() {
+    return true;
+  }
+
+  /**
+   * This function should be never called on scanners that always do real seek operations (i.e. most
+   * of the scanners and also this one). The easiest way to achieve this is to call
+   * {@link #realSeekDone()} first.
+   * @throws IllegalStateException always
+   */
+  @Override
+  public void enforceSeek() throws IOException {
+    throw new IllegalStateException(
+        "enforceSeek cannot be called on " + getClass().getSimpleName());
+  }
+
+  /**
+   * @return false: a segment scanner reads memory, never a store file
+   */
+  @Override
+  public boolean isFileScanner() {
+    return false;
+  }
+
+  /**
+   * @return the next key in the index (the key to seek to the next block)
+   *     if known, or null otherwise
+   *     Not relevant for in-memory scanner, so always null
+   */
+  @Override
+  public Cell getNextIndexedKey() {
+    return null;
+  }
+
+  /**
+   * Called after a batch of rows scanned (RPC) and set to be returned to client. Any in between
+   * cleanup can be done here. Nothing to be done for a segment scanner.
+   */
+  @Override
+  public void shipped() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Set the sequence id of the scanner.
+   * This is used to determine an order between memory segment scanners.
+   * @param x a unique sequence id
+   */
+  public void setSequenceID(long x) {
+    sequenceID = x;
+  }
+
+  /**
+   * Returns whether the given scan should seek in this segment
+   * @param scan the scan to check
+   * @param oldestUnexpiredTS the oldest timestamp that is not expired yet
+   * @return whether the given scan should seek in this segment
+   */
+  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+    return getSegment().shouldSeek(scan, oldestUnexpiredTS);
+  }
+
+  //debug method
+  @Override
+  public String toString() {
+    String res = "Store segment scanner of type "+this.getClass().getName()+"; ";
+    res += "sequence id "+getSequenceID()+"; ";
+    res += getSegment().toString();
+    return res;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
index 34ba1fa..f4f25dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 4f30960..5c79d72 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -19,22 +19,6 @@
 
 package org.apache.hadoop.hbase.io;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
@@ -42,9 +26,9 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
-import org.apache.hadoop.hbase.regionserver.CellSkipListSet;
+import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
+import org.apache.hadoop.hbase.regionserver.CellSet;
 import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -56,6 +40,22 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -237,8 +237,8 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
-    // CellSkipListSet
-    cl = CellSkipListSet.class;
+    // CellSet
+    cl = CellSet.class;
     expected = ClassSize.estimateBase(cl, false);
     actual = ClassSize.CELL_SKIPLIST_SET;
     if (expected != actual) {
@@ -305,15 +305,16 @@ public class TestHeapSize  {
     // DefaultMemStore Deep Overhead
     actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
-    expected += ClassSize.estimateBase(AtomicLong.class, false);
-    expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false));
+    expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
+    expected += (2 * ClassSize.estimateBase(CellSet.class, false));
     expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
     expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
-      ClassSize.estimateBase(CellSkipListSet.class, true);
-      ClassSize.estimateBase(CellSkipListSet.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java
index 684839d..e0cc39f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java
@@ -18,11 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.Iterator;
-import java.util.SortedSet;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
@@ -32,10 +28,13 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
 
+import java.util.Iterator;
+import java.util.SortedSet;
+
 @Category({RegionServerTests.class, SmallTests.class})
 public class TestCellSkipListSet extends TestCase {
-  private final CellSkipListSet csls =
-    new CellSkipListSet(CellComparator.COMPARATOR);
+  private final CellSet csls =
+    new CellSet(CellComparator.COMPARATOR);
 
   protected void setUp() throws Exception {
     super.setUp();
@@ -163,4 +162,4 @@ public class TestCellSkipListSet extends TestCase {
     assertTrue(Bytes.equals(head.first().getValueArray(), head.first().getValueOffset(),
       head.first().getValueLength(), value2, 0, value2.length));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index ec70740..5e6007d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -18,17 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,12 +50,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /** memstore test case */
 @Category({RegionServerTests.class, MediumTests.class})
@@ -89,11 +84,9 @@ public class TestDefaultMemStore extends TestCase {
     byte [] other = Bytes.toBytes("somethingelse");
     KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
     this.memstore.add(samekey);
-    Cell found = this.memstore.cellSet.first();
-    assertEquals(1, this.memstore.cellSet.size());
-    assertTrue(
-      Bytes.toString(found.getValueArray(), found.getValueOffset(), found.getValueLength()),
-      CellUtil.matchingValue(samekey, found));
+    Cell found = this.memstore.getActive().first();
+    assertEquals(1, this.memstore.getActive().getCellsCount());
+    assertTrue(Bytes.toString(found.getValueArray()), CellUtil.matchingValue(samekey, found));
   }
 
   /**
@@ -108,7 +101,7 @@ public class TestDefaultMemStore extends TestCase {
     Configuration conf = HBaseConfiguration.create();
     ScanInfo scanInfo =
         new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
-            this.memstore.comparator);
+            this.memstore.getComparator());
     ScanType scanType = ScanType.USER_SCAN;
     StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
@@ -476,7 +469,7 @@ public class TestDefaultMemStore extends TestCase {
     for (int i = 0; i < snapshotCount; i++) {
       addRows(this.memstore);
       runSnapshot(this.memstore);
-      assertEquals("History not being cleared", 0, this.memstore.snapshot.size());
+      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
     }
   }
 
@@ -497,7 +490,7 @@ public class TestDefaultMemStore extends TestCase {
     m.add(key2);
 
     assertTrue("Expected memstore to hold 3 values, actually has " +
-        m.cellSet.size(), m.cellSet.size() == 3);
+        m.getActive().getCellsCount(), m.getActive().getCellsCount() == 3);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -529,7 +522,7 @@ public class TestDefaultMemStore extends TestCase {
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-        KeepDeletedCells.FALSE, 0, this.memstore.comparator);
+        KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
       ScanType scanType = ScanType.USER_SCAN;
       InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,
@@ -570,12 +563,12 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam ,qf3, val));
     //Creating a snapshot
     memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
     //Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.getActive().getCellsCount());
     memstore.add(new KeyValue(row, fam ,qf4, val));
     memstore.add(new KeyValue(row, fam ,qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.getActive().getCellsCount());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -597,7 +590,7 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.cellSet.size());
+    assertEquals(3, memstore.getActive().getCellsCount());
 
     KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
     memstore.delete(del2);
@@ -608,9 +601,9 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put2);
     expected.add(put1);
 
-    assertEquals(4, memstore.cellSet.size());
+    assertEquals(4, memstore.getActive().getCellsCount());
     int i = 0;
-    for(Cell cell : memstore.cellSet) {
+    for(Cell cell : memstore.getActive().getCellSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -631,7 +624,7 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.cellSet.size());
+    assertEquals(3, memstore.getActive().getCellsCount());
 
     KeyValue del2 =
       new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@@ -644,9 +637,9 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put1);
 
 
-    assertEquals(4, memstore.cellSet.size());
+    assertEquals(4, memstore.getActive().getCellsCount());
     int i = 0;
-    for (Cell cell: memstore.cellSet) {
+    for (Cell cell: memstore.getActive().getCellSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -684,9 +677,9 @@ public class TestDefaultMemStore extends TestCase {
 
 
 
-    assertEquals(5, memstore.cellSet.size());
+    assertEquals(5, memstore.getActive().getCellsCount());
     int i = 0;
-    for (Cell cell: memstore.cellSet) {
+    for (Cell cell: memstore.getActive().getCellSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -700,8 +693,8 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.getActive().getCellsCount());
+    assertEquals(delete, memstore.getActive().first());
   }
 
   public void testRetainsDeleteVersion() throws IOException {
@@ -713,8 +706,8 @@ public class TestDefaultMemStore extends TestCase {
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.getActive().getCellsCount());
+    assertEquals(delete, memstore.getActive().first());
   }
   public void testRetainsDeleteColumn() throws IOException {
     // add a put to memstore
@@ -725,8 +718,8 @@ public class TestDefaultMemStore extends TestCase {
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.getActive().getCellsCount());
+    assertEquals(delete, memstore.getActive().first());
   }
   public void testRetainsDeleteFamily() throws IOException {
     // add a put to memstore
@@ -737,43 +730,8 @@ public class TestDefaultMemStore extends TestCase {
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
-  }
-
-  ////////////////////////////////////
-  //Test for timestamps
-  ////////////////////////////////////
-
-  /**
-   * Test to ensure correctness when using Memstore with multiple timestamps
-   */
-  public void testMultipleTimestamps() throws Exception {
-    long[] timestamps = new long[] {20,10,5,1};
-    Scan scan = new Scan();
-
-    for (long timestamp: timestamps)
-      addRows(memstore,timestamp);
-
-    byte[] fam = Bytes.toBytes("fam");
-    HColumnDescriptor hcd = mock(HColumnDescriptor.class);
-    when(hcd.getName()).thenReturn(fam);
-    Store store = mock(Store.class);
-    when(store.getFamily()).thenReturn(hcd);
-    scan.setColumnFamilyTimeRange(fam, 0, 2);
-    assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
-
-    scan.setColumnFamilyTimeRange(fam, 20, 82);
-    assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
-
-    scan.setColumnFamilyTimeRange(fam, 10, 20);
-    assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
-
-    scan.setColumnFamilyTimeRange(fam, 8, 12);
-    assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE));
-
-    scan.setColumnFamilyTimeRange(fam, 28, 42);
-    assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE));
+    assertEquals(2, memstore.getActive().getCellsCount());
+    assertEquals(delete, memstore.getActive().first());
   }
 
   ////////////////////////////////////
@@ -795,7 +753,7 @@ public class TestDefaultMemStore extends TestCase {
    */
   public void testUpsertMSLAB() throws Exception {
     Configuration conf = HBaseConfiguration.create();
-    conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true);
+    conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
     memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
 
     int ROW_SIZE = 2048;
@@ -838,7 +796,7 @@ public class TestDefaultMemStore extends TestCase {
   public void testUpsertMemstoreSize() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
-    long oldSize = memstore.size.get();
+    long oldSize = memstore.size();
 
     List<Cell> l = new ArrayList<Cell>();
     KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -849,18 +807,18 @@ public class TestDefaultMemStore extends TestCase {
     l.add(kv1); l.add(kv2); l.add(kv3);
 
     this.memstore.upsert(l, 2);// readpoint is 2
-    long newSize = this.memstore.size.get();
+    long newSize = this.memstore.size();
     assert(newSize > oldSize);
     //The kv1 should be removed.
-    assert(memstore.cellSet.size() == 2);
+    assert(memstore.getActive().getCellsCount() == 2);
 
     KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
     kv4.setSequenceId(1);
     l.clear(); l.add(kv4);
     this.memstore.upsert(l, 3);
-    assertEquals(newSize, this.memstore.size.get());
+    assertEquals(newSize, this.memstore.size());
     //The kv2 should be removed.
-    assert(memstore.cellSet.size() == 2);
+    assert(memstore.getActive().getCellsCount() == 2);
     //this.memstore = null;
   }
 
@@ -1021,10 +979,11 @@ public class TestDefaultMemStore extends TestCase {
 
   private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
     // Save off old state.
-    int oldHistorySize = hmc.snapshot.size();
+    int oldHistorySize = hmc.getSnapshot().getCellsCount();
     MemStoreSnapshot snapshot = hmc.snapshot();
     // Make some assertions about what just happened.
-    assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size());
+    assertTrue("History size has not increased", oldHistorySize < hmc.getSnapshot().getCellsCount
+        ());
     long t = memstore.timeOfOldestEdit();
     assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
     hmc.clearSnapshot(snapshot.getId());

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 385048c..b237490 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -18,20 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.security.Key;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-import javax.crypto.spec.SecretKeySpec;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -74,6 +60,19 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.security.Key;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 @Category(MediumTests.class)
 public class TestHMobStore {
   public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
@@ -468,7 +467,7 @@ public class TestHMobStore {
     this.store.snapshot();
     flushStore(store, id++);
     Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size());
-    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 7add8a9..a5574d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -18,54 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-
-import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -176,10 +132,52 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Basic stand-alone testing of HRegion.  No clusters!
@@ -302,8 +300,6 @@ public class TestHRegion {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
-
-
   /*
    * This test is for verifying memstore snapshot size is correctly updated in case of rollback
    * See HBASE-10845
@@ -332,7 +328,7 @@ public class TestHRegion {
     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
     MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
-        CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
+      CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
 
     Store store = region.getStore(COLUMN_FAMILY_BYTES);
     // Get some random bytes.
@@ -1289,7 +1285,8 @@ public class TestHRegion {
     private final AtomicInteger count;
     private Exception e;
 
-    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) {
+    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
+        final AtomicInteger c) {
       super("getter." + i);
       this.g = new Get(r);
       this.done = d;
@@ -2452,10 +2449,10 @@ public class TestHRegion {
       // This is kinda hacky, but better than nothing...
       long now = System.currentTimeMillis();
       DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
-      Cell firstCell = memstore.cellSet.first();
+      Cell firstCell = memstore.getActive().first();
       assertTrue(firstCell.getTimestamp() <= now);
       now = firstCell.getTimestamp();
-      for (Cell cell : memstore.cellSet) {
+      for (Cell cell : memstore.getActive().getCellSet()) {
         assertTrue(cell.getTimestamp() <= now);
         now = cell.getTimestamp();
       }
@@ -2782,7 +2779,8 @@ public class TestHRegion {
       } catch (NotServingRegionException e) {
         // this is the correct exception that is expected
       } catch (IOException e) {
-        fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: "
+        fail("Got wrong type of exception - should be a NotServingRegionException, " +
+            "but was an IOException: "
             + e.getMessage());
       }
     } finally {
@@ -2980,7 +2978,8 @@ public class TestHRegion {
   }
 
   @Test
-  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException {
+  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
+      IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
     byte[][] families = { fam1 };
@@ -4978,7 +4977,8 @@ public class TestHRegion {
       // move the file of the primary region to the archive, simulating a compaction
       Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
       primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
-      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
+      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
+          .getStoreFiles(families[0]);
       Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
 
       verifyData(secondaryRegion, 0, 1000, cq, families);
@@ -4992,7 +4992,8 @@ public class TestHRegion {
     }
   }
 
-  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
+  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
+      IOException {
     putData(this.region, startRow, numRows, qf, families);
   }
 
@@ -5085,7 +5086,6 @@ public class TestHRegion {
 
   /**
    * Test that we get the expected flush results back
-   * @throws IOException
    */
   @Test
   public void testFlushResult() throws IOException {
@@ -5138,11 +5138,6 @@ public class TestHRegion {
   }
 
   /**
-   * @param tableName
-   * @param callingMethod
-   * @param conf
-   * @param families
-   * @throws IOException
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
@@ -5152,12 +5147,6 @@ public class TestHRegion {
   }
 
   /**
-   * @param tableName
-   * @param callingMethod
-   * @param conf
-   * @param isReadOnly
-   * @param families
-   * @throws IOException
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
@@ -5177,14 +5166,6 @@ public class TestHRegion {
   }
 
   /**
-   * @param tableName
-   * @param startKey
-   * @param stopKey
-   * @param callingMethod
-   * @param conf
-   * @param isReadOnly
-   * @param families
-   * @throws IOException
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
@@ -5676,7 +5657,8 @@ public class TestHRegion {
       currRow.clear();
       hasNext = scanner.next(currRow);
       assertEquals(2, currRow.size());
-      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0,
+      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
+          currRow.get(0).getRowLength(), row4, 0,
         row4.length));
       assertTrue(hasNext);
       // 2. scan out "row3" (2 kv)
@@ -6088,7 +6070,7 @@ public class TestHRegion {
   public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
     // similar to the above test but with distributed log replay
     final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
-      100, 42);
+        100, 42);
     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
 
     HTableDescriptor htd

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 80333e8..b5e9798 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.Random;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -36,6 +30,13 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test the {@link MemStoreChunkPool} class
  */
@@ -47,7 +48,7 @@ public class TestMemStoreChunkPool {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true);
+    conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
     conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
     chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
     MemStoreChunkPool.chunkPoolDisabled = false;
@@ -116,13 +117,13 @@ public class TestMemStoreChunkPool {
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.getActive().getCellsCount());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.getActive().getCellsCount());
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -132,7 +133,7 @@ public class TestMemStoreChunkPool {
 
   @Test
   public void testPuttingBackChunksWithOpeningScanner()
-      throws UnexpectedStateException {
+      throws IOException {
     byte[] row = Bytes.toBytes("testrow");
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf1 = Bytes.toBytes("testqualifier1");
@@ -153,13 +154,13 @@ public class TestMemStoreChunkPool {
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.getActive().getCellsCount());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.getActive().getCellsCount());
 
     // opening scanner before clear the snapshot
     List<KeyValueScanner> scanners = memstore.getScanners(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 354ea2d..0a67ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -92,8 +93,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 /**
  * Test class for the Store
  */
@@ -555,7 +554,7 @@ public class TestStore {
     this.store.snapshot();
     flushStore(store, id++);
     Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
-    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
   }
 
   private void assertCheck() {
@@ -600,7 +599,7 @@ public class TestStore {
     flushStore(store, id++);
     Assert.assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
-    Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(2, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
 
     // how many key/values for this row are there?
     Get get = new Get(row);
@@ -669,7 +668,7 @@ public class TestStore {
     }
 
     long computedSize=0;
-    for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
+    for (Cell cell : ((AbstractMemStore)this.store.memstore).getActive().getCellSet()) {
       long kvsize = DefaultMemStore.heapSizeChange(cell, true);
       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
       computedSize += kvsize;
@@ -701,7 +700,7 @@ public class TestStore {
     // then flush.
     flushStore(store, id++);
     Assert.assertEquals(1, this.store.getStorefiles().size());
-    Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(1, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount());
 
     // now increment again:
     newValue += 1;


[2/2] hbase git commit: HBASE-14919 Refactoring for in-memory flush and compaction

Posted by st...@apache.org.
HBASE-14919 Refactoring for in-memory flush and compaction

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25dfc112
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25dfc112
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25dfc112

Branch: refs/heads/master
Commit: 25dfc112dd76134a9a3ce1f2e88c4075aef76557
Parents: a975408
Author: eshcar <es...@yahoo-inc.com>
Authored: Mon Feb 8 23:35:02 2016 +0200
Committer: stack <st...@apache.org>
Committed: Thu Feb 11 10:39:01 2016 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    | 497 +++++++++++
 .../hadoop/hbase/regionserver/CellSet.java      | 183 ++++
 .../hbase/regionserver/CellSkipListSet.java     | 185 ----
 .../hbase/regionserver/DefaultMemStore.java     | 859 +------------------
 .../hadoop/hbase/regionserver/HStore.java       |  22 +-
 .../hbase/regionserver/ImmutableSegment.java    |  72 ++
 .../regionserver/ImmutableSegmentAdapter.java   | 107 +++
 .../hadoop/hbase/regionserver/MemStore.java     |  16 +-
 .../hbase/regionserver/MemStoreScanner.java     | 348 ++++++++
 .../hbase/regionserver/MemStoreSnapshot.java    |  13 +-
 .../regionserver/MutableCellSetSegment.java     | 153 ++++
 .../MutableCellSetSegmentScanner.java           | 258 ++++++
 .../hbase/regionserver/MutableSegment.java      |  57 ++
 .../hadoop/hbase/regionserver/Segment.java      | 218 +++++
 .../hbase/regionserver/SegmentFactory.java      |  89 ++
 .../hbase/regionserver/SegmentScanner.java      | 152 ++++
 .../hbase/regionserver/StoreFlushContext.java   |   2 +-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  49 +-
 .../hbase/regionserver/TestCellSkipListSet.java |  13 +-
 .../hbase/regionserver/TestDefaultMemStore.java | 133 +--
 .../hbase/regionserver/TestHMobStore.java       |  29 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 150 ++--
 .../regionserver/TestMemStoreChunkPool.java     |  29 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  11 +-
 24 files changed, 2380 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
new file mode 100644
index 0000000..18d2f8a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -0,0 +1,497 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * An abstract class, which implements the behaviour shared by all concrete memstore instances.
+ * <p>
+ * A memstore holds a mutable <em>active</em> segment that absorbs writes and an immutable
+ * <em>snapshot</em> segment that is handed to the flusher. Subclasses decide what to do as the
+ * active segment grows ({@link #checkActiveSize()}) and how segments and their scanners are
+ * exposed ({@link #getListOfSegments()}, {@link #getListOfScanners(long)}).
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMemStore implements MemStore {
+
+  /** Value of {@link #snapshotId} while no snapshot is outstanding. */
+  private static final long NO_SNAPSHOT_ID = -1;
+
+  private final Configuration conf;
+  private final CellComparator comparator;
+
+  // active segment absorbs write operations
+  private volatile MutableSegment active;
+  // Snapshot of memstore.  Made for flusher.
+  private volatile ImmutableSegment snapshot;
+  protected volatile long snapshotId;
+  // Used to track when to flush; Long.MAX_VALUE means "no edit recorded yet".
+  private volatile long timeOfOldestEdit;
+
+  // Shallow size of this object: 4 references (conf, comparator, active, snapshot)
+  // plus 2 longs (snapshotId, timeOfOldestEdit).
+  public final static long FIXED_OVERHEAD = ClassSize.align(
+      ClassSize.OBJECT +
+          (4 * ClassSize.REFERENCE) +
+          (2 * Bytes.SIZEOF_LONG));
+
+  // NOTE(review): the factor 2 presumably accounts for the per-segment structures of the two
+  // segments (active and snapshot) -- confirm against Segment's own accounting.
+  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+      2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
+      ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
+
+
+  /**
+   * Creates a memstore with a fresh active segment and an empty snapshot.
+   * Note: invokes the overridable {@link #resetCellSet()} from the constructor.
+   * @param conf configuration passed to the segment factory
+   * @param c comparator used to order cells in every segment
+   */
+  protected AbstractMemStore(final Configuration conf, final CellComparator c) {
+    this.conf = conf;
+    this.comparator = c;
+    resetCellSet();
+    this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0);
+    this.snapshotId = NO_SNAPSHOT_ID;
+  }
+
+  /**
+   * Replaces the active segment with a new, empty mutable segment and clears the
+   * oldest-edit timestamp.
+   */
+  protected void resetCellSet() {
+    // Reset heap to not include any keys
+    this.active = SegmentFactory.instance().createMutableSegment(
+        conf, comparator, DEEP_OVERHEAD);
+    this.timeOfOldestEdit = Long.MAX_VALUE;
+  }
+
+  /*
+  * Calculate how the MemStore size has changed.  Includes overhead of the
+  * backing Map.
+  * @param cell
+  * @param notPresent True if the cell was NOT present in the set.
+  * @return change in size; 0 when the cell was already present
+  */
+  static long heapSizeChange(final Cell cell, final boolean notPresent) {
+    return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
+        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+  }
+
+  /**
+   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
+   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
+   *                      only if it is greater than the previous sequence id
+   */
+  public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent);
+
+  /**
+   * Write an update
+   * @param cell the cell to be added
+   * @return approximate size of the passed cell & newly added cell which may be different than the
+   *         passed-in cell
+   */
+  @Override
+  public long add(Cell cell) {
+    Cell toAdd = maybeCloneWithAllocator(cell);
+    return internalAdd(toAdd);
+  }
+
+  /**
+   * Update or insert the specified Cells.
+   * <p>
+   * For each Cell, insert into MemStore.  This will atomically upsert the
+   * value for that row/family/qualifier.  If a Cell did already exist,
+   * it will then be removed.
+   * <p>
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+   * be immediately visible.  May want to change this so it is atomic across
+   * all Cells.
+   * <p>
+   * This is called under row lock, so Get operations will still see updates
+   * atomically.  Scans will only see each Cell update as atomic.
+   *
+   * @param cells the cells to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in memstore size
+   */
+  @Override
+  public long upsert(Iterable<Cell> cells, long readpoint) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += upsert(cell, readpoint);
+    }
+    return size;
+  }
+
+  /**
+   * @return Oldest timestamp of all the Cells in the MemStore
+   */
+  @Override
+  public long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+
+  /**
+   * Write a delete
+   * @param deleteCell the cell to be deleted
+   * @return approximate size of the passed key and value.
+   */
+  @Override
+  public long delete(Cell deleteCell) {
+    // A delete is stored as a regular (delete-typed) cell in the active segment.
+    Cell toAdd = maybeCloneWithAllocator(deleteCell);
+    return internalAdd(toAdd);
+  }
+
+  /**
+   * An override on snapshot so the no arg version of the method implies zero seq num,
+   * like for cases without wal
+   */
+  public MemStoreSnapshot snapshot() {
+    return snapshot(0);
+  }
+
+  /**
+   * The passed snapshot was successfully persisted; it can be let go.
+   * @param id Id of the snapshot to clean out.
+   * @throws UnexpectedStateException if {@code id} is not the current snapshot id
+   * @see MemStore#snapshot(long)
+   */
+  @Override
+  public void clearSnapshot(long id) throws UnexpectedStateException {
+    if (this.snapshotId != id) {
+      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+          + id);
+    }
+    // OK. Passed in snapshot is same as current snapshot. If not-empty,
+    // create a new snapshot and let the old one go.
+    Segment oldSnapshot = this.snapshot;
+    if (!this.snapshot.isEmpty()) {
+      this.snapshot = SegmentFactory.instance().createImmutableSegment(
+          getComparator(), 0);
+    }
+    this.snapshotId = NO_SNAPSHOT_ID;
+    oldSnapshot.close();
+  }
+
+  /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot.
+   */
+  @Override
+  public long heapSize() {
+    return getActive().getSize();
+  }
+
+  /**
+   * On flush, how much memory we will clear from the active cell set.
+   *
+   * @return the snapshot size if the snapshot is non-empty (size &gt; 0), otherwise the
+   *         size of the active set without its fixed overhead
+   */
+  @Override
+  public long getFlushableSize() {
+    long snapshotSize = getSnapshot().getSize();
+    return snapshotSize > 0 ? snapshotSize : keySize();
+  }
+
+
+  /**
+   * @return a list containing a single memstore scanner.
+   */
+  @Override
+  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt));
+  }
+
+  @Override
+  public long getSnapshotSize() {
+    return getSnapshot().getSize();
+  }
+
+  /**
+   * @return one "Segment (i) ..." entry per segment, or the exception text if the segment
+   *         list cannot be obtained
+   */
+  @Override
+  public String toString() {
+    // Local, single-threaded buffer: no need for StringBuffer's synchronization.
+    StringBuilder buf = new StringBuilder();
+    int i = 1;
+    try {
+      for (Segment segment : getListOfSegments()) {
+        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
+        i++;
+      }
+    } catch (IOException e) {
+      return e.toString();
+    }
+    return buf.toString();
+  }
+
+  protected void rollbackInSnapshot(Cell cell) {
+    // If the key is in the snapshot, delete it. We should not update
+    // this.size, because that tracks the size of only the memstore and
+    // not the snapshot. The flush of this snapshot to disk has not
+    // yet started because Store.flush() waits for all rwcc transactions to
+    // commit before starting the flush to disk.
+    snapshot.rollback(cell);
+  }
+
+  protected void rollbackInActive(Cell cell) {
+    // If the key is in the memstore, delete it. Update this.size.
+    long sz = active.rollback(cell);
+    if (sz != 0) {
+      setOldestEditTimeToNow();
+    }
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  /** Dumps the active segment, then the snapshot, to the given log. */
+  protected void dump(Log log) {
+    active.dump(log);
+    snapshot.dump(log);
+  }
+
+
+  /**
+   * Inserts the specified Cell into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified Cell.
+   * <p>
+   * First, the specified Cell is inserted into the Memstore.
+   * <p>
+   * If there are any existing Cell in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param cell the cell to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in size of MemStore
+   */
+  private long upsert(Cell cell, long readpoint) {
+    // Add the Cell to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    long addedSize = internalAdd(cell);
+
+    // Get the Cells for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    Cell firstCell = KeyValueUtil.createFirstOnRow(
+        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    SortedSet<Cell> ss = active.tailSet(firstCell);
+    Iterator<Cell> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while (it.hasNext()) {
+      Cell cur = it.next();
+
+      if (cell == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+            cur.getSequenceId() <= readpoint) {
+          if (versionsVisible >= 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // Heap footprint of the cell being removed; subtract it from the size accounting.
+            long delta = heapSizeChange(cur, true);
+            addedSize -= delta;
+            active.incSize(-delta);
+            it.remove();
+            setOldestEditTimeToNow();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
+    return addedSize;
+  }
+
+  /*
+   * @param a
+   * @param b
+   * @return Return lowest of a or b or null if both a and b are null
+   */
+  protected Cell getLowest(final Cell a, final Cell b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return comparator.compareRows(a, b) <= 0? a: b;
+  }
+
+  /*
+   * @param key Find row that follows this one.  If null, return first.
+   * @param set Set to look in for a row beyond <code>key</code>.
+   * @return First cell of the next row, or null if none found.  The returned cell is the
+   * instance stored in the set, not a copy.
+   */
+  protected Cell getNextRow(final Cell key,
+      final NavigableSet<Cell> set) {
+    Cell result = null;
+    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
+    // Iterate until we fall into the next row; i.e. move off current row
+    for (Cell cell: tail) {
+      // With a null key there is no current row to skip past, so the first cell is the
+      // answer; guarding here also keeps a null from being handed to compareRows.
+      if (key != null && comparator.compareRows(cell, key) <= 0) {
+        continue;
+      }
+      // Note: Not suppressing deletes or expired cells.  Needs to be handled
+      // by higher up functions.
+      result = cell;
+      break;
+    }
+    return result;
+  }
+
+  /**
+   * Given the specs of a column, update it, first by inserting a new record,
+   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
+   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
+   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
+   * get the new value, or the old value and all readers will eventually only see the new
+   * value after the old was removed.
+   */
+  @VisibleForTesting
+  @Override
+  public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long newValue, long now) {
+    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
+    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+    Cell snc = snapshot.getFirstAfter(firstCell);
+    if(snc != null) {
+      // is there a matching Cell in the snapshot?
+      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
+        if (snc.getTimestamp() == now) {
+          now += 1;
+        }
+      }
+    }
+    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+    // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+    // so we cant add the new Cell w/o knowing what's there already, but we also
+    // want to take this chance to delete some cells. So two loops (sad)
+
+    SortedSet<Cell> ss = getActive().tailSet(firstCell);
+    for (Cell cell : ss) {
+      // if this isnt the row we are interested in, then bail:
+      if (!CellUtil.matchingColumn(cell, family, qualifier)
+          || !CellUtil.matchingRow(cell, firstCell)) {
+        break; // rows dont match, bail.
+      }
+
+      // For a matching Put, raise 'now' to its timestamp so the new cell is never older
+      // than an existing Put of this column (the old cells are removed later by upsert).
+      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
+          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
+        now = cell.getTimestamp();
+      }
+    }
+
+    // create or update (upsert) a new Cell with
+    // 'now' and a 0 memstoreTS == immediately visible
+    List<Cell> cells = new ArrayList<Cell>(1);
+    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
+    return upsert(cells, 1L);
+  }
+
+  private Cell maybeCloneWithAllocator(Cell cell) {
+    return active.maybeCloneWithAllocator(cell);
+  }
+
+  /**
+   * Internal version of add() that doesn't clone Cells with the
+   * allocator, and doesn't take the lock.
+   *
+   * Callers should ensure they already have the read lock taken
+   */
+  private long internalAdd(final Cell toAdd) {
+    long s = active.add(toAdd);
+    setOldestEditTimeToNow();
+    checkActiveSize();
+    return s;
+  }
+
+  /** Records the current time as the oldest edit time, unless one is already recorded. */
+  private void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  /** @return heap size of the active segment minus {@link #DEEP_OVERHEAD} */
+  protected long keySize() {
+    return heapSize() - DEEP_OVERHEAD;
+  }
+
+  protected CellComparator getComparator() {
+    return comparator;
+  }
+
+  protected MutableSegment getActive() {
+    return active;
+  }
+
+  protected ImmutableSegment getSnapshot() {
+    return snapshot;
+  }
+
+  protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
+    this.snapshot = snapshot;
+    return this;
+  }
+
+  protected void setSnapshotSize(long snapshotSize) {
+    getSnapshot().setSize(snapshotSize);
+  }
+
+  /**
+   * Check whether anything need to be done based on the current active set size
+   */
+  protected abstract void checkActiveSize();
+
+  /**
+   * Returns a list of Store segment scanners, one per each store segment
+   * @param readPt the version number required to initialize the scanners
+   * @return a list of Store segment scanners, one per each store segment
+   */
+  protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException;
+
+  /**
+   * Returns an ordered list of segments from most recent to oldest in memstore
+   * @return an ordered list of segments from most recent to oldest in memstore
+   */
+  protected abstract List<Segment> getListOfSegments() throws IOException;
+
+  /** @return heap size of the active segment */
+  public long getActiveSize() {
+    return getActive().getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
new file mode 100644
index 0000000..4433302
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if it already
+ * exists in the set.  The call to add returns true if no value in the backing map or false if
+ * there was an entry with same key (though value may be different).
+ * The implementation is tolerant of concurrent get and set and won't throw
+ * ConcurrentModificationException when iterating.
+ * <p>
+ * Many {@link NavigableSet} navigation and bulk operations are not implemented and throw
+ * {@link UnsupportedOperationException}.
+ */
+@InterfaceAudience.Private
+public class CellSet implements NavigableSet<Cell>  {
+  // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
+  // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
+  // is not already present.", this implementation "Adds the specified element to this set EVEN
+  // if it is already present overwriting what was there previous".
+  // Otherwise, has same attributes as ConcurrentSkipListSet
+  // Values, not keys, are returned to callers: after an overwriting add the map keeps the
+  // original key object but stores the newly added cell as the value.
+  private final ConcurrentNavigableMap<Cell, Cell> delegatee;
+
+  CellSet(final CellComparator c) {
+    this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
+  }
+
+  CellSet(final ConcurrentNavigableMap<Cell, Cell> m) {
+    this.delegatee = m;
+  }
+
+  @Override
+  public Cell ceiling(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Iterator<Cell> descendingIterator() {
+    return this.delegatee.descendingMap().values().iterator();
+  }
+
+  @Override
+  public NavigableSet<Cell> descendingSet() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell floor(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> headSet(final Cell toElement) {
+    return headSet(toElement, false);
+  }
+
+  /** @return a live view backed by this set's map (not a copy). */
+  @Override
+  public NavigableSet<Cell> headSet(final Cell toElement,
+      boolean inclusive) {
+    return new CellSet(this.delegatee.headMap(toElement, inclusive));
+  }
+
+  @Override
+  public Cell higher(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Iterator<Cell> iterator() {
+    return this.delegatee.values().iterator();
+  }
+
+  @Override
+  public Cell lower(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell pollFirst() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell pollLast() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public NavigableSet<Cell> subSet(Cell fromElement,
+      boolean fromInclusive, Cell toElement, boolean toInclusive) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> tailSet(Cell fromElement) {
+    return tailSet(fromElement, true);
+  }
+
+  /** @return a live view backed by this set's map (not a copy). */
+  @Override
+  public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
+    return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
+  }
+
+  @Override
+  public Comparator<? super Cell> comparator() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * @return the lowest cell in the set
+   * @throws java.util.NoSuchElementException if the set is empty
+   */
+  @Override
+  public Cell first() {
+    // Read key and value in one atomic step: a separate firstKey()/get() pair could return
+    // null if the entry were removed concurrently between the two calls.
+    java.util.Map.Entry<Cell, Cell> entry = this.delegatee.firstEntry();
+    if (entry == null) {
+      throw new java.util.NoSuchElementException();
+    }
+    return entry.getValue();
+  }
+
+  /**
+   * @return the highest cell in the set
+   * @throws java.util.NoSuchElementException if the set is empty
+   */
+  @Override
+  public Cell last() {
+    // Same atomicity concern as first().
+    java.util.Map.Entry<Cell, Cell> entry = this.delegatee.lastEntry();
+    if (entry == null) {
+      throw new java.util.NoSuchElementException();
+    }
+    return entry.getValue();
+  }
+
+  /** Adds or overwrites {@code e}; returns true iff no entry with an equal key existed. */
+  @Override
+  public boolean add(Cell e) {
+    return this.delegatee.put(e, e) == null;
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends Cell> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void clear() {
+    this.delegatee.clear();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    //noinspection SuspiciousMethodCalls
+    return this.delegatee.containsKey(o);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.delegatee.isEmpty();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    return this.delegatee.remove(o) != null;
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /** @return the stored cell comparing equal to {@code kv}, or null if none. */
+  public Cell get(Cell kv) {
+    return this.delegatee.get(kv);
+  }
+
+  @Override
+  public int size() {
+    return this.delegatee.size();
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
deleted file mode 100644
index e9941b3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A {@link java.util.Set} of {@link Cell}s implemented on top of a
- * {@link java.util.concurrent.ConcurrentSkipListMap}.  Works like a
- * {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard:
- * An add will overwrite if already an entry for the added key.  In other words,
- * where CSLS does "Adds the specified element to this set if it is not already
- * present.", this implementation "Adds the specified element to this set EVEN
- * if it is already present overwriting what was there previous".  The call to
- * add returns true if no value in the backing map or false if there was an
- * entry with same key (though value may be different).
- * <p>Otherwise,
- * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
- * get and set and won't throw ConcurrentModificationException when iterating.
- */
-@InterfaceAudience.Private
-public class CellSkipListSet implements NavigableSet<Cell> {
-  private final ConcurrentNavigableMap<Cell, Cell> delegatee;
-
-  CellSkipListSet(final CellComparator c) {
-    this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
-  }
-
-  CellSkipListSet(final ConcurrentNavigableMap<Cell, Cell> m) {
-    this.delegatee = m;
-  }
-
-  public Cell ceiling(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Iterator<Cell> descendingIterator() {
-    return this.delegatee.descendingMap().values().iterator();
-  }
-
-  public NavigableSet<Cell> descendingSet() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell floor(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> headSet(final Cell toElement) {
-    return headSet(toElement, false);
-  }
-
-  public NavigableSet<Cell> headSet(final Cell toElement,
-      boolean inclusive) {
-    return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive));
-  }
-
-  public Cell higher(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Iterator<Cell> iterator() {
-    return this.delegatee.values().iterator();
-  }
-
-  public Cell lower(Cell e) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell pollFirst() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell pollLast() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public NavigableSet<Cell> subSet(Cell fromElement,
-      boolean fromInclusive, Cell toElement, boolean toInclusive) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public SortedSet<Cell> tailSet(Cell fromElement) {
-    return tailSet(fromElement, true);
-  }
-
-  public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
-    return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive));
-  }
-
-  public Comparator<? super Cell> comparator() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell first() {
-    return this.delegatee.get(this.delegatee.firstKey());
-  }
-
-  public Cell last() {
-    return this.delegatee.get(this.delegatee.lastKey());
-  }
-
-  public boolean add(Cell e) {
-    return this.delegatee.put(e, e) == null;
-  }
-
-  public boolean addAll(Collection<? extends Cell> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public void clear() {
-    this.delegatee.clear();
-  }
-
-  public boolean contains(Object o) {
-    //noinspection SuspiciousMethodCalls
-    return this.delegatee.containsKey(o);
-  }
-
-  public boolean containsAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public boolean isEmpty() {
-    return this.delegatee.isEmpty();
-  }
-
-  public boolean remove(Object o) {
-    return this.delegatee.remove(o) != null;
-  }
-
-  public boolean removeAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public boolean retainAll(Collection<?> c) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public Cell get(Cell kv) {
-    return this.delegatee.get(kv);
-  }
-
-  public int size() {
-    return this.delegatee.size();
-  }
-
-  public Object[] toArray() {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  public <T> T[] toArray(T[] a) {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index f61d871..82d40b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -19,35 +19,22 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.htrace.Trace;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -66,40 +53,8 @@ import org.apache.htrace.Trace;
  * in KV size.
  */
 @InterfaceAudience.Private
-public class DefaultMemStore implements MemStore {
+public class DefaultMemStore extends AbstractMemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
-  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
-  private static final boolean USEMSLAB_DEFAULT = true;
-  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
-
-  private Configuration conf;
-
-  // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
-  // better semantics.  The Map will overwrite if passed a key it already had
-  // whereas the Set will not add new Cell if key is same though value might be
-  // different.  Value is not important -- just make sure always same
-  // reference passed.
-  volatile CellSkipListSet cellSet;
-
-  // Snapshot of memstore.  Made for flusher.
-  volatile CellSkipListSet snapshot;
-
-  final CellComparator comparator;
-
-  // Used to track own heapSize
-  final AtomicLong size;
-  private volatile long snapshotSize;
-
-  // Used to track when to flush
-  volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
-  TimeRangeTracker timeRangeTracker;
-  TimeRangeTracker snapshotTimeRangeTracker;
-
-  volatile MemStoreLAB allocator;
-  volatile MemStoreLAB snapshotAllocator;
-  volatile long snapshotId;
-  volatile boolean tagsPresent;
 
   /**
    * Default constructor. Used for tests.
@@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore {
    * Constructor.
    * @param c Comparator
    */
-  public DefaultMemStore(final Configuration conf,
-                  final CellComparator c) {
-    this.conf = conf;
-    this.comparator = c;
-    this.cellSet = new CellSkipListSet(c);
-    this.snapshot = new CellSkipListSet(c);
-    timeRangeTracker = new TimeRangeTracker();
-    snapshotTimeRangeTracker = new TimeRangeTracker();
-    this.size = new AtomicLong(DEEP_OVERHEAD);
-    this.snapshotSize = 0;
-    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-          new Class[] { Configuration.class }, new Object[] { conf });
-    } else {
-      this.allocator = null;
-    }
+  public DefaultMemStore(final Configuration conf, final CellComparator c) {
+    super(conf, c);
   }
 
   void dump() {
-    for (Cell cell: this.cellSet) {
-      LOG.info(cell);
-    }
-    for (Cell cell: this.snapshot) {
-      LOG.info(cell);
-    }
+    super.dump(LOG);
   }
 
   /**
    * Creates a snapshot of the current memstore.
    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
+   * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal
    */
   @Override
-  public MemStoreSnapshot snapshot() {
+  public MemStoreSnapshot snapshot(long flushOpSeqId) {
     // If snapshot currently has entries, then flusher failed or didn't call
     // cleanup.  Log a warning.
-    if (!this.snapshot.isEmpty()) {
+    if (!getSnapshot().isEmpty()) {
       LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
-      this.snapshotSize = keySize();
-      if (!this.cellSet.isEmpty()) {
-        this.snapshot = this.cellSet;
-        this.cellSet = new CellSkipListSet(this.comparator);
-        this.snapshotTimeRangeTracker = this.timeRangeTracker;
-        this.timeRangeTracker = new TimeRangeTracker();
-        // Reset heap to not include any keys
-        this.size.set(DEEP_OVERHEAD);
-        this.snapshotAllocator = this.allocator;
-        // Reset allocator so we get a fresh buffer for the new memstore
-        if (allocator != null) {
-          String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-          this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-              new Class[] { Configuration.class }, new Object[] { conf });
-        }
-        timeOfOldestEdit = Long.MAX_VALUE;
+      if (!getActive().isEmpty()) {
+        ImmutableSegment immutableSegment = SegmentFactory.instance().
+            createImmutableSegment(getConfiguration(), getActive());
+        setSnapshot(immutableSegment);
+        setSnapshotSize(keySize());
+        resetCellSet();
       }
     }
-    MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
-        this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
-        this.tagsPresent);
-    this.tagsPresent = false;
-    return memStoreSnapshot;
-  }
-
-  /**
-   * The passed snapshot was successfully persisted; it can be let go.
-   * @param id Id of the snapshot to clean out.
-   * @throws UnexpectedStateException
-   * @see #snapshot()
-   */
-  @Override
-  public void clearSnapshot(long id) throws UnexpectedStateException {
-    MemStoreLAB tmpAllocator = null;
-    if (this.snapshotId != id) {
-      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
-          + id);
-    }
-    // OK. Passed in snapshot is same as current snapshot. If not-empty,
-    // create a new snapshot and let the old one go.
-    if (!this.snapshot.isEmpty()) {
-      this.snapshot = new CellSkipListSet(this.comparator);
-      this.snapshotTimeRangeTracker = new TimeRangeTracker();
-    }
-    this.snapshotSize = 0;
-    this.snapshotId = -1;
-    if (this.snapshotAllocator != null) {
-      tmpAllocator = this.snapshotAllocator;
-      this.snapshotAllocator = null;
-    }
-    if (tmpAllocator != null) {
-      tmpAllocator.close();
-    }
-  }
-
-  @Override
-  public long getFlushableSize() {
-    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
-  }
+    return new MemStoreSnapshot(this.snapshotId, getSnapshot());
 
-  @Override
-  public long getSnapshotSize() {
-    return this.snapshotSize;
   }
 
-  /**
-   * Write an update
-   * @param cell
-   * @return approximate size of the passed Cell.
-   */
   @Override
-  public long add(Cell cell) {
-    Cell toAdd = maybeCloneWithAllocator(cell);
-    return internalAdd(toAdd);
+  protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
+    List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
+    list.add(0, getActive().getSegmentScanner(readPt));
+    list.add(1, getSnapshot().getSegmentScanner(readPt));
+    return list;
   }
 
   @Override
-  public long timeOfOldestEdit() {
-    return timeOfOldestEdit;
-  }
-
-  private boolean addToCellSet(Cell e) {
-    boolean b = this.cellSet.add(e);
-    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
-    // When we use ACL CP or Visibility CP which deals with Tags during
-    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
-    // parse the byte[] to identify the tags length.
-    if(e.getTagsLength() > 0) {
-      tagsPresent = true;
-    }
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  private boolean removeFromCellSet(Cell e) {
-    boolean b = this.cellSet.remove(e);
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  void setOldestEditTimeToNow() {
-    if (timeOfOldestEdit == Long.MAX_VALUE) {
-      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
-    }
-  }
-
-  /**
-   * Internal version of add() that doesn't clone Cells with the
-   * allocator, and doesn't take the lock.
-   *
-   * Callers should ensure they already have the read lock taken
-   */
-  private long internalAdd(final Cell toAdd) {
-    long s = heapSizeChange(toAdd, addToCellSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
-  }
-
-  private Cell maybeCloneWithAllocator(Cell cell) {
-    if (allocator == null) {
-      return cell;
-    }
-
-    int len = KeyValueUtil.length(cell);
-    ByteRange alloc = allocator.allocateBytes(len);
-    if (alloc == null) {
-      // The allocation was too large, allocator decided
-      // not to do anything with it.
-      return cell;
-    }
-    assert alloc.getBytes() != null;
-    KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
-    KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
-    newKv.setSequenceId(cell.getSequenceId());
-    return newKv;
+  protected List<Segment> getListOfSegments() throws IOException {
+    List<Segment> list = new ArrayList<Segment>(2);
+    list.add(0, getActive());
+    list.add(1, getSnapshot());
+    return list;
   }
 
   /**
@@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public void rollback(Cell cell) {
-    // If the key is in the snapshot, delete it. We should not update
-    // this.size, because that tracks the size of only the memstore and
-    // not the snapshot. The flush of this snapshot to disk has not
-    // yet started because Store.flush() waits for all rwcc transactions to
-    // commit before starting the flush to disk.
-    Cell found = this.snapshot.get(cell);
-    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      this.snapshot.remove(cell);
-      long sz = heapSizeChange(cell, true);
-      this.snapshotSize -= sz;
-    }
-    // If the key is in the memstore, delete it. Update this.size.
-    found = this.cellSet.get(cell);
-    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      removeFromCellSet(cell);
-      long s = heapSizeChange(cell, true);
-      this.size.addAndGet(-s);
-    }
-  }
-
-  /**
-   * Write a delete
-   * @param deleteCell
-   * @return approximate size of the passed key and value.
-   */
-  @Override
-  public long delete(Cell deleteCell) {
-    long s = 0;
-    Cell toAdd = maybeCloneWithAllocator(deleteCell);
-    s += heapSizeChange(toAdd, addToCellSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
+    rollbackInSnapshot(cell);
+    rollbackInActive(cell);
   }
 
   /**
@@ -342,604 +137,29 @@ public class DefaultMemStore implements MemStore {
    * @return Next row or null if none found.
    */
   Cell getNextRow(final Cell cell) {
-    return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
-  }
-
-  /*
-   * @param a
-   * @param b
-   * @return Return lowest of a or b or null if both a and b are null
-   */
-  private Cell getLowest(final Cell a, final Cell b) {
-    if (a == null) {
-      return b;
-    }
-    if (b == null) {
-      return a;
-    }
-    return comparator.compareRows(a, b) <= 0? a: b;
+    return getLowest(
+        getNextRow(cell, getActive().getCellSet()),
+        getNextRow(cell, getSnapshot().getCellSet()));
   }
 
-  /*
-   * @param key Find row that follows this one.  If null, return first.
-   * @param map Set to look in for a row beyond <code>row</code>.
-   * @return Next row or null if none found.  If one found, will be a new
-   * KeyValue -- can be destroyed by subsequent calls to this method.
-   */
-  private Cell getNextRow(final Cell key,
-      final NavigableSet<Cell> set) {
-    Cell result = null;
-    SortedSet<Cell> tail = key == null? set: set.tailSet(key);
-    // Iterate until we fall into the next row; i.e. move off current row
-    for (Cell cell: tail) {
-      if (comparator.compareRows(cell, key) <= 0)
-        continue;
-      // Note: Not suppressing deletes or expired cells.  Needs to be handled
-      // by higher up functions.
-      result = cell;
-      break;
-    }
-    return result;
+  @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) {
   }
 
   /**
-   * Only used by tests. TODO: Remove
-   *
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   *
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param newValue
-   * @param now
-   * @return  Timestamp
+   * @return Total memory occupied by this MemStore.
    */
   @Override
-  public long updateColumnValue(byte[] row,
-                                byte[] family,
-                                byte[] qualifier,
-                                long newValue,
-                                long now) {
-    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
-    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
-    if (!snSs.isEmpty()) {
-      Cell snc = snSs.first();
-      // is there a matching Cell in the snapshot?
-      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
-        if (snc.getTimestamp() == now) {
-          // poop,
-          now += 1;
-        }
-      }
-    }
-
-    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-    // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
-    // so we cant add the new Cell w/o knowing what's there already, but we also
-    // want to take this chance to delete some cells. So two loops (sad)
-
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
-    for (Cell cell : ss) {
-      // if this isnt the row we are interested in, then bail:
-      if (!CellUtil.matchingColumn(cell, family, qualifier)
-          || !CellUtil.matchingRow(cell, firstCell)) {
-        break; // rows dont match, bail.
-      }
-
-      // if the qualifier matches and it's a put, just RM it out of the cellSet.
-      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
-          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
-        now = cell.getTimestamp();
-      }
-    }
-
-    // create or update (upsert) a new Cell with
-    // 'now' and a 0 memstoreTS == immediately visible
-    List<Cell> cells = new ArrayList<Cell>(1);
-    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
-  }
-
-  /**
-   * Update or insert the specified KeyValues.
-   * <p>
-   * For each KeyValue, insert into MemStore.  This will atomically upsert the
-   * value for that row/family/qualifier.  If a KeyValue did already exist,
-   * it will then be removed.
-   * <p>
-   * This is called under row lock, so Get operations will still see updates
-   * atomically.  Scans will only see each KeyValue update as atomic.
-   *
-   * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in memstore size
-   */
-  @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) {
-    long size = 0;
-    for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
-    }
-    return size;
-  }
-
-  /**
-   * Inserts the specified KeyValue into MemStore and deletes any existing
-   * versions of the same row/family/qualifier as the specified KeyValue.
-   * <p>
-   * First, the specified KeyValue is inserted into the Memstore.
-   * <p>
-   * If there are any existing KeyValues in this MemStore with the same row,
-   * family, and qualifier, they are removed.
-   * <p>
-   * Callers must hold the read lock.
-   * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells.
-   * @return change in size of MemStore
-   */
-  private long upsert(Cell cell, long readpoint) {
-    // Add the Cell to the MemStore
-    // Use the internalAdd method here since we (a) already have a lock
-    // and (b) cannot safely use the MSLAB here without potentially
-    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
-    // test that triggers the pathological case if we don't avoid MSLAB
-    // here.
-    long addedSize = internalAdd(cell);
-
-    // Get the Cells for the row/family/qualifier regardless of timestamp.
-    // For this case we want to clean up any other puts
-    Cell firstCell = KeyValueUtil.createFirstOnRow(
-        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
-    Iterator<Cell> it = ss.iterator();
-    // Versions visible to oldest scanner.
-    int versionsVisible = 0;
-    while ( it.hasNext() ) {
-      Cell cur = it.next();
-
-      if (cell == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
-            cur.getSequenceId() <= readpoint) {
-          if (versionsVisible >= 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            this.size.addAndGet(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
-          }
-        }
-      } else {
-        // past the row or column, done
-        break;
-      }
-    }
-    return addedSize;
-  }
-
-  /**
-   * @return scanner on memstore and snapshot in this order.
-   */
-  @Override
-  public List<KeyValueScanner> getScanners(long readPt) {
-    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
-  }
-
-  /**
-   * Check if this memstore may contain the required keys
-   * @param scan scan
-   * @param store holds reference to cf
-   * @param oldestUnexpiredTS
-   * @return False if the key definitely does not exist in this Memstore
-   */
-  public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
-    byte[] cf = store.getFamily().getName();
-    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
-    if (timeRange == null) {
-      timeRange = scan.getTimeRange();
-    }
-    return (timeRangeTracker.includesTimeRange(timeRange) ||
-        snapshotTimeRangeTracker.includesTimeRange(timeRange))
-        && (Math.max(timeRangeTracker.getMaximumTimestamp(),
-                     snapshotTimeRangeTracker.getMaximumTimestamp()) >=
-            oldestUnexpiredTS);
-  }
-
-  /*
-   * MemStoreScanner implements the KeyValueScanner.
-   * It lets the caller scan the contents of a memstore -- both current
-   * map and snapshot.
-   * This behaves as if it were a real scanner but does not maintain position.
-   */
-  protected class MemStoreScanner extends NonLazyKeyValueScanner {
-    // Next row information for either cellSet or snapshot
-    private Cell cellSetNextRow = null;
-    private Cell snapshotNextRow = null;
-
-    // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
-    private Cell cellSetItRow = null;
-    private Cell snapshotItRow = null;
-
-    // iterator based scanning.
-    private Iterator<Cell> cellSetIt;
-    private Iterator<Cell> snapshotIt;
-
-    // The cellSet and snapshot at the time of creating this scanner
-    private CellSkipListSet cellSetAtCreation;
-    private CellSkipListSet snapshotAtCreation;
-
-    // the pre-calculated Cell to be returned by peek() or next()
-    private Cell theNext;
-
-    // The allocator and snapshot allocator at the time of creating this scanner
-    volatile MemStoreLAB allocatorAtCreation;
-    volatile MemStoreLAB snapshotAllocatorAtCreation;
-
-    // A flag represents whether could stop skipping Cells for MVCC
-    // if have encountered the next row. Only used for reversed scan
-    private boolean stopSkippingCellsIfNextRow = false;
-
-    private long readPoint;
-
-    /*
-    Some notes...
-
-     So memstorescanner is fixed at creation time. this includes pointers/iterators into
-    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
-    snapshot is moved.  since kvset is null there is no point on reseeking on both,
-      we can save us the trouble. During the snapshot->hfile transition, the memstore
-      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
-      potentially do something smarter by adjusting the existing memstore scanner.
-
-      But there is a greater problem here, that being once a scanner has progressed
-      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
-      if a scan lasts a little while, there is a chance for new entries in kvset to
-      become available but we will never see them.  This needs to be handled at the
-      StoreScanner level with coordination with MemStoreScanner.
-
-      Currently, this problem is only partly managed: during the small amount of time
-      when the StoreScanner has not yet created a new MemStoreScanner, we will miss
-      the adds to kvset in the MemStoreScanner.
-    */
-
-    MemStoreScanner(long readPoint) {
-      super();
-
-      this.readPoint = readPoint;
-      cellSetAtCreation = cellSet;
-      snapshotAtCreation = snapshot;
-      if (allocator != null) {
-        this.allocatorAtCreation = allocator;
-        this.allocatorAtCreation.incScannerCount();
-      }
-      if (snapshotAllocator != null) {
-        this.snapshotAllocatorAtCreation = snapshotAllocator;
-        this.snapshotAllocatorAtCreation.incScannerCount();
-      }
-      if (Trace.isTracing() && Trace.currentSpan() != null) {
-        Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
-      }
-    }
-
-    /**
-     * Lock on 'this' must be held by caller.
-     * @param it
-     * @return Next Cell
-     */
-    private Cell getNext(Iterator<Cell> it) {
-      Cell startCell = theNext;
-      Cell v = null;
-      try {
-        while (it.hasNext()) {
-          v = it.next();
-          if (v.getSequenceId() <= this.readPoint) {
-            return v;
-          }
-          if (stopSkippingCellsIfNextRow && startCell != null
-              && comparator.compareRows(v, startCell) > 0) {
-            return null;
-          }
-        }
-
-        return null;
-      } finally {
-        if (v != null) {
-          // in all cases, remember the last Cell iterated to
-          if (it == snapshotIt) {
-            snapshotItRow = v;
-          } else {
-            cellSetItRow = v;
-          }
-        }
-      }
-    }
-
-    /**
-     *  Set the scanner at the seek key.
-     *  Must be called only once: there is no thread safety between the scanner
-     *   and the memStore.
-     * @param key seek value
-     * @return false if the key is null or if there is no data
-     */
-    @Override
-    public synchronized boolean seek(Cell key) {
-      if (key == null) {
-        close();
-        return false;
-      }
-      // kvset and snapshot will never be null.
-      // if tailSet can't find anything, SortedSet is empty (not null).
-      cellSetIt = cellSetAtCreation.tailSet(key).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
-      cellSetItRow = null;
-      snapshotItRow = null;
-
-      return seekInSubLists(key);
-    }
-
-
-    /**
-     * (Re)initialize the iterators after a seek or a reseek.
-     */
-    private synchronized boolean seekInSubLists(Cell key){
-      cellSetNextRow = getNext(cellSetIt);
-      snapshotNextRow = getNext(snapshotIt);
-
-      // Calculate the next value
-      theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
-      // has data
-      return (theNext != null);
-    }
-
-
-    /**
-     * Move forward on the sub-lists set previously by seek.
-     * @param key seek value (should be non-null)
-     * @return true if there is at least one KV to read, false otherwise
-     */
-    @Override
-    public synchronized boolean reseek(Cell key) {
-      /*
-      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
-      This code is executed concurrently with flush and puts, without locks.
-      Two points must be known when working on this code:
-      1) It's not possible to use the 'kvTail' and 'snapshot'
-       variables, as they are modified during a flush.
-      2) The ideal implementation for performance would use the sub skip list
-       implicitly pointed by the iterators 'kvsetIt' and
-       'snapshotIt'. Unfortunately the Java API does not offer a method to
-       get it. So we remember the last keys we iterated to and restore
-       the reseeked set to at least that point.
-       */
-      cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
-      return seekInSubLists(key);
-    }
-
-
-    @Override
-    public synchronized Cell peek() {
-      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
-      return theNext;
-    }
-
-    @Override
-    public synchronized Cell next() {
-      if (theNext == null) {
-          return null;
-      }
-
-      final Cell ret = theNext;
-
-      // Advance one of the iterators
-      if (theNext == cellSetNextRow) {
-        cellSetNextRow = getNext(cellSetIt);
-      } else {
-        snapshotNextRow = getNext(snapshotIt);
-      }
-
-      // Calculate the next value
-      theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
-      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
-      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
-      //    getLowest() + " threadpoint=" + readpoint);
-      return ret;
-    }
-
-    /*
-     * Returns the lower of the two key values, or null if they are both null.
-     * This uses comparator.compare() to compare the KeyValue using the memstore
-     * comparator.
-     */
-    private Cell getLowest(Cell first, Cell second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare <= 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    /*
-     * Returns the higher of the two cells, or null if they are both null.
-     * This uses comparator.compare() to compare the Cell using the memstore
-     * comparator.
-     */
-    private Cell getHighest(Cell first, Cell second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare > 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    public synchronized void close() {
-      this.cellSetNextRow = null;
-      this.snapshotNextRow = null;
-
-      this.cellSetIt = null;
-      this.snapshotIt = null;
-
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
-      }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
-      }
-
-      this.cellSetItRow = null;
-      this.snapshotItRow = null;
-    }
-
-    /**
-     * MemStoreScanner returns max value as sequence id because it will
-     * always have the latest data among all files.
-     */
-    @Override
-    public long getSequenceID() {
-      return Long.MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
-      return shouldSeek(scan, store, oldestUnexpiredTS);
-    }
-
-    /**
-     * Seek scanner to the given key first. If it returns false(means
-     * peek()==null) or scanner's peek row is bigger than row of given key, seek
-     * the scanner to the previous row of given key
-     */
-    @Override
-    public synchronized boolean backwardSeek(Cell key) {
-      seek(key);
-      if (peek() == null || comparator.compareRows(peek(), key) > 0) {
-        return seekToPreviousRow(key);
-      }
-      return true;
-    }
-
-    /**
-     * Separately get the KeyValue before the specified key from kvset and
-     * snapshotset, and use the row of higher one as the previous row of
-     * specified key, then seek to the first KeyValue of previous row
-     */
-    @Override
-    public synchronized boolean seekToPreviousRow(Cell originalKey) {
-      boolean keepSeeking = false;
-      Cell key = originalKey;
-      do {
-        Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
-        SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
-        Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
-        SortedSet<Cell> snapshotHead = snapshotAtCreation
-            .headSet(firstKeyOnRow);
-        Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
-            .last();
-        Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
-        if (lastCellBeforeRow == null) {
-          theNext = null;
-          return false;
-        }
-        Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
-        this.stopSkippingCellsIfNextRow = true;
-        seek(firstKeyOnPreviousRow);
-        this.stopSkippingCellsIfNextRow = false;
-        if (peek() == null
-            || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
-          keepSeeking = true;
-          key = firstKeyOnPreviousRow;
-          continue;
-        } else {
-          keepSeeking = false;
-        }
-      } while (keepSeeking);
-      return true;
-    }
-
-    @Override
-    public synchronized boolean seekToLastRow() {
-      Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
-          .last();
-      Cell second = snapshotAtCreation.isEmpty() ? null
-          : snapshotAtCreation.last();
-      Cell higherCell = getHighest(first, second);
-      if (higherCell == null) {
-        return false;
-      }
-      Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
-      if (seek(firstCellOnLastRow)) {
-        return true;
-      } else {
-        return seekToPreviousRow(higherCell);
-      }
-
-    }
-  }
-
-  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-      + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
-
-  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
-      (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
-  /*
-   * Calculate how the MemStore size has changed.  Includes overhead of the
-   * backing Map.
-   * @param cell
-   * @param notpresent True if the cell was NOT present in the set.
-   * @return Size
-   */
-  static long heapSizeChange(final Cell cell, final boolean notpresent) {
-    return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
-        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
-  }
-
-  private long keySize() {
-    return heapSize() - DEEP_OVERHEAD;
+  public long size() {
+    return heapSize();
   }
 
   /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
+   * Check whether anything need to be done based on the current active set size
+   * Nothing need to be done for the DefaultMemStore
    */
   @Override
-  public long heapSize() {
-    return size.get();
-  }
-
-  @Override
-  public long size() {
-    return heapSize();
+  protected void checkActiveSize() {
+    return;
   }
 
   /**
@@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore {
     LOG.info("memstore2 estimated size=" + size);
     final int seconds = 30;
     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
-    for (int i = 0; i < seconds; i++) {
-      // Thread.sleep(1000);
-    }
     LOG.info("Exiting.");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c65326a..5c29fb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -91,13 +98,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * A Store holds a column family in a Region.  Its a memstore and a set of zero
  * or more StoreFiles, which stretch backwards over time.
@@ -1636,7 +1636,7 @@ public class HStore implements Store {
       this.lock.readLock().unlock();
     }
 
-    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
+    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
         + (request.isAllFiles() ? " (all files)" : ""));
     this.region.reportCompactionRequestStart(request.isMajor());
@@ -1990,8 +1990,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Used in tests. TODO: Remove
-   *
    * Updates the value for the given row/family/qualifier. This function will always be seen as
    * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
    * control necessary.
@@ -2002,6 +2000,7 @@ public class HStore implements Store {
    * @return memstore size delta
    * @throws IOException
    */
+  @VisibleForTesting
   public long updateColumnValue(byte [] row, byte [] f,
                                 byte [] qualifier, long newValue)
       throws IOException {
@@ -2055,7 +2054,8 @@ public class HStore implements Store {
      */
     @Override
     public void prepare() {
-      this.snapshot = memstore.snapshot();
+      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
+      this.snapshot = memstore.snapshot(cacheFlushSeqNum);
       this.cacheFlushCount = snapshot.getCellsCount();
       this.cacheFlushSize = snapshot.getSize();
       committedFiles = new ArrayList<Path>(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
new file mode 100644
index 0000000..cfcd81e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for segments whose contents are frozen once built. It extends the {@link Segment}
+ * API with {@link ImmutableSegment#getKeyValueScanner()}, which supplies the scanner used by a
+ * {@link MemStoreSnapshot}; a {@link MutableSegment} has no need for that method.
+ * Two operations that an immutable segment is unlikely to support, or could only support
+ * inefficiently, are overridden here: {@link Segment#rollback(Cell)} becomes a no-op and
+ * {@link Segment#getCellSet()} is rejected. Subclasses that can honour them (for example
+ * {@link ImmutableSegmentAdapter}) override them again.
+ */
+@InterfaceAudience.Private
+public abstract class ImmutableSegment extends Segment {
+
+  public ImmutableSegment(Segment source) {
+    super(source);
+  }
+
+  /**
+   * Creates the scanner handed to a MemStoreSnapshot. Implementations are free to return
+   * something other than the general-purpose segment scanner.
+   * @return the scanner to be used by the MemStoreSnapshot
+   */
+  public abstract KeyValueScanner getKeyValueScanner();
+
+  /**
+   * Not supported by default: enumerating every cell may be very expensive for immutable
+   * segments that do not keep a cell set, so this always throws. Subclasses that do keep one,
+   * such as {@link ImmutableSegmentAdapter}, override this (e.g. for use in tests).
+   * @throws NotImplementedException always, unless overridden
+   */
+  @Override
+  public CellSet getCellSet() {
+    String msg = "Immutable Segment does not support this operation by default";
+    throw new NotImplementedException(msg);
+  }
+
+  /**
+   * Rollback is not supported by default: the segment is left untouched and 0 is returned.
+   * Subclasses that can remove cells, such as {@link ImmutableSegmentAdapter}, override this
+   * (e.g. for use in tests).
+   * @param cell the cell that would be removed; ignored here
+   * @return always 0
+   */
+  @Override
+  public long rollback(Cell cell) {
+    return 0L;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
new file mode 100644
index 0000000..058865a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+
+/**
+ * Exposes a {@link MutableSegment} through the {@link ImmutableSegment} API, e.g. when a mutable
+ * segment becomes the snapshot or enters the compaction pipeline (which holds only immutable
+ * segments). The overridden operations forward to the wrapped segment; the snapshot scanner
+ * iterates its cell set. Compaction may produce other kinds of immutable segment.
+ */
+@InterfaceAudience.Private
+public class ImmutableSegmentAdapter extends ImmutableSegment {
+
+  private final MutableSegment delegate;
+
+  public ImmutableSegmentAdapter(MutableSegment segment) {
+    super(segment);
+    delegate = segment;
+  }
+
+  @Override
+  public KeyValueScanner getKeyValueScanner() {
+    return new CollectionBackedScanner(delegate.getCellSet(), delegate.getComparator());
+  }
+
+  @Override
+  public SegmentScanner getSegmentScanner(long readPoint) {
+    return delegate.getSegmentScanner(readPoint);
+  }
+
+  @Override
+  public CellSet getCellSet() {
+    return delegate.getCellSet();
+  }
+
+  @Override
+  public Cell getFirstAfter(Cell cell) {
+    return delegate.getFirstAfter(cell);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public int getCellsCount() {
+    return delegate.getCellsCount();
+  }
+
+  @Override
+  public long getSize() {
+    return delegate.getSize();
+  }
+
+  @Override
+  public Segment setSize(long size) {
+    delegate.setSize(size);
+    return this;
+  }
+
+  @Override
+  public Cell maybeCloneWithAllocator(Cell cell) {
+    return delegate.maybeCloneWithAllocator(cell);
+  }
+
+  @Override
+  public long add(Cell cell) {
+    return delegate.add(cell);
+  }
+
+  @Override
+  public long rollback(Cell cell) {
+    return delegate.rollback(cell);
+  }
+
+  @Override
+  public void dump(Log log) {
+    delegate.dump(log);
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index e9f8103..a10ccd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -17,10 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
 
 /**
@@ -41,10 +42,19 @@ public interface MemStore extends HeapSize {
   MemStoreSnapshot snapshot();
 
   /**
+   * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+   * {@link #clearSnapshot(long)}.
+   * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed
+   *                     segment
+   * @return {@link MemStoreSnapshot}
+   */
+  MemStoreSnapshot snapshot(long flushOpSeqId);
+
+  /**
    * Clears the current snapshot of the Memstore.
    * @param id
    * @throws UnexpectedStateException
-   * @see #snapshot()
+   * @see #snapshot(long)
    */
   void clearSnapshot(long id) throws UnexpectedStateException;
 
@@ -128,7 +138,7 @@ public interface MemStore extends HeapSize {
    * @return scanner over the memstore. This might include scanner over the snapshot when one is
    * present.
    */
-  List<KeyValueScanner> getScanners(long readPt);
+  List<KeyValueScanner> getScanners(long readPt) throws IOException;
 
   /**
    * @return Total memory occupied by this MemStore.