You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/14 15:07:10 UTC

svn commit: r1577541 [1/3] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/io/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: anoopsamjohn
Date: Fri Mar 14 14:07:10 2014
New Revision: 1577541

URL: http://svn.apache.org/r1577541
Log:
HBASE-10648 Pluggable Memstore.(Anoop)

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
Removed:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java?rev=1577541&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java Fri Mar 14 14:07:10 2014
@@ -0,0 +1,1070 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.rmi.UnexpectedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The MemStore holds in-memory modifications to the Store.  Modifications
+ * are {@link Cell}s.  When asked to flush, current memstore is moved
+ * to snapshot and is cleared.  We continue to serve edits out of new memstore
+ * and backing snapshot until flusher reports in that the flush succeeded. At
+ * this point we let the snapshot go.
+ *  <p>
+ * The MemStore functions should not be called in parallel. Callers should hold
+ *  write and read locks. This is done in {@link HStore}.
+ *  </p>
+ *
+ * TODO: Adjust size of the memstore when we remove items because they have
+ * been deleted.
+ * TODO: With new KVSLS, need to make sure we update HeapSize with difference
+ * in KV size.
+ */
+@InterfaceAudience.Private
+public class DefaultMemStore implements MemStore {
+  private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
+
+  static final String USEMSLAB_KEY =
+    "hbase.hregion.memstore.mslab.enabled";
+  private static final boolean USEMSLAB_DEFAULT = true;
+
+  private Configuration conf;
+
+  // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
+  // better semantics.  The Map will overwrite if passed a key it already had
+  // whereas the Set will not add new KV if key is same though value might be
+  // different.  Value is not important -- just make sure always same
+  // reference passed.
+  volatile KeyValueSkipListSet kvset;
+
+  // Snapshot of memstore.  Made for flusher.
+  volatile KeyValueSkipListSet snapshot;
+
+  final KeyValue.KVComparator comparator;
+
+  // Used to track own heapSize
+  final AtomicLong size;
+  private volatile long snapshotSize;
+
+  // Used to track when to flush
+  volatile long timeOfOldestEdit = Long.MAX_VALUE;
+
+  TimeRangeTracker timeRangeTracker;
+  TimeRangeTracker snapshotTimeRangeTracker;
+
+  MemStoreChunkPool chunkPool;
+  volatile MemStoreLAB allocator;
+  volatile MemStoreLAB snapshotAllocator;
+  volatile long snapshotId;
+
+  /**
+   * Default constructor. Used for tests.
+   */
+  public DefaultMemStore() {
+    this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
+  }
+
+  /**
+   * Constructor.
+   * @param c Comparator
+   */
+  public DefaultMemStore(final Configuration conf,
+                  final KeyValue.KVComparator c) {
+    this.conf = conf;
+    this.comparator = c;
+    this.kvset = new KeyValueSkipListSet(c);
+    this.snapshot = new KeyValueSkipListSet(c);
+    timeRangeTracker = new TimeRangeTracker();
+    snapshotTimeRangeTracker = new TimeRangeTracker();
+    this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.snapshotSize = 0;
+    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+      this.chunkPool = MemStoreChunkPool.getPool(conf);
+      this.allocator = new MemStoreLAB(conf, chunkPool);
+    } else {
+      this.allocator = null;
+      this.chunkPool = null;
+    }
+  }
+
+  void dump() {
+    for (KeyValue kv: this.kvset) {
+      LOG.info(kv);
+    }
+    for (KeyValue kv: this.snapshot) {
+      LOG.info(kv);
+    }
+  }
+
+  /**
+   * Creates a snapshot of the current memstore.
+   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
+   */
+  @Override
+  public MemStoreSnapshot snapshot() {
+    // If snapshot currently has entries, then flusher failed or didn't call
+    // cleanup.  Log a warning.
+    if (!this.snapshot.isEmpty()) {
+      LOG.warn("Snapshot called again without clearing previous. " +
+          "Doing nothing. Another ongoing flush or did we fail last attempt?");
+    } else {
+      this.snapshotId = EnvironmentEdgeManager.currentTimeMillis();
+      this.snapshotSize = keySize();
+      if (!this.kvset.isEmpty()) {
+        this.snapshot = this.kvset;
+        this.kvset = new KeyValueSkipListSet(this.comparator);
+        this.snapshotTimeRangeTracker = this.timeRangeTracker;
+        this.timeRangeTracker = new TimeRangeTracker();
+        // Reset heap to not include any keys
+        this.size.set(DEEP_OVERHEAD);
+        this.snapshotAllocator = this.allocator;
+        // Reset allocator so we get a fresh buffer for the new memstore
+        if (allocator != null) {
+          this.allocator = new MemStoreLAB(conf, chunkPool);
+        }
+        timeOfOldestEdit = Long.MAX_VALUE;
+      }
+    }
+    return new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
+        this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator));
+  }
+
+  /**
+   * The passed snapshot was successfully persisted; it can be let go.
+   * @param id Id of the snapshot to clean out.
+   * @throws UnexpectedException
+   * @see #snapshot()
+   */
+  @Override
+  public void clearSnapshot(long id) throws UnexpectedException {
+    MemStoreLAB tmpAllocator = null;
+    if (this.snapshotId != id) {
+      throw new UnexpectedException("Current snapshot id is " + this.snapshotId + ",passed " + id);
+    }
+    // OK. Passed in snapshot is same as current snapshot. If not-empty,
+    // create a new snapshot and let the old one go.
+    if (!this.snapshot.isEmpty()) {
+      this.snapshot = new KeyValueSkipListSet(this.comparator);
+      this.snapshotTimeRangeTracker = new TimeRangeTracker();
+    }
+    this.snapshotSize = 0;
+    this.snapshotId = -1;
+    if (this.snapshotAllocator != null) {
+      tmpAllocator = this.snapshotAllocator;
+      this.snapshotAllocator = null;
+    }
+    if (tmpAllocator != null) {
+      tmpAllocator.close();
+    }
+  }
+
+  @Override
+  public long getFlushableSize() {
+    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+  }
+
+  /**
+   * Write an update
+   * @param cell
+   * @return approximate size of the passed key and value.
+   */
+  @Override
+  public long add(Cell cell) {
+    KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell));
+    return internalAdd(toAdd);
+  }
+
+  @Override
+  public long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+  private boolean addToKVSet(KeyValue e) {
+    boolean b = this.kvset.add(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  private boolean removeFromKVSet(KeyValue e) {
+    boolean b = this.kvset.remove(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Internal version of add() that doesn't clone KVs with the
+   * allocator, and doesn't take the lock.
+   *
+   * Callers should ensure they already have the read lock taken
+   */
+  private long internalAdd(final KeyValue toAdd) {
+    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
+    timeRangeTracker.includeTimestamp(toAdd);
+    this.size.addAndGet(s);
+    return s;
+  }
+
+  private KeyValue maybeCloneWithAllocator(KeyValue kv) {
+    if (allocator == null) {
+      return kv;
+    }
+
+    int len = kv.getLength();
+    Allocation alloc = allocator.allocateBytes(len);
+    if (alloc == null) {
+      // The allocation was too large, allocator decided
+      // not to do anything with it.
+      return kv;
+    }
+    assert alloc.getData() != null;
+    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
+    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+    newKv.setMvccVersion(kv.getMvccVersion());
+    return newKv;
+  }
+
+  /**
+   * Remove n key from the memstore. Only kvs that have the same key and the
+   * same memstoreTS are removed.  It is ok to not update timeRangeTracker
+   * in this call. It is possible that we can optimize this method by using
+   * tailMap/iterator, but since this method is called rarely (only for
+   * error recovery), we can leave those optimization for the future.
+   * @param cell
+   */
+  @Override
+  public void rollback(Cell cell) {
+    // If the key is in the snapshot, delete it. We should not update
+    // this.size, because that tracks the size of only the memstore and
+    // not the snapshot. The flush of this snapshot to disk has not
+    // yet started because Store.flush() waits for all rwcc transactions to
+    // commit before starting the flush to disk.
+    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+    KeyValue found = this.snapshot.get(kv);
+    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+      this.snapshot.remove(kv);
+    }
+    // If the key is in the memstore, delete it. Update this.size.
+    found = this.kvset.get(kv);
+    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+      removeFromKVSet(kv);
+      long s = heapSizeChange(kv, true);
+      this.size.addAndGet(-s);
+    }
+  }
+
+  /**
+   * Write a delete
+   * @param deleteCell
+   * @return approximate size of the passed key and value.
+   */
+  @Override
+  public long delete(Cell deleteCell) {
+    long s = 0;
+    KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(deleteCell));
+    s += heapSizeChange(toAdd, addToKVSet(toAdd));
+    timeRangeTracker.includeTimestamp(toAdd);
+    this.size.addAndGet(s);
+    return s;
+  }
+
+  /**
+   * @param kv Find the row that comes after this one.  If null, we return the
+   * first.
+   * @return Next row or null if none found.
+   */
+  KeyValue getNextRow(final KeyValue kv) {
+    return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
+  }
+
+  /*
+   * @param a
+   * @param b
+   * @return Return lowest of a or b or null if both a and b are null
+   */
+  private KeyValue getLowest(final KeyValue a, final KeyValue b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return comparator.compareRows(a, b) <= 0? a: b;
+  }
+
+  /*
+   * @param key Find row that follows this one.  If null, return first.
+   * @param map Set to look in for a row beyond <code>row</code>.
+   * @return Next row or null if none found.  If one found, will be a new
+   * KeyValue -- can be destroyed by subsequent calls to this method.
+   */
+  private KeyValue getNextRow(final KeyValue key,
+      final NavigableSet<KeyValue> set) {
+    KeyValue result = null;
+    SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
+    // Iterate until we fall into the next row; i.e. move off current row
+    for (KeyValue kv: tail) {
+      if (comparator.compareRows(kv, key) <= 0)
+        continue;
+      // Note: Not suppressing deletes or expired cells.  Needs to be handled
+      // by higher up functions.
+      result = kv;
+      break;
+    }
+    return result;
+  }
+
+  /**
+   * @param state column/delete tracking state
+   */
+  @Override
+  public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
+    getRowKeyAtOrBefore(kvset, state);
+    getRowKeyAtOrBefore(snapshot, state);
+  }
+
+  /*
+   * @param set
+   * @param state Accumulates deletes and candidates.
+   */
+  private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
+      final GetClosestRowBeforeTracker state) {
+    if (set.isEmpty()) {
+      return;
+    }
+    if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
+      // Found nothing in row.  Try backing up.
+      getRowKeyBefore(set, state);
+    }
+  }
+
+  /*
+   * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
+   * we have been passed the first possible key on a row.  As we walk forward
+   * we accumulate deletes until we hit a candidate on the row at which point
+   * we return.
+   * @param set
+   * @param firstOnRow First possible key on this row.
+   * @param state
+   * @return True if we found a candidate walking this row.
+   */
+  private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
+      final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
+    boolean foundCandidate = false;
+    SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
+    if (tail.isEmpty()) return foundCandidate;
+    for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
+      KeyValue kv = i.next();
+      // Did we go beyond the target row? If so break.
+      if (state.isTooFar(kv, firstOnRow)) break;
+      if (state.isExpired(kv)) {
+        i.remove();
+        continue;
+      }
+      // If we added something, this row is a contender. break.
+      if (state.handle(kv)) {
+        foundCandidate = true;
+        break;
+      }
+    }
+    return foundCandidate;
+  }
+
+  /*
+   * Walk backwards through the passed set a row at a time until we run out of
+   * set or until we get a candidate.
+   * @param set
+   * @param state
+   */
+  private void getRowKeyBefore(NavigableSet<KeyValue> set,
+      final GetClosestRowBeforeTracker state) {
+    KeyValue firstOnRow = state.getTargetKey();
+    for (Member p = memberOfPreviousRow(set, state, firstOnRow);
+        p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
+      // Make sure we don't fall out of our table.
+      if (!state.isTargetTable(p.kv)) break;
+      // Stop looking if we've exited the better candidate range.
+      if (!state.isBetterCandidate(p.kv)) break;
+      // Make into firstOnRow
+      firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
+          HConstants.LATEST_TIMESTAMP);
+      // If we find something, break;
+      if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
+    }
+  }
+
+  /**
+   * Only used by tests. TODO: Remove
+   *
+   * Given the specs of a column, update it, first by inserting a new record,
+   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
+   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
+   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
+   * get the new value, or the old value and all readers will eventually only see the new
+   * value after the old was removed.
+   *
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param newValue
+   * @param now
+   * @return  Timestamp
+   */
+  public long updateColumnValue(byte[] row,
+                                byte[] family,
+                                byte[] qualifier,
+                                long newValue,
+                                long now) {
+    KeyValue firstKv = KeyValue.createFirstOnRow(
+        row, family, qualifier);
+    // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+    SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+    if (!snSs.isEmpty()) {
+      KeyValue snKv = snSs.first();
+      // is there a matching KV in the snapshot?
+      if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+        if (snKv.getTimestamp() == now) {
+          // poop,
+          now += 1;
+        }
+      }
+    }
+
+    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+    // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+    // so we cant add the new KV w/o knowing what's there already, but we also
+    // want to take this chance to delete some kvs. So two loops (sad)
+
+    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+    for (KeyValue kv : ss) {
+      // if this isnt the row we are interested in, then bail:
+      if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
+        break; // rows dont match, bail.
+      }
+
+      // if the qualifier matches and it's a put, just RM it out of the kvset.
+      if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
+          kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
+        now = kv.getTimestamp();
+      }
+    }
+
+    // create or update (upsert) a new KeyValue with
+    // 'now' and a 0 memstoreTS == immediately visible
+    List<Cell> cells = new ArrayList<Cell>(1);
+    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
+    return upsert(cells, 1L);
+  }
+
+  /**
+   * Update or insert the specified KeyValues.
+   * <p>
+   * For each KeyValue, insert into MemStore.  This will atomically upsert the
+   * value for that row/family/qualifier.  If a KeyValue did already exist,
+   * it will then be removed.
+   * <p>
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+   * be immediately visible.  May want to change this so it is atomic across
+   * all KeyValues.
+   * <p>
+   * This is called under row lock, so Get operations will still see updates
+   * atomically.  Scans will only see each KeyValue update as atomic.
+   *
+   * @param cells
+   * @param readpoint readpoint below which we can safely remove duplicate KVs 
+   * @return change in memstore size
+   */
+  @Override
+  public long upsert(Iterable<Cell> cells, long readpoint) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += upsert(cell, readpoint);
+    }
+    return size;
+  }
+
+  /**
+   * Inserts the specified KeyValue into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified KeyValue.
+   * <p>
+   * First, the specified KeyValue is inserted into the Memstore.
+   * <p>
+   * If there are any existing KeyValues in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param cell
+   * @return change in size of MemStore
+   */
+  private long upsert(Cell cell, long readpoint) {
+    // Add the KeyValue to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+    long addedSize = internalAdd(kv);
+
+    // Get the KeyValues for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    KeyValue firstKv = KeyValue.createFirstOnRow(
+        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
+    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+    Iterator<KeyValue> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while ( it.hasNext() ) {
+      KeyValue cur = it.next();
+
+      if (kv == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+            cur.getMvccVersion() <= readpoint) {
+          if (versionsVisible > 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // false means there was a change, so give us the size.
+            long delta = heapSizeChange(cur, true);
+            addedSize -= delta;
+            this.size.addAndGet(-delta);
+            it.remove();
+            setOldestEditTimeToNow();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
+    return addedSize;
+  }
+
+  /*
+   * Immutable data structure to hold member found in set and the set it was
+   * found in.  Include set because it is carrying context.
+   */
+  private static class Member {
+    final KeyValue kv;
+    final NavigableSet<KeyValue> set;
+    Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
+      this.kv = kv;
+      this.set = s;
+    }
+  }
+
+  /*
+   * @param set Set to walk back in.  Pass a first in row or we'll return
+   * same row (loop).
+   * @param state Utility and context.
+   * @param firstOnRow First item on the row after the one we want to find a
+   * member in.
+   * @return Null or member of row previous to <code>firstOnRow</code>
+   */
+  private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
+      final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
+    NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
+    if (head.isEmpty()) return null;
+    for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
+      KeyValue found = i.next();
+      if (state.isExpired(found)) {
+        i.remove();
+        continue;
+      }
+      return new Member(head, found);
+    }
+    return null;
+  }
+
+  /**
+   * @return scanner on memstore and snapshot in this order.
+   */
+  @Override
+  public List<KeyValueScanner> getScanners(long readPt) {
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
+  }
+
+  /**
+   * Check if this memstore may contain the required keys
+   * @param scan
+   * @return False if the key definitely does not exist in this Memstore
+   */
+  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+    return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
+        && (Math.max(timeRangeTracker.getMaximumTimestamp(),
+                     snapshotTimeRangeTracker.getMaximumTimestamp()) >=
+            oldestUnexpiredTS);
+  }
+
+  /*
+   * MemStoreScanner implements the KeyValueScanner.
+   * It lets the caller scan the contents of a memstore -- both current
+   * map and snapshot.
+   * This behaves as if it were a real scanner but does not maintain position.
+   */
+  protected class MemStoreScanner extends NonLazyKeyValueScanner {
+    // Next row information for either kvset or snapshot
+    private KeyValue kvsetNextRow = null;
+    private KeyValue snapshotNextRow = null;
+
+    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
+    private KeyValue kvsetItRow = null;
+    private KeyValue snapshotItRow = null;
+    
+    // iterator based scanning.
+    private Iterator<KeyValue> kvsetIt;
+    private Iterator<KeyValue> snapshotIt;
+
+    // The kvset and snapshot at the time of creating this scanner
+    private KeyValueSkipListSet kvsetAtCreation;
+    private KeyValueSkipListSet snapshotAtCreation;
+
+    // the pre-calculated KeyValue to be returned by peek() or next()
+    private KeyValue theNext;
+
+    // The allocator and snapshot allocator at the time of creating this scanner
+    volatile MemStoreLAB allocatorAtCreation;
+    volatile MemStoreLAB snapshotAllocatorAtCreation;
+    
+    // A flag represents whether could stop skipping KeyValues for MVCC
+    // if have encountered the next row. Only used for reversed scan
+    private boolean stopSkippingKVsIfNextRow = false;
+
+    private long readPoint;
+
+    /*
+    Some notes...
+
+     So memstorescanner is fixed at creation time. this includes pointers/iterators into
+    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
+    snapshot is moved.  since kvset is null there is no point on reseeking on both,
+      we can save us the trouble. During the snapshot->hfile transition, the memstore
+      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
+      potentially do something smarter by adjusting the existing memstore scanner.
+
+      But there is a greater problem here, that being once a scanner has progressed
+      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
+      if a scan lasts a little while, there is a chance for new entries in kvset to
+      become available but we will never see them.  This needs to be handled at the
+      StoreScanner level with coordination with MemStoreScanner.
+
+      Currently, this problem is only partly managed: during the small amount of time
+      when the StoreScanner has not yet created a new MemStoreScanner, we will miss
+      the adds to kvset in the MemStoreScanner.
+    */
+
+    MemStoreScanner(long readPoint) {
+      super();
+
+      this.readPoint = readPoint;
+      kvsetAtCreation = kvset;
+      snapshotAtCreation = snapshot;
+      if (allocator != null) {
+        this.allocatorAtCreation = allocator;
+        this.allocatorAtCreation.incScannerCount();
+      }
+      if (snapshotAllocator != null) {
+        this.snapshotAllocatorAtCreation = snapshotAllocator;
+        this.snapshotAllocatorAtCreation.incScannerCount();
+      }
+    }
+
+    private KeyValue getNext(Iterator<KeyValue> it) {
+      KeyValue startKV = theNext;
+      KeyValue v = null;
+      try {
+        while (it.hasNext()) {
+          v = it.next();
+          if (v.getMvccVersion() <= this.readPoint) {
+            return v;
+          }
+          if (stopSkippingKVsIfNextRow && startKV != null
+              && comparator.compareRows(v, startKV) > 0) {
+            return null;
+          }
+        }
+
+        return null;
+      } finally {
+        if (v != null) {
+          // in all cases, remember the last KV iterated to
+          if (it == snapshotIt) {
+            snapshotItRow = v;
+          } else {
+            kvsetItRow = v;
+          }
+        }
+      }
+    }
+
+    /**
+     *  Set the scanner at the seek key.
+     *  Must be called only once: there is no thread safety between the scanner
+     *   and the memStore.
+     * @param key seek value
+     * @return false if the key is null or if there is no data
+     */
+    @Override
+    public synchronized boolean seek(KeyValue key) {
+      if (key == null) {
+        close();
+        return false;
+      }
+
+      // kvset and snapshot will never be null.
+      // if tailSet can't find anything, SortedSet is empty (not null).
+      kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+      kvsetItRow = null;
+      snapshotItRow = null;
+
+      return seekInSubLists(key);
+    }
+
+
+    /**
+     * (Re)initialize the iterators after a seek or a reseek.
+     */
+    private synchronized boolean seekInSubLists(KeyValue key){
+      kvsetNextRow = getNext(kvsetIt);
+      snapshotNextRow = getNext(snapshotIt);
+
+      // Calculate the next value
+      theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+      // has data
+      return (theNext != null);
+    }
+
+
+    /**
+     * Move forward on the sub-lists set previously by seek.
+     * @param key seek value (should be non-null)
+     * @return true if there is at least one KV to read, false otherwise
+     */
+    @Override
+    public synchronized boolean reseek(KeyValue key) {
+      /*
+      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+      This code is executed concurrently with flush and puts, without locks.
+      Two points must be known when working on this code:
+      1) It's not possible to use the 'kvTail' and 'snapshot'
+       variables, as they are modified during a flush.
+      2) The ideal implementation for performance would use the sub skip list
+       implicitly pointed by the iterators 'kvsetIt' and
+       'snapshotIt'. Unfortunately the Java API does not offer a method to
+       get it. So we remember the last keys we iterated to and restore
+       the reseeked set to at least that point.
+       */
+
+      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
+      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
+
+      return seekInSubLists(key);
+    }
+
+
+    @Override
+    public synchronized KeyValue peek() {
+      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+      return theNext;
+    }
+
+    @Override
+    public synchronized KeyValue next() {
+      if (theNext == null) {
+          return null;
+      }
+
+      final KeyValue ret = theNext;
+
+      // Advance one of the iterators
+      if (theNext == kvsetNextRow) {
+        kvsetNextRow = getNext(kvsetIt);
+      } else {
+        snapshotNextRow = getNext(snapshotIt);
+      }
+
+      // Calculate the next value
+      theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+      //    getLowest() + " threadpoint=" + readpoint);
+      return ret;
+    }
+
+    /*
+     * Returns the lower of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
+     */
+    private KeyValue getLowest(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare <= 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
+    /*
+     * Returns the higher of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
+     */
+    private KeyValue getHighest(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare > 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
+    public synchronized void close() {
+      this.kvsetNextRow = null;
+      this.snapshotNextRow = null;
+
+      this.kvsetIt = null;
+      this.snapshotIt = null;
+      
+      if (allocatorAtCreation != null) {
+        this.allocatorAtCreation.decScannerCount();
+        this.allocatorAtCreation = null;
+      }
+      if (snapshotAllocatorAtCreation != null) {
+        this.snapshotAllocatorAtCreation.decScannerCount();
+        this.snapshotAllocatorAtCreation = null;
+      }
+
+      this.kvsetItRow = null;
+      this.snapshotItRow = null;
+    }
+
+    /**
+     * MemStoreScanner returns max value as sequence id because it will
+     * always have the latest data among all files.
+     */
+    @Override
+    public long getSequenceID() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
+        long oldestUnexpiredTS) {
+      return shouldSeek(scan, oldestUnexpiredTS);
+    }
+
+    /**
+     * Seek scanner to the given key first. If it returns false(means
+     * peek()==null) or scanner's peek row is bigger than row of given key, seek
+     * the scanner to the previous row of given key
+     */
+    @Override
+    public synchronized boolean backwardSeek(KeyValue key) {
+      seek(key);
+      if (peek() == null || comparator.compareRows(peek(), key) > 0) {
+        return seekToPreviousRow(key);
+      }
+      return true;
+    }
+
+    /**
+     * Separately get the KeyValue before the specified key from kvset and
+     * snapshotset, and use the row of higher one as the previous row of
+     * specified key, then seek to the first KeyValue of previous row
+     */
+    @Override
+    public synchronized boolean seekToPreviousRow(KeyValue key) {
+      KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
+      SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
+      KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
+      SortedSet<KeyValue> snapshotHead = snapshotAtCreation
+          .headSet(firstKeyOnRow);
+      KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
+          .last();
+      KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
+      if (lastKVBeforeRow == null) {
+        theNext = null;
+        return false;
+      }
+      KeyValue firstKeyOnPreviousRow = KeyValue
+          .createFirstOnRow(lastKVBeforeRow.getRow());
+      this.stopSkippingKVsIfNextRow = true;
+      seek(firstKeyOnPreviousRow);
+      this.stopSkippingKVsIfNextRow = false;
+      if (peek() == null
+          || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
+        return seekToPreviousRow(lastKVBeforeRow);
+      }
+      return true;
+    }
+
+    @Override
+    public synchronized boolean seekToLastRow() {
+      KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
+          .last();
+      KeyValue second = snapshotAtCreation.isEmpty() ? null
+          : snapshotAtCreation.last();
+      KeyValue higherKv = getHighest(first, second);
+      if (higherKv == null) {
+        return false;
+      }
+      KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
+      if (seek(firstKvOnLastRow)) {
+        return true;
+      } else {
+        return seekToPreviousRow(higherKv);
+      }
+
+    }
+  }
+
+  public final static long FIXED_OVERHEAD = ClassSize.align(
+      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
+
+  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+      ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
+      (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
+
+  /*
+   * Calculate how the MemStore size has changed.  Includes overhead of the
+   * backing Map.
+   * @param kv
+   * @param notpresent True if the kv was NOT present in the set.
+   * @return Size
+   */
+  static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+    return notpresent ?
+        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
+        0;
+  }
+
+  private long keySize() {
+    return heapSize() - DEEP_OVERHEAD;
+  }
+
+  /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot.
+   */
+  @Override
+  public long heapSize() {
+    return size.get();
+  }
+
+  @Override
+  public long size() {
+    return heapSize();
+  }
+ 
+  /**
+   * Code to help figure if our approximation of object heap sizes is close
+   * enough.  See hbase-900.  Fills memstores then waits so user can heap
+   * dump and bring up resultant hprof in something like jprofiler which
+   * allows you get 'deep size' on objects.
+   * @param args main args
+   */
+  public static void main(String [] args) {
+    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
+      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
+    LOG.info("vmInputArguments=" + runtime.getInputArguments());
+    DefaultMemStore memstore1 = new DefaultMemStore();
+    // TODO: x32 vs x64
+    long size = 0;
+    final int count = 10000;
+    byte [] fam = Bytes.toBytes("col");
+    byte [] qf = Bytes.toBytes("umn");
+    byte [] empty = new byte[0];
+    for (int i = 0; i < count; i++) {
+      // Give each its own ts
+      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+    }
+    LOG.info("memstore1 estimated size=" + size);
+    for (int i = 0; i < count; i++) {
+      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+    }
+    LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
+    // Make a variably sized memstore.
+    DefaultMemStore memstore2 = new DefaultMemStore();
+    for (int i = 0; i < count; i++) {
+      size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
+        new byte[i]));
+    }
+    LOG.info("memstore2 estimated size=" + size);
+    final int seconds = 30;
+    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
+    for (int i = 0; i < seconds; i++) {
+      // Thread.sleep(1000);
+    }
+    LOG.info("Exiting.");
+  }
+
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -21,16 +21,12 @@ package org.apache.hadoop.hbase.regionse
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -45,21 +41,20 @@ public class DefaultStoreFlusher extends
   }
 
   @Override
