You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/14 15:07:10 UTC
svn commit: r1577541 [2/3] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/io/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Mar 14 14:07:10 2014
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,168 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * The MemStore holds in-memory modifications to the Store. Modifications
- * are {@link KeyValue}s. When asked to flush, current memstore is moved
- * to snapshot and is cleared. We continue to serve edits out of new memstore
- * and backing snapshot until flusher reports in that the flush succeeded. At
- * this point we let the snapshot go.
- * <p>
- * The MemStore functions should not be called in parallel. Callers should hold
- * write and read locks. This is done in {@link HStore}.
- * </p>
- *
- * TODO: Adjust size of the memstore when we remove items because they have
- * been deleted.
- * TODO: With new KVSLS, need to make sure we update HeapSize with difference
- * in KV size.
+ * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
+ * <p>
+ * The MemStore functions should not be called in parallel. Callers should hold write and read
+ * locks. This is done in {@link HStore}.
+ * </p>
*/
@InterfaceAudience.Private
-public class MemStore implements HeapSize {
- private static final Log LOG = LogFactory.getLog(MemStore.class);
-
- static final String USEMSLAB_KEY =
- "hbase.hregion.memstore.mslab.enabled";
- private static final boolean USEMSLAB_DEFAULT = true;
-
- private Configuration conf;
-
- // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
- // better semantics. The Map will overwrite if passed a key it already had
- // whereas the Set will not add new KV if key is same though value might be
- // different. Value is not important -- just make sure always same
- // reference passed.
- volatile KeyValueSkipListSet kvset;
-
- // Snapshot of memstore. Made for flusher.
- volatile KeyValueSkipListSet snapshot;
-
- final KeyValue.KVComparator comparator;
-
- // Used to track own heapSize
- final AtomicLong size;
- private volatile long snapshotSize;
-
- // Used to track when to flush
- volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
- TimeRangeTracker timeRangeTracker;
- TimeRangeTracker snapshotTimeRangeTracker;
-
- MemStoreChunkPool chunkPool;
- volatile MemStoreLAB allocator;
- volatile MemStoreLAB snapshotAllocator;
-
- /**
- * Default constructor. Used for tests.
- */
- public MemStore() {
- this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
- }
-
- /**
- * Constructor.
- * @param c Comparator
- */
- public MemStore(final Configuration conf,
- final KeyValue.KVComparator c) {
- this.conf = conf;
- this.comparator = c;
- this.kvset = new KeyValueSkipListSet(c);
- this.snapshot = new KeyValueSkipListSet(c);
- timeRangeTracker = new TimeRangeTracker();
- snapshotTimeRangeTracker = new TimeRangeTracker();
- this.size = new AtomicLong(DEEP_OVERHEAD);
- this.snapshotSize = 0;
- if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
- this.chunkPool = MemStoreChunkPool.getPool(conf);
- this.allocator = new MemStoreLAB(conf, chunkPool);
- } else {
- this.allocator = null;
- this.chunkPool = null;
- }
- }
-
- void dump() {
- for (KeyValue kv: this.kvset) {
- LOG.info(kv);
- }
- for (KeyValue kv: this.snapshot) {
- LOG.info(kv);
- }
- }
+public interface MemStore extends HeapSize {
/**
- * Creates a snapshot of the current memstore.
- * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
- * To get the snapshot made by this method, use {@link #getSnapshot()}
+ * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+ * {@link #clearSnapshot(long)}.
+ * @return {@link MemStoreSnapshot}
*/
- void snapshot() {
- // If snapshot currently has entries, then flusher failed or didn't call
- // cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
- LOG.warn("Snapshot called again without clearing previous. " +
- "Doing nothing. Another ongoing flush or did we fail last attempt?");
- } else {
- if (!this.kvset.isEmpty()) {
- this.snapshotSize = keySize();
- this.snapshot = this.kvset;
- this.kvset = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- this.snapshotAllocator = this.allocator;
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- this.allocator = new MemStoreLAB(conf, chunkPool);
- }
- timeOfOldestEdit = Long.MAX_VALUE;
- }
- }
- }
+ MemStoreSnapshot snapshot();
/**
- * Return the current snapshot.
- * Called by flusher to get current snapshot made by a previous
- * call to {@link #snapshot()}
- * @return Return snapshot.
+ * Clears the current snapshot of the MemStore.
+ * @param id identifier of the snapshot to clear
* @see #snapshot()
- * @see #clearSnapshot(SortedSet)
*/
- KeyValueSkipListSet getSnapshot() {
- return this.snapshot;
- }
+ void clearSnapshot(long id) throws UnexpectedException;
/**
* On flush, how much memory we will clear.
@@ -187,271 +56,42 @@ public class MemStore implements HeapSiz
*
* @return size of data that is going to be flushed
*/
- long getFlushableSize() {
- return this.snapshotSize > 0 ? this.snapshotSize : keySize();
- }
-
- /**
- * The passed snapshot was successfully persisted; it can be let go.
- * @param ss The snapshot to clean out.
- * @throws UnexpectedException
- * @see #snapshot()
- */
- void clearSnapshot(final SortedSet<KeyValue> ss)
- throws UnexpectedException {
- MemStoreLAB tmpAllocator = null;
- if (this.snapshot != ss) {
- throw new UnexpectedException("Current snapshot is " +
- this.snapshot + ", was passed " + ss);
- }
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!ss.isEmpty()) {
- this.snapshot = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- this.snapshotSize = 0;
- if (this.snapshotAllocator != null) {
- tmpAllocator = this.snapshotAllocator;
- this.snapshotAllocator = null;
- }
- if (tmpAllocator != null) {
- tmpAllocator.close();
- }
- }
+ long getFlushableSize();
/**
* Write an update
- * @param kv
+ * @param cell
* @return approximate size of the passed key and value.
*/
- long add(final KeyValue kv) {
- KeyValue toAdd = maybeCloneWithAllocator(kv);
- return internalAdd(toAdd);
- }
-
- long timeOfOldestEdit() {
- return timeOfOldestEdit;
- }
-
- private boolean addToKVSet(KeyValue e) {
- boolean b = this.kvset.add(e);
- setOldestEditTimeToNow();
- return b;
- }
-
- private boolean removeFromKVSet(KeyValue e) {
- boolean b = this.kvset.remove(e);
- setOldestEditTimeToNow();
- return b;
- }
-
- void setOldestEditTimeToNow() {
- if (timeOfOldestEdit == Long.MAX_VALUE) {
- timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
- }
- }
+ long add(final Cell cell);
/**
- * Internal version of add() that doesn't clone KVs with the
- * allocator, and doesn't take the lock.
- *
- * Callers should ensure they already have the read lock taken
+ * @return Time at which the oldest edit currently in the MemStore was made
*/
- private long internalAdd(final KeyValue toAdd) {
- long s = heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
- }
-
- private KeyValue maybeCloneWithAllocator(KeyValue kv) {
- if (allocator == null) {
- return kv;
- }
-
- int len = kv.getLength();
- Allocation alloc = allocator.allocateBytes(len);
- if (alloc == null) {
- // The allocation was too large, allocator decided
- // not to do anything with it.
- return kv;
- }
- assert alloc.getData() != null;
- System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
- KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
- newKv.setMvccVersion(kv.getMvccVersion());
- return newKv;
- }
+ long timeOfOldestEdit();
/**
- * Remove n key from the memstore. Only kvs that have the same key and the
- * same memstoreTS are removed. It is ok to not update timeRangeTracker
- * in this call. It is possible that we can optimize this method by using
- * tailMap/iterator, but since this method is called rarely (only for
- * error recovery), we can leave those optimization for the future.
- * @param kv
+ * Remove the given key from the memstore. Only kvs that have the same key and the same memstoreTS
+ * are removed. It is ok to not update timeRangeTracker in this call.
+ * @param cell
*/
- void rollback(final KeyValue kv) {
- // If the key is in the snapshot, delete it. We should not update
- // this.size, because that tracks the size of only the memstore and
- // not the snapshot. The flush of this snapshot to disk has not
- // yet started because Store.flush() waits for all rwcc transactions to
- // commit before starting the flush to disk.
- KeyValue found = this.snapshot.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- this.snapshot.remove(kv);
- }
- // If the key is in the memstore, delete it. Update this.size.
- found = this.kvset.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- removeFromKVSet(kv);
- long s = heapSizeChange(kv, true);
- this.size.addAndGet(-s);
- }
- }
+ void rollback(final Cell cell);
/**
* Write a delete
- * @param delete
+ * @param deleteCell
* @return approximate size of the passed key and value.
*/
- long delete(final KeyValue delete) {
- long s = 0;
- KeyValue toAdd = maybeCloneWithAllocator(delete);
- s += heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
- }
-
- /**
- * @param kv Find the row that comes after this one. If null, we return the
- * first.
- * @return Next row or null if none found.
- */
- KeyValue getNextRow(final KeyValue kv) {
- return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
- }
-
- /*
- * @param a
- * @param b
- * @return Return lowest of a or b or null if both a and b are null
- */
- private KeyValue getLowest(final KeyValue a, final KeyValue b) {
- if (a == null) {
- return b;
- }
- if (b == null) {
- return a;
- }
- return comparator.compareRows(a, b) <= 0? a: b;
- }
-
- /*
- * @param key Find row that follows this one. If null, return first.
- * @param map Set to look in for a row beyond <code>row</code>.
- * @return Next row or null if none found. If one found, will be a new
- * KeyValue -- can be destroyed by subsequent calls to this method.
- */
- private KeyValue getNextRow(final KeyValue key,
- final NavigableSet<KeyValue> set) {
- KeyValue result = null;
- SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
- // Iterate until we fall into the next row; i.e. move off current row
- for (KeyValue kv: tail) {
- if (comparator.compareRows(kv, key) <= 0)
- continue;
- // Note: Not suppressing deletes or expired cells. Needs to be handled
- // by higher up functions.
- result = kv;
- break;
- }
- return result;
- }
+ long delete(final Cell deleteCell);
/**
+ * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. The
+ * target row key is set in state.
* @param state column/delete tracking state
*/
- void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
- getRowKeyAtOrBefore(kvset, state);
- getRowKeyAtOrBefore(snapshot, state);
- }
-
- /*
- * @param set
- * @param state Accumulates deletes and candidates.
- */
- private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
- final GetClosestRowBeforeTracker state) {
- if (set.isEmpty()) {
- return;
- }
- if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
- // Found nothing in row. Try backing up.
- getRowKeyBefore(set, state);
- }
- }
-
- /*
- * Walk forward in a row from <code>firstOnRow</code>. Presumption is that
- * we have been passed the first possible key on a row. As we walk forward
- * we accumulate deletes until we hit a candidate on the row at which point
- * we return.
- * @param set
- * @param firstOnRow First possible key on this row.
- * @param state
- * @return True if we found a candidate walking this row.
- */
- private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
- final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
- boolean foundCandidate = false;
- SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
- if (tail.isEmpty()) return foundCandidate;
- for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
- KeyValue kv = i.next();
- // Did we go beyond the target row? If so break.
- if (state.isTooFar(kv, firstOnRow)) break;
- if (state.isExpired(kv)) {
- i.remove();
- continue;
- }
- // If we added something, this row is a contender. break.
- if (state.handle(kv)) {
- foundCandidate = true;
- break;
- }
- }
- return foundCandidate;
- }
-
- /*
- * Walk backwards through the passed set a row at a time until we run out of
- * set or until we get a candidate.
- * @param set
- * @param state
- */
- private void getRowKeyBefore(NavigableSet<KeyValue> set,
- final GetClosestRowBeforeTracker state) {
- KeyValue firstOnRow = state.getTargetKey();
- for (Member p = memberOfPreviousRow(set, state, firstOnRow);
- p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
- // Make sure we don't fall out of our table.
- if (!state.isTargetTable(p.kv)) break;
- // Stop looking if we've exited the better candidate range.
- if (!state.isBetterCandidate(p.kv)) break;
- // Make into firstOnRow
- firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
- HConstants.LATEST_TIMESTAMP);
- // If we find something, break;
- if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
- }
- }
+ void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state);
/**
- * Only used by tests. TODO: Remove
- *
* Given the specs of a column, update it, first by inserting a new record,
* then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
* will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
@@ -464,616 +104,35 @@ public class MemStore implements HeapSiz
* @param qualifier
* @param newValue
* @param now
- * @return Timestamp
+ * @return Timestamp
*/
- long updateColumnValue(byte[] row,
- byte[] family,
- byte[] qualifier,
- long newValue,
- long now) {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
- }
- }
-
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
-
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- for (KeyValue kv : ss) {
- // if this isnt the row we are interested in, then bail:
- if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
- break; // rows dont match, bail.
- }
-
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
- kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
- now = kv.getTimestamp();
- }
- }
-
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- List<Cell> cells = new ArrayList<Cell>(1);
- cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
- return upsert(cells, 1L);
- }
+ long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now);
/**
- * Update or insert the specified KeyValues.
+ * Update or insert the specified cells.
* <p>
- * For each KeyValue, insert into MemStore. This will atomically upsert the
- * value for that row/family/qualifier. If a KeyValue did already exist,
- * it will then be removed.
+ * For each Cell, insert into MemStore. This will atomically upsert the value for that
+ * row/family/qualifier. If a Cell did already exist, it will then be removed.
* <p>
- * Currently the memstoreTS is kept at 0 so as each insert happens, it will
- * be immediately visible. May want to change this so it is atomic across
- * all KeyValues.
+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will be immediately
+ * visible. May want to change this so it is atomic across all Cells.
* <p>
- * This is called under row lock, so Get operations will still see updates
- * atomically. Scans will only see each KeyValue update as atomic.
- *
+ * This is called under row lock, so Get operations will still see updates atomically. Scans will
+ * only see each Cell update as atomic.
* @param cells
- * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param readpoint readpoint below which we can safely remove duplicate Cells.
* @return change in memstore size
*/
- public long upsert(Iterable<Cell> cells, long readpoint) {
- long size = 0;
- for (Cell cell : cells) {
- size += upsert(cell, readpoint);
- }
- return size;
- }
-
- /**
- * Inserts the specified KeyValue into MemStore and deletes any existing
- * versions of the same row/family/qualifier as the specified KeyValue.
- * <p>
- * First, the specified KeyValue is inserted into the Memstore.
- * <p>
- * If there are any existing KeyValues in this MemStore with the same row,
- * family, and qualifier, they are removed.
- * <p>
- * Callers must hold the read lock.
- *
- * @param cell
- * @return change in size of MemStore
- */
- private long upsert(Cell cell, long readpoint) {
- // Add the KeyValue to the MemStore
- // Use the internalAdd method here since we (a) already have a lock
- // and (b) cannot safely use the MSLAB here without potentially
- // hitting OOME - see TestMemStore.testUpsertMSLAB for a
- // test that triggers the pathological case if we don't avoid MSLAB
- // here.
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- long addedSize = internalAdd(kv);
-
- // Get the KeyValues for the row/family/qualifier regardless of timestamp.
- // For this case we want to clean up any other puts
- KeyValue firstKv = KeyValue.createFirstOnRow(
- kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
- kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- Iterator<KeyValue> it = ss.iterator();
- // versions visible to oldest scanner
- int versionsVisible = 0;
- while ( it.hasNext() ) {
- KeyValue cur = it.next();
-
- if (kv == cur) {
- // ignore the one just put in
- continue;
- }
- // check that this is the row and column we are interested in, otherwise bail
- if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
- // only remove Puts that concurrent scanners cannot possibly see
- if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
- cur.getMvccVersion() <= readpoint) {
- if (versionsVisible > 1) {
- // if we get here we have seen at least one version visible to the oldest scanner,
- // which means we can prove that no scanner will see this version
-
- // false means there was a change, so give us the size.
- long delta = heapSizeChange(cur, true);
- addedSize -= delta;
- this.size.addAndGet(-delta);
- it.remove();
- setOldestEditTimeToNow();
- } else {
- versionsVisible++;
- }
- }
- } else {
- // past the row or column, done
- break;
- }
- }
- return addedSize;
- }
-
- /*
- * Immutable data structure to hold member found in set and the set it was
- * found in. Include set because it is carrying context.
- */
- private static class Member {
- final KeyValue kv;
- final NavigableSet<KeyValue> set;
- Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
- this.kv = kv;
- this.set = s;
- }
- }
-
- /*
- * @param set Set to walk back in. Pass a first in row or we'll return
- * same row (loop).
- * @param state Utility and context.
- * @param firstOnRow First item on the row after the one we want to find a
- * member in.
- * @return Null or member of row previous to <code>firstOnRow</code>
- */
- private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
- final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
- NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
- if (head.isEmpty()) return null;
- for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
- KeyValue found = i.next();
- if (state.isExpired(found)) {
- i.remove();
- continue;
- }
- return new Member(head, found);
- }
- return null;
- }
-
- /**
- * @return scanner on memstore and snapshot in this order.
- */
- List<KeyValueScanner> getScanners(long readPt) {
- return Collections.<KeyValueScanner>singletonList(
- new MemStoreScanner(readPt));
- }
-
- /**
- * Check if this memstore may contain the required keys
- * @param scan
- * @return False if the key definitely does not exist in this Memstore
- */
- public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
- return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
- snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
- && (Math.max(timeRangeTracker.getMaximumTimestamp(),
- snapshotTimeRangeTracker.getMaximumTimestamp()) >=
- oldestUnexpiredTS);
- }
-
- public TimeRangeTracker getSnapshotTimeRangeTracker() {
- return this.snapshotTimeRangeTracker;
- }
-
- /*
- * MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore -- both current
- * map and snapshot.
- * This behaves as if it were a real scanner but does not maintain position.
- */
- protected class MemStoreScanner extends NonLazyKeyValueScanner {
- // Next row information for either kvset or snapshot
- private KeyValue kvsetNextRow = null;
- private KeyValue snapshotNextRow = null;
-
- // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
- private KeyValue kvsetItRow = null;
- private KeyValue snapshotItRow = null;
-
- // iterator based scanning.
- private Iterator<KeyValue> kvsetIt;
- private Iterator<KeyValue> snapshotIt;
-
- // The kvset and snapshot at the time of creating this scanner
- private KeyValueSkipListSet kvsetAtCreation;
- private KeyValueSkipListSet snapshotAtCreation;
-
- // the pre-calculated KeyValue to be returned by peek() or next()
- private KeyValue theNext;
-
- // The allocator and snapshot allocator at the time of creating this scanner
- volatile MemStoreLAB allocatorAtCreation;
- volatile MemStoreLAB snapshotAllocatorAtCreation;
-
- // A flag represents whether could stop skipping KeyValues for MVCC
- // if have encountered the next row. Only used for reversed scan
- private boolean stopSkippingKVsIfNextRow = false;
-
- private long readPoint;
-
- /*
- Some notes...
-
- So memstorescanner is fixed at creation time. this includes pointers/iterators into
- existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
- snapshot is moved. since kvset is null there is no point on reseeking on both,
- we can save us the trouble. During the snapshot->hfile transition, the memstore
- scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
- potentially do something smarter by adjusting the existing memstore scanner.
-
- But there is a greater problem here, that being once a scanner has progressed
- during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
- if a scan lasts a little while, there is a chance for new entries in kvset to
- become available but we will never see them. This needs to be handled at the
- StoreScanner level with coordination with MemStoreScanner.
-
- Currently, this problem is only partly managed: during the small amount of time
- when the StoreScanner has not yet created a new MemStoreScanner, we will miss
- the adds to kvset in the MemStoreScanner.
- */
-
- MemStoreScanner(long readPoint) {
- super();
-
- this.readPoint = readPoint;
- kvsetAtCreation = kvset;
- snapshotAtCreation = snapshot;
- if (allocator != null) {
- this.allocatorAtCreation = allocator;
- this.allocatorAtCreation.incScannerCount();
- }
- if (snapshotAllocator != null) {
- this.snapshotAllocatorAtCreation = snapshotAllocator;
- this.snapshotAllocatorAtCreation.incScannerCount();
- }
- }
-
- private KeyValue getNext(Iterator<KeyValue> it) {
- KeyValue startKV = theNext;
- KeyValue v = null;
- try {
- while (it.hasNext()) {
- v = it.next();
- if (v.getMvccVersion() <= this.readPoint) {
- return v;
- }
- if (stopSkippingKVsIfNextRow && startKV != null
- && comparator.compareRows(v, startKV) > 0) {
- return null;
- }
- }
-
- return null;
- } finally {
- if (v != null) {
- // in all cases, remember the last KV iterated to
- if (it == snapshotIt) {
- snapshotItRow = v;
- } else {
- kvsetItRow = v;
- }
- }
- }
- }
-
- /**
- * Set the scanner at the seek key.
- * Must be called only once: there is no thread safety between the scanner
- * and the memStore.
- * @param key seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public synchronized boolean seek(KeyValue key) {
- if (key == null) {
- close();
- return false;
- }
-
- // kvset and snapshot will never be null.
- // if tailSet can't find anything, SortedSet is empty (not null).
- kvsetIt = kvsetAtCreation.tailSet(key).iterator();
- snapshotIt = snapshotAtCreation.tailSet(key).iterator();
- kvsetItRow = null;
- snapshotItRow = null;
-
- return seekInSubLists(key);
- }
-
-
- /**
- * (Re)initialize the iterators after a seek or a reseek.
- */
- private synchronized boolean seekInSubLists(KeyValue key){
- kvsetNextRow = getNext(kvsetIt);
- snapshotNextRow = getNext(snapshotIt);
-
- // Calculate the next value
- theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
- // has data
- return (theNext != null);
- }
-
-
- /**
- * Move forward on the sub-lists set previously by seek.
- * @param key seek value (should be non-null)
- * @return true if there is at least one KV to read, false otherwise
- */
- @Override
- public synchronized boolean reseek(KeyValue key) {
- /*
- See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
- This code is executed concurrently with flush and puts, without locks.
- Two points must be known when working on this code:
- 1) It's not possible to use the 'kvTail' and 'snapshot'
- variables, as they are modified during a flush.
- 2) The ideal implementation for performance would use the sub skip list
- implicitly pointed by the iterators 'kvsetIt' and
- 'snapshotIt'. Unfortunately the Java API does not offer a method to
- get it. So we remember the last keys we iterated to and restore
- the reseeked set to at least that point.
- */
-
- kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
- snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
- return seekInSubLists(key);
- }
-
-
- @Override
- public synchronized KeyValue peek() {
- //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
- return theNext;
- }
-
- @Override
- public synchronized KeyValue next() {
- if (theNext == null) {
- return null;
- }
-
- final KeyValue ret = theNext;
-
- // Advance one of the iterators
- if (theNext == kvsetNextRow) {
- kvsetNextRow = getNext(kvsetIt);
- } else {
- snapshotNextRow = getNext(snapshotIt);
- }
-
- // Calculate the next value
- theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
- //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
- // getLowest() + " threadpoint=" + readpoint);
- return ret;
- }
-
- /*
- * Returns the lower of the two key values, or null if they are both null.
- * This uses comparator.compare() to compare the KeyValue using the memstore
- * comparator.
- */
- private KeyValue getLowest(KeyValue first, KeyValue second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare <= 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- /*
- * Returns the higher of the two key values, or null if they are both null.
- * This uses comparator.compare() to compare the KeyValue using the memstore
- * comparator.
- */
- private KeyValue getHighest(KeyValue first, KeyValue second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare > 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- public synchronized void close() {
- this.kvsetNextRow = null;
- this.snapshotNextRow = null;
-
- this.kvsetIt = null;
- this.snapshotIt = null;
-
- if (allocatorAtCreation != null) {
- this.allocatorAtCreation.decScannerCount();
- this.allocatorAtCreation = null;
- }
- if (snapshotAllocatorAtCreation != null) {
- this.snapshotAllocatorAtCreation.decScannerCount();
- this.snapshotAllocatorAtCreation = null;
- }
-
- this.kvsetItRow = null;
- this.snapshotItRow = null;
- }
-
- /**
- * MemStoreScanner returns max value as sequence id because it will
- * always have the latest data among all files.
- */
- @Override
- public long getSequenceID() {
- return Long.MAX_VALUE;
- }
-
- @Override
- public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
- long oldestUnexpiredTS) {
- return shouldSeek(scan, oldestUnexpiredTS);
- }
-
- /**
- * Seek scanner to the given key first. If it returns false(means
- * peek()==null) or scanner's peek row is bigger than row of given key, seek
- * the scanner to the previous row of given key
- */
- @Override
- public synchronized boolean backwardSeek(KeyValue key) {
- seek(key);
- if (peek() == null || comparator.compareRows(peek(), key) > 0) {
- return seekToPreviousRow(key);
- }
- return true;
- }
-
- /**
- * Separately get the KeyValue before the specified key from kvset and
- * snapshotset, and use the row of higher one as the previous row of
- * specified key, then seek to the first KeyValue of previous row
- */
- @Override
- public synchronized boolean seekToPreviousRow(KeyValue key) {
- KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
- SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
- KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
- SortedSet<KeyValue> snapshotHead = snapshotAtCreation
- .headSet(firstKeyOnRow);
- KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
- .last();
- KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
- if (lastKVBeforeRow == null) {
- theNext = null;
- return false;
- }
- KeyValue firstKeyOnPreviousRow = KeyValue
- .createFirstOnRow(lastKVBeforeRow.getRow());
- this.stopSkippingKVsIfNextRow = true;
- seek(firstKeyOnPreviousRow);
- this.stopSkippingKVsIfNextRow = false;
- if (peek() == null
- || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
- return seekToPreviousRow(lastKVBeforeRow);
- }
- return true;
- }
-
- @Override
- public synchronized boolean seekToLastRow() {
- KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
- .last();
- KeyValue second = snapshotAtCreation.isEmpty() ? null
- : snapshotAtCreation.last();
- KeyValue higherKv = getHighest(first, second);
- if (higherKv == null) {
- return false;
- }
- KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
- if (seek(firstKvOnLastRow)) {
- return true;
- } else {
- return seekToPreviousRow(higherKv);
- }
-
- }
- }
-
- public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
-
- public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
- (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
- /*
- * Calculate how the MemStore size has changed. Includes overhead of the
- * backing Map.
- * @param kv
- * @param notpresent True if the kv was NOT present in the set.
- * @return Size
- */
- static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
- return notpresent ?
- ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
- 0;
- }
-
- /**
- * Get the entire heap usage for this MemStore not including keys in the
- * snapshot.
- */
- @Override
- public long heapSize() {
- return size.get();
- }
+ long upsert(Iterable<Cell> cells, long readpoint);
/**
- * Get the heap usage of KVs in this MemStore.
+ * @return scanners over the memstore. This might include a scanner over the snapshot when one
+ * is present.
*/
- public long keySize() {
- return heapSize() - DEEP_OVERHEAD;
- }
+ List<KeyValueScanner> getScanners(long readPt);
/**
- * Code to help figure if our approximation of object heap sizes is close
- * enough. See hbase-900. Fills memstores then waits so user can heap
- * dump and bring up resultant hprof in something like jprofiler which
- * allows you get 'deep size' on objects.
- * @param args main args
+ * @return Total memory occupied by this MemStore.
*/
- public static void main(String [] args) {
- RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
- LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
- runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
- LOG.info("vmInputArguments=" + runtime.getInputArguments());
- MemStore memstore1 = new MemStore();
- // TODO: x32 vs x64
- long size = 0;
- final int count = 10000;
- byte [] fam = Bytes.toBytes("col");
- byte [] qf = Bytes.toBytes("umn");
- byte [] empty = new byte[0];
- for (int i = 0; i < count; i++) {
- // Give each its own ts
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
- }
- LOG.info("memstore1 estimated size=" + size);
- for (int i = 0; i < count; i++) {
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
- }
- LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
- // Make a variably sized memstore.
- MemStore memstore2 = new MemStore();
- for (int i = 0; i < count; i++) {
- size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
- new byte[i]));
- }
- LOG.info("memstore2 estimated size=" + size);
- final int seconds = 30;
- LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
- for (int i = 0; i < seconds; i++) {
- // Thread.sleep(1000);
- }
- LOG.info("Exiting.");
- }
+ long size();
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java?rev=1577541&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java Fri Mar 14 14:07:10 2014
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
+ * count of cells in it and total memory size occupied by all the cells, timestamp information of
+ * all the cells and a scanner to read all cells in it.
+ */
+@InterfaceAudience.Private
+public class MemStoreSnapshot {
+
+ private final long id; // snapshot's identifier
+ private final int cellsCount; // count of cells in the snapshot
+ private final long size; // total memory size occupied by the cells
+ private final TimeRangeTracker timeRangeTracker; // timestamp information of the cells
+ private final KeyValueScanner scanner; // scanner to read the cells in the snapshot
+ /** Stores every argument as-is (no copying, no validation); the getters return them unchanged. */
+ public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker,
+ KeyValueScanner scanner) {
+ this.id = id;
+ this.cellsCount = cellsCount;
+ this.size = size;
+ this.timeRangeTracker = timeRangeTracker;
+ this.scanner = scanner;
+ }
+
+ /**
+ * @return the snapshot's identifier, exactly as supplied to the constructor.
+ */
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * @return Number of Cells in this snapshot, as supplied to the constructor (not recomputed).
+ */
+ public int getCellsCount() {
+ return cellsCount;
+ }
+
+ /**
+ * @return Total memory size occupied by this snapshot, as supplied to the constructor.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * @return {@link TimeRangeTracker} for all the Cells in the snapshot; the constructor's instance.
+ */
+ public TimeRangeTracker getTimeRangeTracker() {
+ return this.timeRangeTracker;
+ }
+
+ /**
+ * @return {@link KeyValueScanner} for iterating over the snapshot; same instance on every call.
+ */
+ public KeyValueScanner getScanner() {
+ return this.scanner;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -22,11 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -37,7 +33,6 @@ import org.apache.hadoop.hbase.KeyValueU
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -57,15 +52,11 @@ abstract class StoreFlusher {
* Turns a snapshot of memstore into a set of store files.
* @param snapshot Memstore snapshot.
* @param cacheFlushSeqNum Log cache flush sequence number.
- * @param snapshotTimeRangeTracker Time range tracker from the memstore
- * pertaining to the snapshot.
- * @param flushedSize Out parameter for the size of the KVs flushed.
* @param status Task that represents the flush operation and may be updated with status.
* @return List of files written. Can be empty; must not be null.
*/
- public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
- TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
- throws IOException;
+ public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
+ MonitoredTask status) throws IOException;
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
@@ -81,21 +72,21 @@ abstract class StoreFlusher {
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
+ * @param snapshotScanner Scanner over the memstore snapshot that is being flushed.
+ * @param smallestReadPoint Smallest read point used for the flush.
* @return The scanner; null if coprocessor is canceling the flush.
*/
- protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
+ protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
long smallestReadPoint) throws IOException {
- KeyValueScanner memstoreScanner =
- new CollectionBackedScanner(snapshot, store.getComparator());
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
- scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+ scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
scanner = new StoreScanner(store, store.getScanInfo(), scan,
- Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+ Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
@@ -115,15 +106,13 @@ abstract class StoreFlusher {
* @param scanner Scanner to get data from.
* @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush.
- * @return Bytes flushed.
*/
- protected long performFlush(InternalScanner scanner,
+ protected void performFlush(InternalScanner scanner,
Compactor.CellSink sink, long smallestReadPoint) throws IOException {
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
List<Cell> kvs = new ArrayList<Cell>();
boolean hasMore;
- long flushed = 0;
do {
hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
@@ -139,11 +128,9 @@ abstract class StoreFlusher {
kv.setMvccVersion(0);
}
sink.append(kv);
- flushed += MemStore.heapSizeChange(kv, true);
}
kvs.clear();
}
} while (hasMore);
- return flushed;
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -21,21 +21,16 @@ package org.apache.hadoop.hbase.regionse
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import com.google.common.annotations.VisibleForTesting;
@@ -57,33 +52,32 @@ public class StripeStoreFlusher extends
}
@Override
- public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
- final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
- throws IOException {
+ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
+ MonitoredTask status) throws IOException {
List<Path> result = null;
- int kvCount = snapshot.size();
- if (kvCount == 0) return result; // don't flush if there are no entries
+ int cellsCount = snapshot.getCellsCount();
+ if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
// Let policy select flush method.
- StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+ StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
- long flushedBytes = 0;
boolean success = false;
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
- StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
+ StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
+ snapshot.getTimeRangeTracker(), cellsCount);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
synchronized (flushLock) {
- flushedBytes = performFlush(scanner, mw, smallestReadPoint);
+ performFlush(scanner, mw, smallestReadPoint);
result = mw.commitWriters(cacheFlushSeqNum, false);
success = true;
}
@@ -100,7 +94,6 @@ public class StripeStoreFlusher extends
}
}
}
- flushedSize.set(flushedBytes);
try {
scanner.close();
} catch (IOException ex) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Fri Mar 14 14:07:10 2014
@@ -44,10 +44,10 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
-import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
@@ -291,17 +291,17 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
- // MemStore Overhead
- cl = MemStore.class;
- actual = MemStore.FIXED_OVERHEAD;
+ // DefaultMemStore Overhead
+ cl = DefaultMemStore.class;
+ actual = DefaultMemStore.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
if(expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
- // MemStore Deep Overhead
- actual = MemStore.DEEP_OVERHEAD;
+ // DefaultMemStore Deep Overhead
+ actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));