You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/05/12 11:43:31 UTC

hbase git commit: HBASE-17887 Row-level consistency is broken for read

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 5cbc041b9 -> 72edf521c


HBASE-17887 Row-level consistency is broken for read


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72edf521
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72edf521
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72edf521

Branch: refs/heads/branch-1.3
Commit: 72edf521c1effe3afe6ce6b39aaf843b8651a4a6
Parents: 5cbc041
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Fri May 12 19:42:45 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri May 12 19:42:45 2017 +0800

----------------------------------------------------------------------
 .../regionserver/ChangedReadersObserver.java    |   9 +-
 .../hbase/regionserver/DefaultMemStore.java     | 300 ++++++++++---------
 .../hadoop/hbase/regionserver/HStore.java       |   9 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |  44 ++-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |   3 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  66 ++--
 .../hadoop/hbase/regionserver/TestHRegion.java  |   4 +-
 .../regionserver/TestMemStoreChunkPool.java     |  12 +-
 .../hadoop/hbase/regionserver/TestStore.java    | 151 +++++++++-
 .../hbase/regionserver/TestStoreScanner.java    |   5 +-
 .../hbase/regionserver/TestWideScanner.java     |   3 +-
 11 files changed, 398 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 0bc75e7..4421ac5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -30,9 +30,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ChangedReadersObserver {
+
+  /**
+   * @return the read point of the current scan
+   */
+  long getReadPoint();
   /**
    * Notify observers.
+   * @param sfs The new files
+   * @param memStoreScanners scanner of current memstore
    * @throws IOException e
    */
-  void updateReaders(List<StoreFile> sfs) throws IOException;
+  void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 70f5a12..884ef29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -70,38 +70,26 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class DefaultMemStore implements MemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
+  @VisibleForTesting
   static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
   private static final boolean USEMSLAB_DEFAULT = true;
-  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
+  private static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
 
   private Configuration conf;
 
-  // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
-  // better semantics.  The Map will overwrite if passed a key it already had
-  // whereas the Set will not add new Cell if key is same though value might be
-  // different.  Value is not important -- just make sure always same
-  // reference passed.
-  volatile CellSkipListSet cellSet;
-
-  // Snapshot of memstore.  Made for flusher.
-  volatile CellSkipListSet snapshot;
-
+  @VisibleForTesting
   final KeyValue.KVComparator comparator;
 
-  // Used to track own heapSize
-  final AtomicLong size;
-  private volatile long snapshotSize;
-
   // Used to track when to flush
-  volatile long timeOfOldestEdit = Long.MAX_VALUE;
+  private volatile long timeOfOldestEdit = Long.MAX_VALUE;
 
-  TimeRangeTracker timeRangeTracker;
-  TimeRangeTracker snapshotTimeRangeTracker;
+  private volatile long snapshotId;
+  private volatile boolean tagsPresent;
 
-  volatile MemStoreLAB allocator;
-  volatile MemStoreLAB snapshotAllocator;
-  volatile long snapshotId;
-  volatile boolean tagsPresent;
+  @VisibleForTesting
+  volatile Section activeSection;
+  @VisibleForTesting
+  volatile Section snapshotSection;
 
   /**
    * Default constructor. Used for tests.
@@ -118,28 +106,8 @@ public class DefaultMemStore implements MemStore {
                   final KeyValue.KVComparator c) {
     this.conf = conf;
     this.comparator = c;
-    this.cellSet = new CellSkipListSet(c);
-    this.snapshot = new CellSkipListSet(c);
-    timeRangeTracker = new TimeRangeTracker();
-    snapshotTimeRangeTracker = new TimeRangeTracker();
-    this.size = new AtomicLong(DEEP_OVERHEAD);
-    this.snapshotSize = 0;
-    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-          new Class[] { Configuration.class }, new Object[] { conf });
-    } else {
-      this.allocator = null;
-    }
-  }
-
-  void dump() {
-    for (Cell cell: this.cellSet) {
-      LOG.info(cell);
-    }
-    for (Cell cell: this.snapshot) {
-      LOG.info(cell);
-    }
+    this.activeSection = Section.newActiveSection(comparator, conf);
+    this.snapshotSection = Section.newSnapshotSection(comparator);
   }
 
   /**
@@ -150,31 +118,22 @@ public class DefaultMemStore implements MemStore {
   public MemStoreSnapshot snapshot() {
     // If snapshot currently has entries, then flusher failed or didn't call
     // cleanup.  Log a warning.
-    if (!this.snapshot.isEmpty()) {
+    if (!snapshotSection.getCellSkipListSet().isEmpty()) {
       LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
-      this.snapshotSize = keySize();
-      if (!this.cellSet.isEmpty()) {
-        this.snapshot = this.cellSet;
-        this.cellSet = new CellSkipListSet(this.comparator);
-        this.snapshotTimeRangeTracker = this.timeRangeTracker;
-        this.timeRangeTracker = new TimeRangeTracker();
-        // Reset heap to not include any keys
-        this.size.set(DEEP_OVERHEAD);
-        this.snapshotAllocator = this.allocator;
-        // Reset allocator so we get a fresh buffer for the new memstore
-        if (allocator != null) {
-          String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
-          this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
-              new Class[] { Configuration.class }, new Object[] { conf });
-        }
+      if (!activeSection.getCellSkipListSet().isEmpty()) {
+        snapshotSection = activeSection;
+        activeSection = Section.newActiveSection(comparator, conf);
+        snapshotSection.getHeapSize().addAndGet(-DEEP_OVERHEAD);
         timeOfOldestEdit = Long.MAX_VALUE;
       }
     }
-    MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
-        this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
+    MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId,
+        snapshotSection.getCellSkipListSet().size(), snapshotSection.getHeapSize().get(),
+        snapshotSection.getTimeRangeTracker(),
+        new CollectionBackedScanner(snapshotSection.getCellSkipListSet(), this.comparator),
         this.tagsPresent);
     this.tagsPresent = false;
     return memStoreSnapshot;
@@ -188,37 +147,29 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public void clearSnapshot(long id) throws UnexpectedStateException {
-    MemStoreLAB tmpAllocator = null;
     if (this.snapshotId == -1) return;  // already cleared
     if (this.snapshotId != id) {
       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
           + id);
     }
-    // OK. Passed in snapshot is same as current snapshot. If not-empty,
-    // create a new snapshot and let the old one go.
-    if (!this.snapshot.isEmpty()) {
-      this.snapshot = new CellSkipListSet(this.comparator);
-      this.snapshotTimeRangeTracker = new TimeRangeTracker();
-    }
-    this.snapshotSize = 0;
-    this.snapshotId = -1;
-    if (this.snapshotAllocator != null) {
-      tmpAllocator = this.snapshotAllocator;
-      this.snapshotAllocator = null;
-    }
+    // OK. Passed in snapshot is same as current snapshot.
+    MemStoreLAB tmpAllocator = snapshotSection.getMemStoreLAB();
+    snapshotSection = Section.newSnapshotSection(comparator);
     if (tmpAllocator != null) {
       tmpAllocator.close();
     }
+    this.snapshotId = -1;
   }
 
   @Override
   public long getFlushableSize() {
-    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+    long snapshotSize = snapshotSection.getHeapSize().get();
+    return snapshotSize > 0 ? snapshotSize : keySize();
   }
 
   @Override
   public long getSnapshotSize() {
-    return this.snapshotSize;
+    return snapshotSection.getHeapSize().get();
   }
 
   /**
@@ -239,7 +190,7 @@ public class DefaultMemStore implements MemStore {
   }
 
   private boolean addToCellSet(Cell e) {
-    boolean b = this.cellSet.add(e);
+    boolean b = this.activeSection.getCellSkipListSet().add(e);
     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
     // When we use ACL CP or Visibility CP which deals with Tags during
     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
@@ -252,7 +203,7 @@ public class DefaultMemStore implements MemStore {
   }
 
   private boolean removeFromCellSet(Cell e) {
-    boolean b = this.cellSet.remove(e);
+    boolean b = this.activeSection.getCellSkipListSet().remove(e);
     setOldestEditTimeToNow();
     return b;
   }
@@ -281,8 +232,8 @@ public class DefaultMemStore implements MemStore {
     if (!notPresent && mslabUsed) {
       s += getCellLength(toAdd);
     }
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
+    activeSection.getTimeRangeTracker().includeTimestamp(toAdd);
+    activeSection.getHeapSize().addAndGet(s);
     return s;
   }
 
@@ -295,12 +246,12 @@ public class DefaultMemStore implements MemStore {
   }
 
   private Cell maybeCloneWithAllocator(Cell cell) {
-    if (allocator == null) {
+    if (activeSection.getMemStoreLAB() == null) {
       return cell;
     }
 
     int len = getCellLength(cell);
-    ByteRange alloc = allocator.allocateBytes(len);
+    ByteRange alloc = activeSection.getMemStoreLAB().allocateBytes(len);
     if (alloc == null) {
       // The allocation was too large, allocator decided
       // not to do anything with it.
@@ -328,18 +279,18 @@ public class DefaultMemStore implements MemStore {
     // not the snapshot. The flush of this snapshot to disk has not
     // yet started because Store.flush() waits for all rwcc transactions to
     // commit before starting the flush to disk.
-    Cell found = this.snapshot.get(cell);
+    Cell found = snapshotSection.getCellSkipListSet().get(cell);
     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      this.snapshot.remove(cell);
+      snapshotSection.getCellSkipListSet().remove(cell);
       long sz = heapSizeChange(cell, true);
-      this.snapshotSize -= sz;
+      snapshotSection.getHeapSize().addAndGet(-sz);
     }
     // If the key is in the memstore, delete it. Update this.size.
-    found = this.cellSet.get(cell);
+    found = activeSection.getCellSkipListSet().get(cell);
     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
       removeFromCellSet(cell);
-      long s = heapSizeChange(cell, true);
-      this.size.addAndGet(-s);
+      long sz = heapSizeChange(found, true);
+      activeSection.getHeapSize().addAndGet(-sz);
     }
   }
 
@@ -361,7 +312,8 @@ public class DefaultMemStore implements MemStore {
    * @return Next row or null if none found.
    */
   Cell getNextRow(final Cell cell) {
-    return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
+    return getLowest(getNextRow(cell, activeSection.getCellSkipListSet()),
+          getNextRow(cell, snapshotSection.getCellSkipListSet()));
   }
 
   /*
@@ -406,8 +358,8 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
-    getRowKeyAtOrBefore(cellSet, state);
-    getRowKeyAtOrBefore(snapshot, state);
+    getRowKeyAtOrBefore(activeSection.getCellSkipListSet(), state);
+    getRowKeyAtOrBefore(snapshotSection.getCellSkipListSet(), state);
   }
 
   /*
@@ -505,7 +457,7 @@ public class DefaultMemStore implements MemStore {
                                 long now) {
     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
+    SortedSet<Cell> snSs = snapshotSection.getCellSkipListSet().tailSet(firstCell);
     if (!snSs.isEmpty()) {
       Cell snc = snSs.first();
       // is there a matching Cell in the snapshot?
@@ -523,7 +475,7 @@ public class DefaultMemStore implements MemStore {
     // so we cant add the new Cell w/o knowing what's there already, but we also
     // want to take this chance to delete some cells. So two loops (sad)
 
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
+    SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
     for (Cell cell : ss) {
       // if this isnt the row we are interested in, then bail:
       if (!CellUtil.matchingColumn(cell, family, qualifier)
@@ -601,7 +553,7 @@ public class DefaultMemStore implements MemStore {
         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-    SortedSet<Cell> ss = cellSet.tailSet(firstCell);
+    SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
     Iterator<Cell> it = ss.iterator();
     // versions visible to oldest scanner
     int versionsVisible = 0;
@@ -624,7 +576,7 @@ public class DefaultMemStore implements MemStore {
             // false means there was a change, so give us the size.
             long delta = heapSizeChange(cur, true);
             addedSize -= delta;
-            this.size.addAndGet(-delta);
+            activeSection.getHeapSize().addAndGet(-delta);
             it.remove();
             setOldestEditTimeToNow();
           } else {
@@ -680,7 +632,8 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public List<KeyValueScanner> getScanners(long readPt) {
-    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
+    return Collections.<KeyValueScanner> singletonList(
+        new MemStoreScanner(activeSection, snapshotSection, readPt, comparator));
   }
 
   /**
@@ -691,14 +644,29 @@ public class DefaultMemStore implements MemStore {
    * @return False if the key definitely does not exist in this Memstore
    */
   public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
+    return shouldSeek(activeSection.getTimeRangeTracker(),
+        snapshotSection.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
+  }
+
+  /**
+   * Check if this memstore may contain the required keys
+   * @param activeTimeRangeTracker the tracker of active data
+   * @param snapshotTimeRangeTracker the tracker of snapshot data
+   * @param scan scan
+   * @param store holds reference to cf
+   * @param oldestUnexpiredTS
+   * @return False if the key definitely does not exist in this Memstore
+   */
+  private static boolean shouldSeek(TimeRangeTracker activeTimeRangeTracker,
+      TimeRangeTracker snapshotTimeRangeTracker, Scan scan, Store store, long oldestUnexpiredTS) {
     byte[] cf = store.getFamily().getName();
     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
     if (timeRange == null) {
       timeRange = scan.getTimeRange();
     }
-    return (timeRangeTracker.includesTimeRange(timeRange) ||
+    return (activeTimeRangeTracker.includesTimeRange(timeRange) ||
       snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
-      (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
+      (Math.max(activeTimeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
   }
 
   /*
@@ -707,7 +675,7 @@ public class DefaultMemStore implements MemStore {
    * map and snapshot.
    * This behaves as if it were a real scanner but does not maintain position.
    */
-  protected class MemStoreScanner extends NonLazyKeyValueScanner {
+  protected static class MemStoreScanner extends NonLazyKeyValueScanner {
     // Next row information for either cellSet or snapshot
     private Cell cellSetNextRow = null;
     private Cell snapshotNextRow = null;
@@ -721,22 +689,18 @@ public class DefaultMemStore implements MemStore {
     private Iterator<Cell> snapshotIt;
 
     // The cellSet and snapshot at the time of creating this scanner
-    private CellSkipListSet cellSetAtCreation;
-    private CellSkipListSet snapshotAtCreation;
+    private final Section activeAtCreation;
+    private final Section snapshotAtCreation;
 
     // the pre-calculated Cell to be returned by peek() or next()
     private Cell theNext;
 
-    // The allocator and snapshot allocator at the time of creating this scanner
-    volatile MemStoreLAB allocatorAtCreation;
-    volatile MemStoreLAB snapshotAllocatorAtCreation;
-    
     // A flag represents whether could stop skipping Cells for MVCC
     // if have encountered the next row. Only used for reversed scan
     private boolean stopSkippingCellsIfNextRow = false;
 
-    private long readPoint;
-
+    private final long readPoint;
+    private final KeyValue.KVComparator comparator;
     /*
     Some notes...
 
@@ -758,19 +722,16 @@ public class DefaultMemStore implements MemStore {
       the adds to kvset in the MemStoreScanner.
     */
 
-    MemStoreScanner(long readPoint) {
-      super();
-
+    MemStoreScanner(Section activeSection, Section snapshotSection, long readPoint, final KeyValue.KVComparator c) {
       this.readPoint = readPoint;
-      cellSetAtCreation = cellSet;
-      snapshotAtCreation = snapshot;
-      if (allocator != null) {
-        this.allocatorAtCreation = allocator;
-        this.allocatorAtCreation.incScannerCount();
+      this.comparator = c;
+      activeAtCreation = activeSection;
+      snapshotAtCreation = snapshotSection;
+      if (activeAtCreation.getMemStoreLAB() != null) {
+        activeAtCreation.getMemStoreLAB().incScannerCount();
       }
-      if (snapshotAllocator != null) {
-        this.snapshotAllocatorAtCreation = snapshotAllocator;
-        this.snapshotAllocatorAtCreation.incScannerCount();
+      if (snapshotAtCreation.getMemStoreLAB() != null) {
+        snapshotAtCreation.getMemStoreLAB().incScannerCount();
       }
       if (Trace.isTracing() && Trace.currentSpan() != null) {
         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
@@ -825,8 +786,8 @@ public class DefaultMemStore implements MemStore {
       }
       // kvset and snapshot will never be null.
       // if tailSet can't find anything, SortedSet is empty (not null).
-      cellSetIt = cellSetAtCreation.tailSet(key).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+      cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(key).iterator();
+      snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(key).iterator();
       cellSetItRow = null;
       snapshotItRow = null;
 
@@ -868,8 +829,8 @@ public class DefaultMemStore implements MemStore {
        get it. So we remember the last keys we iterated to and restore
        the reseeked set to at least that point.
        */
-      cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
+      cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(getHighest(key, cellSetItRow)).iterator();
+      snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(getHighest(key, snapshotItRow)).iterator();
 
       return seekInSubLists(key);
     }
@@ -937,20 +898,19 @@ public class DefaultMemStore implements MemStore {
       return (first != null ? first : second);
     }
 
+    @Override
     public synchronized void close() {
       this.cellSetNextRow = null;
       this.snapshotNextRow = null;
 
       this.cellSetIt = null;
       this.snapshotIt = null;
-      
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
+
+      if (activeAtCreation != null && activeAtCreation.getMemStoreLAB() != null) {
+        activeAtCreation.getMemStoreLAB().decScannerCount();
       }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
+      if (snapshotAtCreation != null && snapshotAtCreation.getMemStoreLAB() != null) {
+        snapshotAtCreation.getMemStoreLAB().decScannerCount();
       }
 
       this.cellSetItRow = null;
@@ -968,7 +928,8 @@ public class DefaultMemStore implements MemStore {
 
     @Override
     public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
-      return shouldSeek(scan, store, oldestUnexpiredTS);
+      return shouldSeek(activeAtCreation.getTimeRangeTracker(),
+        snapshotAtCreation.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
     }
 
     /**
@@ -997,9 +958,9 @@ public class DefaultMemStore implements MemStore {
       do {
         Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
             key.getRowLength());
-        SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
+        SortedSet<Cell> cellHead = activeAtCreation.getCellSkipListSet().headSet(firstKeyOnRow);
         Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
-        SortedSet<Cell> snapshotHead = snapshotAtCreation
+        SortedSet<Cell> snapshotHead = snapshotAtCreation.getCellSkipListSet()
             .headSet(firstKeyOnRow);
         Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
             .last();
@@ -1027,10 +988,10 @@ public class DefaultMemStore implements MemStore {
 
     @Override
     public synchronized boolean seekToLastRow() {
-      Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
-          .last();
-      Cell second = snapshotAtCreation.isEmpty() ? null
-          : snapshotAtCreation.last();
+      Cell first = activeAtCreation.getCellSkipListSet().isEmpty() ? null
+        : activeAtCreation.getCellSkipListSet().last();
+      Cell second = snapshotAtCreation.getCellSkipListSet().isEmpty() ? null
+          : snapshotAtCreation.getCellSkipListSet().last();
       Cell higherCell = getHighest(first, second);
       if (higherCell == null) {
         return false;
@@ -1047,10 +1008,10 @@ public class DefaultMemStore implements MemStore {
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-      + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
+      + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
+      (2 * ClassSize.ATOMIC_LONG) + (2 * ClassSize.TIMERANGE_TRACKER) +
       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
   /*
@@ -1075,7 +1036,7 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public long heapSize() {
-    return size.get();
+    return activeSection.getHeapSize().get();
   }
 
   @Override
@@ -1125,4 +1086,63 @@ public class DefaultMemStore implements MemStore {
     LOG.info("Exiting.");
   }
 
+  /**
+   * Contains the fields which are useful to MemStoreScanner.
+   */
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  static class Section {
+    /**
+     * MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
+     * better semantics.  The Map will overwrite if passed a key it already had
+     * whereas the Set will not add new Cell if key is same though value might be
+     * different.  Value is not important -- just make sure always same reference passed.
+     */
+    private final CellSkipListSet cellSet;
+    private final TimeRangeTracker tracker = new TimeRangeTracker();
+    /**
+     * Used to track own heapSize.
+     */
+    private final AtomicLong heapSize;
+    private final MemStoreLAB allocator;
+
+    static Section newSnapshotSection(final KeyValue.KVComparator c) {
+      return new Section(c, null, 0);
+    }
+
+    static Section newActiveSection(final KeyValue.KVComparator c,
+            final Configuration conf) {
+      return new Section(c, conf, DEEP_OVERHEAD);
+    }
+
+    private Section(final KeyValue.KVComparator c,
+            final Configuration conf, long initHeapSize) {
+      this.cellSet = new CellSkipListSet(c);
+      this.heapSize = new AtomicLong(initHeapSize);
+      if (conf != null && conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+        String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+        this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+                new Class[]{Configuration.class}, new Object[]{conf});
+      } else {
+        this.allocator = null;
+      }
+    }
+
+    CellSkipListSet getCellSkipListSet() {
+      return cellSet;
+    }
+
+    TimeRangeTracker getTimeRangeTracker() {
+      return tracker;
+    }
+
+    AtomicLong getHeapSize() {
+      return heapSize;
+    }
+
+    MemStoreLAB getMemStoreLAB() {
+      return allocator;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index e7e33d6..24ec480 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1129,7 +1129,14 @@ public class HStore implements Store {
    */
   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
-      o.updateReaders(sfs);
+      List<KeyValueScanner> memStoreScanners;
+      this.lock.readLock().lock();
+      try {
+        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
+      } finally {
+        this.lock.readLock().unlock();
+      }
+      o.updateReaders(sfs, memStoreScanners);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index fba312f..c967071 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.CollectionUtils;
 
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@@ -124,9 +125,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   // Indicates whether there was flush during the course of the scan
   private volatile boolean flushed = false;
   // generally we get one file from a flush
-  private List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+  private final List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+  // generally we get one memstroe scanner from a flush
+  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
   // The current list of scanners
-  private List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+  private final List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
   // flush update lock
   private ReentrantLock flushLock = new ReentrantLock();
 
@@ -434,6 +437,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   public void close() {
     if (this.closing) return;
     this.closing = true;
+    clearAndClose(memStoreScannersAfterFlush);
     // Under test, we dont have a this.store
     if (this.store != null)
       this.store.deleteChangedReaderObserver(this);
@@ -741,13 +745,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return qcode;
   }
 
+  @Override
+  public long getReadPoint() {
+    return readPt;
+  }
+
+  private static void clearAndClose(List<KeyValueScanner> scanners) {
+    for (KeyValueScanner s : scanners) {
+      s.close();
+    }
+    scanners.clear();
+  }
+
   // Implementation of ChangedReadersObserver
   @Override
-  public void updateReaders(List<StoreFile> sfs) throws IOException {
-    flushed = true;
+  public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException {
+    if (CollectionUtils.isEmpty(sfs)
+      && CollectionUtils.isEmpty(memStoreScanners)) {
+      return;
+    }
     flushLock.lock();
     try {
+      flushed = true;
       flushedStoreFiles.addAll(sfs);
+      if (!CollectionUtils.isEmpty(memStoreScanners)) {
+        clearAndClose(memStoreScannersAfterFlush);
+        memStoreScannersAfterFlush.addAll(memStoreScanners);
+      }
     } finally {
       flushLock.unlock();
     }
@@ -807,12 +831,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     final boolean isCompaction = false;
     boolean usePread = get || scanUsePread;
     List<KeyValueScanner> scanners = null;
+    flushLock.lock();
     try {
-      flushLock.lock();
-      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
-        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+      List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
+      allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
+      allScanners.addAll(memStoreScannersAfterFlush);
+      scanners = selectScannersFrom(allScanners);
       // Clear the current set of flushed store files so that they don't get added again
       flushedStoreFiles.clear();
+      memStoreScannersAfterFlush.clear();
     } finally {
       flushLock.unlock();
     }
@@ -822,7 +850,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // remove the older memstore scanner
     for (int i = 0; i < currentScanners.size(); i++) {
       if (!currentScanners.get(i).isFileScanner()) {
-        currentScanners.remove(i);
+        currentScanners.remove(i).close();
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 07ca2b9..aa1febb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -303,13 +303,14 @@ public class TestHeapSize  {
     // DefaultMemStore Deep Overhead
     actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
-    expected += ClassSize.estimateBase(AtomicLong.class, false);
+    expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
     expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false));
     expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
     expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(CellSkipListSet.class, true);
       ClassSize.estimateBase(CellSkipListSet.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 938ecfb..f11f7cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -86,8 +86,8 @@ public class TestDefaultMemStore extends TestCase {
     byte [] other = Bytes.toBytes("somethingelse");
     KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
     this.memstore.add(samekey);
-    Cell found = this.memstore.cellSet.first();
-    assertEquals(1, this.memstore.cellSet.size());
+    Cell found = this.memstore.activeSection.getCellSkipListSet().first();
+    assertEquals(1, this.memstore.activeSection.getCellSkipListSet().size());
     assertTrue(Bytes.toString(found.getValue()), CellUtil.matchingValue(samekey, found));
   }
 
@@ -98,13 +98,13 @@ public class TestDefaultMemStore extends TestCase {
     long sizeChangeForSecondCell = this.memstore.add(kv);
     // make sure memstore size increase won't double-count MSLAB chunk size
     assertEquals(DefaultMemStore.heapSizeChange(kv, true), sizeChangeForFirstCell);
-    if (this.memstore.allocator != null) {
+    if (this.memstore.activeSection.getMemStoreLAB() != null) {
       // make sure memstore size increased when using MSLAB
       assertEquals(memstore.getCellLength(kv), sizeChangeForSecondCell);
       // make sure chunk size increased even when writing the same cell, if using MSLAB
-      if (this.memstore.allocator instanceof HeapMemStoreLAB) {
+      if (this.memstore.activeSection.getMemStoreLAB() instanceof HeapMemStoreLAB) {
         assertEquals(2 * memstore.getCellLength(kv),
-          ((HeapMemStoreLAB) this.memstore.allocator).getCurrentChunk().getNextFreeOffset());
+          ((HeapMemStoreLAB) this.memstore.activeSection.getMemStoreLAB()).getCurrentChunk().getNextFreeOffset());
       }
     } else {
       // make sure no memstore size change w/o MSLAB
@@ -492,7 +492,7 @@ public class TestDefaultMemStore extends TestCase {
     for (int i = 0; i < snapshotCount; i++) {
       addRows(this.memstore);
       runSnapshot(this.memstore);
-      assertEquals("History not being cleared", 0, this.memstore.snapshot.size());
+      assertEquals("History not being cleared", 0, this.memstore.snapshotSection.getCellSkipListSet().size());
     }
   }
 
@@ -513,7 +513,7 @@ public class TestDefaultMemStore extends TestCase {
     m.add(key2);
 
     assertTrue("Expected memstore to hold 3 values, actually has " +
-        m.cellSet.size(), m.cellSet.size() == 3);
+        m.activeSection.getCellSkipListSet().size(), m.activeSection.getCellSkipListSet().size() == 3);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -587,12 +587,12 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam ,qf3, val));
     //Creating a snapshot
     memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
     //Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
     memstore.add(new KeyValue(row, fam ,qf4, val));
     memstore.add(new KeyValue(row, fam ,qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -614,7 +614,7 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.cellSet.size());
+    assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
 
     KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
     memstore.delete(del2);
@@ -625,9 +625,9 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put2);
     expected.add(put1);
 
-    assertEquals(4, memstore.cellSet.size());
+    assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
     int i = 0;
-    for(Cell cell : memstore.cellSet) {
+    for(Cell cell : memstore.activeSection.getCellSkipListSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -648,7 +648,7 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(put2);
     memstore.add(put3);
 
-    assertEquals(3, memstore.cellSet.size());
+    assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
 
     KeyValue del2 =
       new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@@ -661,9 +661,9 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put1);
 
 
-    assertEquals(4, memstore.cellSet.size());
+    assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
     int i = 0;
-    for (Cell cell: memstore.cellSet) {
+    for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -701,9 +701,9 @@ public class TestDefaultMemStore extends TestCase {
 
 
 
-    assertEquals(5, memstore.cellSet.size());
+    assertEquals(5, memstore.activeSection.getCellSkipListSet().size());
     int i = 0;
-    for (Cell cell: memstore.cellSet) {
+    for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
@@ -717,8 +717,8 @@ public class TestDefaultMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+    assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
   }
 
   public void testRetainsDeleteVersion() throws IOException {
@@ -730,8 +730,8 @@ public class TestDefaultMemStore extends TestCase {
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+    assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
   }
   public void testRetainsDeleteColumn() throws IOException {
     // add a put to memstore
@@ -742,8 +742,8 @@ public class TestDefaultMemStore extends TestCase {
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+    assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
   }
   public void testRetainsDeleteFamily() throws IOException {
     // add a put to memstore
@@ -754,8 +754,8 @@ public class TestDefaultMemStore extends TestCase {
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(2, memstore.cellSet.size());
-    assertEquals(delete, memstore.cellSet.first());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+    assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
   }
 
   ////////////////////////////////////
@@ -855,7 +855,7 @@ public class TestDefaultMemStore extends TestCase {
   public void testUpsertMemstoreSize() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
-    long oldSize = memstore.size.get();
+    long oldSize = memstore.activeSection.getHeapSize().get();
 
     List<Cell> l = new ArrayList<Cell>();
     KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -866,18 +866,18 @@ public class TestDefaultMemStore extends TestCase {
     l.add(kv1); l.add(kv2); l.add(kv3);
 
     this.memstore.upsert(l, 2);// readpoint is 2
-    long newSize = this.memstore.size.get();
+    long newSize = this.memstore.activeSection.getHeapSize().get();
     assert(newSize > oldSize);
     //The kv1 should be removed.
-    assert(memstore.cellSet.size() == 2);
+    assert(memstore.activeSection.getCellSkipListSet().size() == 2);
     
     KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
     kv4.setSequenceId(1);
     l.clear(); l.add(kv4);
     this.memstore.upsert(l, 3);
-    assertEquals(newSize, this.memstore.size.get());
+    assertEquals(newSize, this.memstore.activeSection.getHeapSize().get());
     //The kv2 should be removed.
-    assert(memstore.cellSet.size() == 2);
+    assert(memstore.activeSection.getCellSkipListSet().size() == 2);
     //this.memstore = null;
   }
 
@@ -1038,10 +1038,10 @@ public class TestDefaultMemStore extends TestCase {
 
   private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
     // Save off old state.
-    int oldHistorySize = hmc.snapshot.size();
+    int oldHistorySize = hmc.snapshotSection.getCellSkipListSet().size();
     MemStoreSnapshot snapshot = hmc.snapshot();
     // Make some assertions about what just happened.
-    assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size());
+    assertTrue("History size has not increased", oldHistorySize < hmc.snapshotSection.getCellSkipListSet().size());
     long t = memstore.timeOfOldestEdit();
     assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
     hmc.clearSnapshot(snapshot.getId());

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 3c0e147..2b0a768 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -2480,10 +2480,10 @@ public class TestHRegion {
       // This is kinda hacky, but better than nothing...
       long now = System.currentTimeMillis();
       DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
-      Cell firstCell = memstore.cellSet.first();
+      Cell firstCell = memstore.activeSection.getCellSkipListSet().first();
       assertTrue(firstCell.getTimestamp() <= now);
       now = firstCell.getTimestamp();
-      for (Cell cell : memstore.cellSet) {
+      for (Cell cell : memstore.activeSection.getCellSkipListSet()) {
         assertTrue(cell.getTimestamp() <= now);
         now = cell.getTimestamp();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 4f8287c..ae10d05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -115,13 +115,13 @@ public class TestMemStoreChunkPool {
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -152,13 +152,13 @@ public class TestMemStoreChunkPool {
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.snapshot.size());
+    assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
 
     // Adding value to "new" memstore
-    assertEquals(0, memstore.cellSet.size());
+    assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
     memstore.add(new KeyValue(row, fam, qf4, val));
     memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.cellSet.size());
+    assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
 
     // opening scanner before clear the snapshot
     List<KeyValueScanner> scanners = memstore.getScanners(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 414c663..4cebf1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -19,24 +19,31 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,9 +64,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -70,15 +77,16 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
 import org.junit.Assert;
@@ -162,9 +170,14 @@ public class TestStore {
     init(methodName, conf, htd, hcd);
   }
 
-  @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd) throws IOException {
+    return init(methodName, conf, htd, hcd, null);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -185,8 +198,11 @@ public class TestStore {
     final WALFactory wals = new WALFactory(walConf, null, methodName);
     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
             info.getTable().getNamespace()), fs, conf, info, htd, null);
-
-    store = new HStore(region, hcd, conf);
+    if (hook == null) {
+      store = new HStore(region, hcd, conf);
+    } else {
+      store = new MyStore(region, hcd, conf, hook);
+    }
     return store;
   }
 
@@ -552,7 +568,7 @@ public class TestStore {
     this.store.snapshot();
     flushStore(store, id++);
     Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
-    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
   }
 
   private void assertCheck() {
@@ -597,7 +613,7 @@ public class TestStore {
     flushStore(store, id++);
     Assert.assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
-    Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
 
     // how many key/values for this row are there?
     Get get = new Get(row);
@@ -666,7 +682,7 @@ public class TestStore {
     }
 
     long computedSize=0;
-    for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
+    for (Cell cell : ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet()) {
       long kvsize = DefaultMemStore.heapSizeChange(cell, true);
       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
       computedSize += kvsize;
@@ -698,7 +714,7 @@ public class TestStore {
     // then flush.
     flushStore(store, id++);
     Assert.assertEquals(1, this.store.getStorefiles().size());
-    Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
+    Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
 
     // now increment again:
     newValue += 1;
@@ -1112,4 +1128,113 @@ public class TestStore {
     //ensure that replaceStoreFiles is not called if files are not refreshed
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
-}
+
+  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
+  @Test
+  public void testScanWithDoubleFlush() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Initialize region
+    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+      @Override
+      public void hook(final MyStore store) throws IOException {
+        final long tmpId = id++;
+        ExecutorService s = Executors.newSingleThreadExecutor();
+        s.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              // flush the store before storescanner updates the scanners from store.
+              // The current data will be flushed into files and the memstore will
+              // be clear.
+              // -- phase (4/4)
+              flushStore(store, tmpId);
+            } catch (IOException ex) {
+              throw new RuntimeException(ex);
+            }
+          }
+        });
+        s.shutdown();
+        try {
+          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
+          s.awaitTermination(500, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+        }
+      }
+    });
+    byte[] oldValue = Bytes.toBytes("oldValue");
+    byte[] currentValue = Bytes.toBytes("currentValue");
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data whihc shouldn't be "seen" by client
+    myStore.add(createCell(qf1, ts, seqId, oldValue));
+    myStore.add(createCell(qf2, ts, seqId, oldValue));
+    myStore.add(createCell(qf3, ts, seqId, oldValue));
+    long snapshotId = id++;
+    // push older data into snapshot -- phase (1/4)
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+    storeFlushCtx.prepare();
+
+    // insert current data into active -- phase (2/4)
+    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue));
+    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue));
+    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue));
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+            new Scan(new Get(row)), quals, seqId + 1)) {
+      // complete the flush -- phase (3/4)
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+      List<Cell> results = new ArrayList<>();
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+                + ", actual:" + Bytes.toStringBinary(actualValue),
+                 Bytes.equals(actualValue, currentValue));
+      }
+    }
+
+  }
+
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(5);
+    return (MyStore) init(methodName, conf, htd, hcd, hook);
+  }
+
+  private static class MyStore extends HStore {
+
+    private final MyScannerHook hook;
+
+    MyStore(final HRegion region, final HColumnDescriptor family,
+            final Configuration confParam, MyScannerHook hook) throws IOException {
+      super(region, family, confParam);
+      this.hook = hook;
+    }
+
+    @Override
+    public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+            boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+            byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+      hook.hook(this);
+      return super.getScanners(files, cacheBlocks, isGet, usePread,
+              isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner);
+    }
+  }
+
+  private interface MyScannerHook {
+
+    void hook(MyStore store) throws IOException;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index ba4ad3c..00dd9e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -452,9 +453,9 @@ public class TestStoreScanner extends TestCase {
     // normally cause an NPE because scan.store is null.  So as long as we get through these
     // two calls we are good and the bug was quashed.
 
-    scan.updateReaders(new ArrayList<StoreFile>());
+    scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
 
-    scan.updateReaders(new ArrayList<StoreFile>());
+    scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
 
     scan.peek();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index 7e86632..e379e85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
@@ -125,7 +126,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
           StoreScanner ss = (StoreScanner)scanners.next();
-          ss.updateReaders(new ArrayList<StoreFile>());
+          ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
         }
       } while (more);