-  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
       MonitoredTask status) throws IOException {
     ArrayList<Path> result = new ArrayList<Path>();
-    if (snapshot.size() == 0) return result; // don't flush if there are no entries
+    int cellsCount = snapshot.getCellsCount();
+    if (cellsCount == 0) return result; // don't flush if there are no entries
 
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means skip normal processing
     }
 
     StoreFile.Writer writer;
-    long flushed = 0;
     try {
       // TODO:  We can fail in the below block before we complete adding this flush to
       //        list of store files.  Add cleanup of anything put on filesystem if we fail.
@@ -67,20 +62,19 @@ public class DefaultStoreFlusher extends
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
         writer = store.createWriterInTmp(
-            snapshot.size(), store.getFamily().getCompression(), false, true, true);
-        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+            cellsCount, store.getFamily().getCompression(), false, true, true);
+        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
         try {
-          flushed = performFlush(scanner, writer, smallestReadPoint);
+          performFlush(scanner, writer, smallestReadPoint);
         } finally {
           finalizeWriter(writer, cacheFlushId, status);
         }
       }
     } finally {
-      flushedSize.set(flushed);
       scanner.close();
     }
     LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
-        + StringUtils.humanReadableInt(flushed) +
+        + StringUtils.humanReadableInt(snapshot.getSize()) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Mar 14 14:07:10 2014
@@ -30,7 +30,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +38,6 @@ import java.util.concurrent.ExecutorComp
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +81,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -116,6 +115,7 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Private
 public class HStore implements Store {
+  private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -224,7 +224,9 @@ public class HStore implements Store {
     // Why not just pass a HColumnDescriptor in here altogether?  Even if have
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
-    this.memstore = new MemStore(conf, this.comparator);
+    String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
+    this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+        Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
@@ -752,7 +754,7 @@ public class HStore implements Store {
 
   /**
    * Snapshot this stores memstore. Call before running
-   * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
+   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
    *  so it has some work to do.
    */
   void snapshot() {
@@ -769,16 +771,11 @@ public class HStore implements Store {
    * previously.
    * @param logCacheFlushId flush sequence number
    * @param snapshot
-   * @param snapshotTimeRangeTracker
-   * @param flushedSize The number of bytes flushed
    * @param status
    * @return The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  protected List<Path> flushCache(final long logCacheFlushId,
-      SortedSet<KeyValue> snapshot,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
+  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
       MonitoredTask status) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
@@ -789,8 +786,7 @@ public class HStore implements Store {
     IOException lastException = null;
     for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        List<Path> pathNames = flusher.flushSnapshot(
-            snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
         Path lastPathName = null;
         try {
           for (Path pathName : pathNames) {
@@ -826,14 +822,11 @@ public class HStore implements Store {
   /*
    * @param path The pathname of the tmp file into which the store was flushed
    * @param logCacheFlushId
+   * @param status
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile commitFile(final Path path,
-      final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
-      MonitoredTask status)
+  private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
       throws IOException {
     // Write-out finished successfully, move into the right spot
     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
@@ -916,16 +909,16 @@ public class HStore implements Store {
   /*
    * Change storeFiles adding into place the Reader produced by this new flush.
    * @param sfs Store files
-   * @param set That was used to make the passed file.
+   * @param snapshotId
    * @throws IOException
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(
-      final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
+  private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
+      throws IOException {
     this.lock.writeLock().lock();
     try {
       this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
-      this.memstore.clearSnapshot(set);
+      this.memstore.clearSnapshot(snapshotId);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
@@ -1827,7 +1820,7 @@ public class HStore implements Store {
 
   @Override
   public long getMemStoreSize() {
-    return this.memstore.heapSize();
+    return this.memstore.size();
   }
 
   @Override
@@ -1918,10 +1911,8 @@ public class HStore implements Store {
   private class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
-    private SortedSet<KeyValue> snapshot;
+    private MemStoreSnapshot snapshot;
     private List<Path> tempFiles;
-    private TimeRangeTracker snapshotTimeRangeTracker;
-    private final AtomicLong flushedSize = new AtomicLong();
 
     private StoreFlusherImpl(long cacheFlushSeqNum) {
       this.cacheFlushSeqNum = cacheFlushSeqNum;
@@ -1933,15 +1924,12 @@ public class HStore implements Store {
      */
     @Override
     public void prepare() {
-      memstore.snapshot();
-      this.snapshot = memstore.getSnapshot();
-      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
+      this.snapshot = memstore.snapshot();
     }
 
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
-      tempFiles = HStore.this.flushCache(
-        cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
     }
 
     @Override
@@ -1952,8 +1940,7 @@ public class HStore implements Store {
       List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
       for (Path storeFilePath : tempFiles) {
         try {
-          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
-              snapshotTimeRangeTracker, flushedSize, status));
+          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
         } catch (IOException ex) {
           LOG.error("Failed to commit store file " + storeFilePath, ex);
           // Try to delete the files we have committed before.
@@ -1976,7 +1963,7 @@ public class HStore implements Store {
         }
       }
       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
-      return HStore.this.updateStorefiles(storeFiles, snapshot);
+      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
     }
   }