You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/14 15:07:10 UTC
svn commit: r1577541 [1/3] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/io/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: anoopsamjohn
Date: Fri Mar 14 14:07:10 2014
New Revision: 1577541
URL: http://svn.apache.org/r1577541
Log:
HBASE-10648 Pluggable Memstore.(Anoop)
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
Removed:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java?rev=1577541&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java Fri Mar 14 14:07:10 2014
@@ -0,0 +1,1070 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.rmi.UnexpectedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The MemStore holds in-memory modifications to the Store. Modifications
+ * are {@link Cell}s. When asked to flush, current memstore is moved
+ * to snapshot and is cleared. We continue to serve edits out of new memstore
+ * and backing snapshot until flusher reports in that the flush succeeded. At
+ * this point we let the snapshot go.
+ * <p>
+ * The MemStore functions should not be called in parallel. Callers should hold
+ * write and read locks. This is done in {@link HStore}.
+ * </p>
+ *
+ * TODO: Adjust size of the memstore when we remove items because they have
+ * been deleted.
+ * TODO: With new KVSLS, need to make sure we update HeapSize with difference
+ * in KV size.
+ */
+@InterfaceAudience.Private
+public class DefaultMemStore implements MemStore {
+ private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
+
  // Config key toggling MemStore-Local Allocation Buffers (MSLAB), which
  // reduce old-gen heap fragmentation by copying cell data into chunks.
  static final String USEMSLAB_KEY =
      "hbase.hregion.memstore.mslab.enabled";
  private static final boolean USEMSLAB_DEFAULT = true;

  private Configuration conf;

  // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
  // better semantics. The Map will overwrite if passed a key it already had
  // whereas the Set will not add new KV if key is same though value might be
  // different. Value is not important -- just make sure always same
  // reference passed.
  volatile KeyValueSkipListSet kvset;

  // Snapshot of memstore. Made for flusher.
  volatile KeyValueSkipListSet snapshot;

  // Orders cells within both kvset and snapshot.
  final KeyValue.KVComparator comparator;

  // Used to track own heapSize; starts at DEEP_OVERHEAD.
  final AtomicLong size;
  // Heap size of the outstanding snapshot; 0 when there is none.
  private volatile long snapshotSize;

  // Used to track when to flush; Long.MAX_VALUE means "no unflushed edits".
  volatile long timeOfOldestEdit = Long.MAX_VALUE;

  // Min/max timestamps covered by the active set and the snapshot respectively.
  TimeRangeTracker timeRangeTracker;
  TimeRangeTracker snapshotTimeRangeTracker;

  // MSLAB machinery; all remain null when MSLAB is disabled via config.
  MemStoreChunkPool chunkPool;
  volatile MemStoreLAB allocator;
  volatile MemStoreLAB snapshotAllocator;
  // Id handed out by snapshot() and required back by clearSnapshot(long).
  volatile long snapshotId;
+
+ /**
+ * Default constructor. Used for tests.
+ */
+ public DefaultMemStore() {
+ this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
+ }
+
+ /**
+ * Constructor.
+ * @param c Comparator
+ */
+ public DefaultMemStore(final Configuration conf,
+ final KeyValue.KVComparator c) {
+ this.conf = conf;
+ this.comparator = c;
+ this.kvset = new KeyValueSkipListSet(c);
+ this.snapshot = new KeyValueSkipListSet(c);
+ timeRangeTracker = new TimeRangeTracker();
+ snapshotTimeRangeTracker = new TimeRangeTracker();
+ this.size = new AtomicLong(DEEP_OVERHEAD);
+ this.snapshotSize = 0;
+ if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+ this.chunkPool = MemStoreChunkPool.getPool(conf);
+ this.allocator = new MemStoreLAB(conf, chunkPool);
+ } else {
+ this.allocator = null;
+ this.chunkPool = null;
+ }
+ }
+
+ void dump() {
+ for (KeyValue kv: this.kvset) {
+ LOG.info(kv);
+ }
+ for (KeyValue kv: this.snapshot) {
+ LOG.info(kv);
+ }
+ }
+
  /**
   * Creates a snapshot of the current memstore.
   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
   * @return descriptor of the snapshot: id, cell count, heap size, time range
   * and a scanner over the snapshotted cells
   */
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup. Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      // The snapshot id doubles as its creation time; clearSnapshot(long)
      // must be handed this same id back.
      this.snapshotId = EnvironmentEdgeManager.currentTimeMillis();
      this.snapshotSize = keySize();
      if (!this.kvset.isEmpty()) {
        // Swap the whole active set into the snapshot and start a fresh,
        // empty active set. NOTE(review): the field writes below are not
        // atomic as a group; callers are expected to serialize flushes
        // (see class javadoc) -- confirm against HStore locking.
        this.snapshot = this.kvset;
        this.kvset = new KeyValueSkipListSet(this.comparator);
        this.snapshotTimeRangeTracker = this.timeRangeTracker;
        this.timeRangeTracker = new TimeRangeTracker();
        // Reset heap to not include any keys
        this.size.set(DEEP_OVERHEAD);
        // Hand the current MSLAB over to the snapshot; its chunks must stay
        // alive until the snapshot is cleared.
        this.snapshotAllocator = this.allocator;
        // Reset allocator so we get a fresh buffer for the new memstore
        if (allocator != null) {
          this.allocator = new MemStoreLAB(conf, chunkPool);
        }
        timeOfOldestEdit = Long.MAX_VALUE;
      }
    }
    return new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
        this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator));
  }
+
+ /**
+ * The passed snapshot was successfully persisted; it can be let go.
+ * @param id Id of the snapshot to clean out.
+ * @throws UnexpectedException
+ * @see #snapshot()
+ */
+ @Override
+ public void clearSnapshot(long id) throws UnexpectedException {
+ MemStoreLAB tmpAllocator = null;
+ if (this.snapshotId != id) {
+ throw new UnexpectedException("Current snapshot id is " + this.snapshotId + ",passed " + id);
+ }
+ // OK. Passed in snapshot is same as current snapshot. If not-empty,
+ // create a new snapshot and let the old one go.
+ if (!this.snapshot.isEmpty()) {
+ this.snapshot = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = new TimeRangeTracker();
+ }
+ this.snapshotSize = 0;
+ this.snapshotId = -1;
+ if (this.snapshotAllocator != null) {
+ tmpAllocator = this.snapshotAllocator;
+ this.snapshotAllocator = null;
+ }
+ if (tmpAllocator != null) {
+ tmpAllocator.close();
+ }
+ }
+
+ @Override
+ public long getFlushableSize() {
+ return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+ }
+
+ /**
+ * Write an update
+ * @param cell
+ * @return approximate size of the passed key and value.
+ */
+ @Override
+ public long add(Cell cell) {
+ KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell));
+ return internalAdd(toAdd);
+ }
+
+ @Override
+ public long timeOfOldestEdit() {
+ return timeOfOldestEdit;
+ }
+
+ private boolean addToKVSet(KeyValue e) {
+ boolean b = this.kvset.add(e);
+ setOldestEditTimeToNow();
+ return b;
+ }
+
+ private boolean removeFromKVSet(KeyValue e) {
+ boolean b = this.kvset.remove(e);
+ setOldestEditTimeToNow();
+ return b;
+ }
+
+ void setOldestEditTimeToNow() {
+ if (timeOfOldestEdit == Long.MAX_VALUE) {
+ timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ }
+
+ /**
+ * Internal version of add() that doesn't clone KVs with the
+ * allocator, and doesn't take the lock.
+ *
+ * Callers should ensure they already have the read lock taken
+ */
+ private long internalAdd(final KeyValue toAdd) {
+ long s = heapSizeChange(toAdd, addToKVSet(toAdd));
+ timeRangeTracker.includeTimestamp(toAdd);
+ this.size.addAndGet(s);
+ return s;
+ }
+
+ private KeyValue maybeCloneWithAllocator(KeyValue kv) {
+ if (allocator == null) {
+ return kv;
+ }
+
+ int len = kv.getLength();
+ Allocation alloc = allocator.allocateBytes(len);
+ if (alloc == null) {
+ // The allocation was too large, allocator decided
+ // not to do anything with it.
+ return kv;
+ }
+ assert alloc.getData() != null;
+ System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
+ KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+ newKv.setMvccVersion(kv.getMvccVersion());
+ return newKv;
+ }
+
  /**
   * Remove n key from the memstore. Only kvs that have the same key and the
   * same memstoreTS are removed. It is ok to not update timeRangeTracker
   * in this call. It is possible that we can optimize this method by using
   * tailMap/iterator, but since this method is called rarely (only for
   * error recovery), we can leave those optimization for the future.
   * @param cell cell (key plus mvcc version) to remove
   */
  @Override
  public void rollback(Cell cell) {
    // If the key is in the snapshot, delete it. We should not update
    // this.size, because that tracks the size of only the memstore and
    // not the snapshot. The flush of this snapshot to disk has not
    // yet started because Store.flush() waits for all rwcc transactions to
    // commit before starting the flush to disk.
    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
    KeyValue found = this.snapshot.get(kv);
    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
      this.snapshot.remove(kv);
    }
    // If the key is in the memstore, delete it. Update this.size.
    found = this.kvset.get(kv);
    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
      removeFromKVSet(kv);
      // heapSizeChange(kv, true) is what the original add charged; refund it.
      long s = heapSizeChange(kv, true);
      this.size.addAndGet(-s);
    }
  }
+
+ /**
+ * Write a delete
+ * @param deleteCell
+ * @return approximate size of the passed key and value.
+ */
+ @Override
+ public long delete(Cell deleteCell) {
+ long s = 0;
+ KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(deleteCell));
+ s += heapSizeChange(toAdd, addToKVSet(toAdd));
+ timeRangeTracker.includeTimestamp(toAdd);
+ this.size.addAndGet(s);
+ return s;
+ }
+
+ /**
+ * @param kv Find the row that comes after this one. If null, we return the
+ * first.
+ * @return Next row or null if none found.
+ */
+ KeyValue getNextRow(final KeyValue kv) {
+ return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
+ }
+
+ /*
+ * @param a
+ * @param b
+ * @return Return lowest of a or b or null if both a and b are null
+ */
+ private KeyValue getLowest(final KeyValue a, final KeyValue b) {
+ if (a == null) {
+ return b;
+ }
+ if (b == null) {
+ return a;
+ }
+ return comparator.compareRows(a, b) <= 0? a: b;
+ }
+
+ /*
+ * @param key Find row that follows this one. If null, return first.
+ * @param map Set to look in for a row beyond <code>row</code>.
+ * @return Next row or null if none found. If one found, will be a new
+ * KeyValue -- can be destroyed by subsequent calls to this method.
+ */
+ private KeyValue getNextRow(final KeyValue key,
+ final NavigableSet<KeyValue> set) {
+ KeyValue result = null;
+ SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
+ // Iterate until we fall into the next row; i.e. move off current row
+ for (KeyValue kv: tail) {
+ if (comparator.compareRows(kv, key) <= 0)
+ continue;
+ // Note: Not suppressing deletes or expired cells. Needs to be handled
+ // by higher up functions.
+ result = kv;
+ break;
+ }
+ return result;
+ }
+
+ /**
+ * @param state column/delete tracking state
+ */
+ @Override
+ public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
+ getRowKeyAtOrBefore(kvset, state);
+ getRowKeyAtOrBefore(snapshot, state);
+ }
+
+ /*
+ * @param set
+ * @param state Accumulates deletes and candidates.
+ */
+ private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
+ final GetClosestRowBeforeTracker state) {
+ if (set.isEmpty()) {
+ return;
+ }
+ if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
+ // Found nothing in row. Try backing up.
+ getRowKeyBefore(set, state);
+ }
+ }
+
+ /*
+ * Walk forward in a row from <code>firstOnRow</code>. Presumption is that
+ * we have been passed the first possible key on a row. As we walk forward
+ * we accumulate deletes until we hit a candidate on the row at which point
+ * we return.
+ * @param set
+ * @param firstOnRow First possible key on this row.
+ * @param state
+ * @return True if we found a candidate walking this row.
+ */
+ private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
+ final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
+ boolean foundCandidate = false;
+ SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
+ if (tail.isEmpty()) return foundCandidate;
+ for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
+ KeyValue kv = i.next();
+ // Did we go beyond the target row? If so break.
+ if (state.isTooFar(kv, firstOnRow)) break;
+ if (state.isExpired(kv)) {
+ i.remove();
+ continue;
+ }
+ // If we added something, this row is a contender. break.
+ if (state.handle(kv)) {
+ foundCandidate = true;
+ break;
+ }
+ }
+ return foundCandidate;
+ }
+
  /*
   * Walk backwards through the passed set a row at a time until we run out of
   * set or until we get a candidate.
   * @param set set to walk backwards through
   * @param state tracker accumulating deletes and candidates
   */
  private void getRowKeyBefore(NavigableSet<KeyValue> set,
      final GetClosestRowBeforeTracker state) {
    KeyValue firstOnRow = state.getTargetKey();
    // Each iteration steps back one row via memberOfPreviousRow(), then scans
    // that row forward with walkForwardInSingleRow() looking for a candidate.
    for (Member p = memberOfPreviousRow(set, state, firstOnRow);
        p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
      // Make sure we don't fall out of our table.
      if (!state.isTargetTable(p.kv)) break;
      // Stop looking if we've exited the better candidate range.
      if (!state.isBetterCandidate(p.kv)) break;
      // Make into firstOnRow: the first possible key on the row we backed into.
      firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
          HConstants.LATEST_TIMESTAMP);
      // If we find something, break;
      if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
    }
  }
+
  /**
   * Only used by tests. TODO: Remove
   *
   * Given the specs of a column, update it, first by inserting a new record,
   * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
   * get the new value, or the old value and all readers will eventually only see the new
   * value after the old was removed.
   *
   * @param row row key of the cell to update
   * @param family column family
   * @param qualifier column qualifier
   * @param newValue new long value to store
   * @param now wall-clock time used as the lower bound for the new timestamp
   * @return Timestamp the upserted cell was actually written with
   */
  public long updateColumnValue(byte[] row,
      byte[] family,
      byte[] qualifier,
      long newValue,
      long now) {
    KeyValue firstKv = KeyValue.createFirstOnRow(
        row, family, qualifier);
    // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
    SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
    if (!snSs.isEmpty()) {
      KeyValue snKv = snSs.first();
      // is there a matching KV in the snapshot?
      if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
        if (snKv.getTimestamp() == now) {
          // Snapshot already holds a cell at exactly 'now'; nudge forward so
          // the new cell is strictly newer and shadows it.
          now += 1;
        }
      }
    }

    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
    // But the timestamp should also be max(now, mostRecentTsInMemstore)

    // so we cant add the new KV w/o knowing what's there already, but we also
    // want to take this chance to delete some kvs. So two loops (sad)

    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
    for (KeyValue kv : ss) {
      // if this isnt the row we are interested in, then bail:
      if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
        break; // rows dont match, bail.
      }

      // NOTE(review): nothing is removed in this loop; it only raises 'now'
      // to the newest existing Put timestamp so the upserted cell sorts
      // newest. Removal of shadowed versions happens inside upsert().
      if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
          kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
        now = kv.getTimestamp();
      }
    }

    // create or update (upsert) a new KeyValue with
    // 'now' and a 0 memstoreTS == immediately visible
    List<Cell> cells = new ArrayList<Cell>(1);
    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
    return upsert(cells, 1L);
  }
+
+ /**
+ * Update or insert the specified KeyValues.
+ * <p>
+ * For each KeyValue, insert into MemStore. This will atomically upsert the
+ * value for that row/family/qualifier. If a KeyValue did already exist,
+ * it will then be removed.
+ * <p>
+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+ * be immediately visible. May want to change this so it is atomic across
+ * all KeyValues.
+ * <p>
+ * This is called under row lock, so Get operations will still see updates
+ * atomically. Scans will only see each KeyValue update as atomic.
+ *
+ * @param cells
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @return change in memstore size
+ */
+ @Override
+ public long upsert(Iterable<Cell> cells, long readpoint) {
+ long size = 0;
+ for (Cell cell : cells) {
+ size += upsert(cell, readpoint);
+ }
+ return size;
+ }
+
  /**
   * Inserts the specified KeyValue into MemStore and deletes any existing
   * versions of the same row/family/qualifier as the specified KeyValue.
   * <p>
   * First, the specified KeyValue is inserted into the Memstore.
   * <p>
   * If there are any existing KeyValues in this MemStore with the same row,
   * family, and qualifier, they are removed.
   * <p>
   * Callers must hold the read lock.
   *
   * @param cell cell to upsert
   * @param readpoint readpoint below which duplicate versions may be removed
   * @return change in size of MemStore
   */
  private long upsert(Cell cell, long readpoint) {
    // Add the KeyValue to the MemStore
    // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
    // test that triggers the pathological case if we don't avoid MSLAB
    // here.
    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
    long addedSize = internalAdd(kv);

    // Get the KeyValues for the row/family/qualifier regardless of timestamp.
    // For this case we want to clean up any other puts
    KeyValue firstKv = KeyValue.createFirstOnRow(
        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
    Iterator<KeyValue> it = ss.iterator();
    // versions visible to oldest scanner
    int versionsVisible = 0;
    while ( it.hasNext() ) {
      KeyValue cur = it.next();

      if (kv == cur) {
        // ignore the one just put in
        continue;
      }
      // check that this is the row and column we are interested in, otherwise bail
      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
        // only remove Puts that concurrent scanners cannot possibly see
        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
            cur.getMvccVersion() <= readpoint) {
          // NOTE(review): removal only starts after two visible versions have
          // been counted (condition is > 1 while the comment below says "at
          // least one") -- confirm whether >= 1 was intended.
          if (versionsVisible > 1) {
            // if we get here we have seen at least one version visible to the oldest scanner,
            // which means we can prove that no scanner will see this version

            // false means there was a change, so give us the size.
            long delta = heapSizeChange(cur, true);
            addedSize -= delta;
            this.size.addAndGet(-delta);
            // Iterator.remove is the only safe way to delete mid-iteration.
            it.remove();
            setOldestEditTimeToNow();
          } else {
            versionsVisible++;
          }
        }
      } else {
        // past the row or column, done
        break;
      }
    }
    return addedSize;
  }
+
  /*
   * Immutable data structure to hold member found in set and the set it was
   * found in. Include set because it is carrying context.
   */
  private static class Member {
    // the cell that was found
    final KeyValue kv;
    // the (sub)set it was found in; callers continue walking this set
    final NavigableSet<KeyValue> set;
    Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
      this.kv = kv;
      this.set = s;
    }
  }
+
+ /*
+ * @param set Set to walk back in. Pass a first in row or we'll return
+ * same row (loop).
+ * @param state Utility and context.
+ * @param firstOnRow First item on the row after the one we want to find a
+ * member in.
+ * @return Null or member of row previous to <code>firstOnRow</code>
+ */
+ private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
+ final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
+ NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
+ if (head.isEmpty()) return null;
+ for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
+ KeyValue found = i.next();
+ if (state.isExpired(found)) {
+ i.remove();
+ continue;
+ }
+ return new Member(head, found);
+ }
+ return null;
+ }
+
+ /**
+ * @return scanner on memstore and snapshot in this order.
+ */
+ @Override
+ public List<KeyValueScanner> getScanners(long readPt) {
+ return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
+ }
+
+ /**
+ * Check if this memstore may contain the required keys
+ * @param scan
+ * @return False if the key definitely does not exist in this Memstore
+ */
+ public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+ return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+ snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
+ && (Math.max(timeRangeTracker.getMaximumTimestamp(),
+ snapshotTimeRangeTracker.getMaximumTimestamp()) >=
+ oldestUnexpiredTS);
+ }
+
+ /*
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * This behaves as if it were a real scanner but does not maintain position.
+ */
+ protected class MemStoreScanner extends NonLazyKeyValueScanner {
    // Next row information for either kvset or snapshot
    private KeyValue kvsetNextRow = null;
    private KeyValue snapshotNextRow = null;

    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
    private KeyValue kvsetItRow = null;
    private KeyValue snapshotItRow = null;

    // iterator based scanning.
    private Iterator<KeyValue> kvsetIt;
    private Iterator<KeyValue> snapshotIt;

    // The kvset and snapshot at the time of creating this scanner; pinned
    // because a flush swaps the memstore's own fields out from under us.
    private KeyValueSkipListSet kvsetAtCreation;
    private KeyValueSkipListSet snapshotAtCreation;

    // the pre-calculated KeyValue to be returned by peek() or next()
    private KeyValue theNext;

    // The allocator and snapshot allocator at the time of creating this
    // scanner; ref-counted (incScannerCount/decScannerCount) so their chunks
    // are not recycled while this scanner is open.
    volatile MemStoreLAB allocatorAtCreation;
    volatile MemStoreLAB snapshotAllocatorAtCreation;

    // A flag represents whether could stop skipping KeyValues for MVCC
    // if have encountered the next row. Only used for reversed scan
    private boolean stopSkippingKVsIfNextRow = false;

    // MVCC read point this scanner was opened at; cells above it are skipped.
    private long readPoint;
+
+ /*
+ Some notes...
+
+ So memstorescanner is fixed at creation time. this includes pointers/iterators into
+ existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
+ snapshot is moved. since kvset is null there is no point on reseeking on both,
+ we can save us the trouble. During the snapshot->hfile transition, the memstore
+ scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
+ potentially do something smarter by adjusting the existing memstore scanner.
+
+ But there is a greater problem here, that being once a scanner has progressed
+ during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
+ if a scan lasts a little while, there is a chance for new entries in kvset to
+ become available but we will never see them. This needs to be handled at the
+ StoreScanner level with coordination with MemStoreScanner.
+
+ Currently, this problem is only partly managed: during the small amount of time
+ when the StoreScanner has not yet created a new MemStoreScanner, we will miss
+ the adds to kvset in the MemStoreScanner.
+ */
+
    /**
     * @param readPoint MVCC read point; cells with a larger mvcc version are
     * invisible to this scanner.
     */
    MemStoreScanner(long readPoint) {
      super();

      this.readPoint = readPoint;
      // Capture the current sets; a concurrent flush may replace the
      // enclosing memstore's kvset/snapshot fields while we are open.
      kvsetAtCreation = kvset;
      snapshotAtCreation = snapshot;
      if (allocator != null) {
        this.allocatorAtCreation = allocator;
        // Ref-count the MSLAB so its chunks outlive this scanner.
        this.allocatorAtCreation.incScannerCount();
      }
      if (snapshotAllocator != null) {
        this.snapshotAllocatorAtCreation = snapshotAllocator;
        this.snapshotAllocatorAtCreation.incScannerCount();
      }
    }
+
    /*
     * Advances the given sub-iterator to its next KeyValue readable at this
     * scanner's read point.
     * @param it one of kvsetIt or snapshotIt
     * @return the next visible KeyValue, or null when the iterator is
     * exhausted (or, in reversed-scan mode, once we leave the starting row)
     */
    private KeyValue getNext(Iterator<KeyValue> it) {
      KeyValue startKV = theNext;
      KeyValue v = null;
      try {
        while (it.hasNext()) {
          v = it.next();
          // Only edits committed at or below our read point are visible.
          if (v.getMvccVersion() <= this.readPoint) {
            return v;
          }
          // Reversed scan: once we have skipped past the starting row there
          // is no point walking further forward; give up on this iterator.
          if (stopSkippingKVsIfNextRow && startKV != null
              && comparator.compareRows(v, startKV) > 0) {
            return null;
          }
        }

        return null;
      } finally {
        if (v != null) {
          // in all cases, remember the last KV iterated to, so reseek() can
          // restore the iterator to at least this position.
          if (it == snapshotIt) {
            snapshotItRow = v;
          } else {
            kvsetItRow = v;
          }
        }
      }
    }
+
+ /**
+ * Set the scanner at the seek key.
+ * Must be called only once: there is no thread safety between the scanner
+ * and the memStore.
+ * @param key seek value
+ * @return false if the key is null or if there is no data
+ */
+ @Override
+ public synchronized boolean seek(KeyValue key) {
+ if (key == null) {
+ close();
+ return false;
+ }
+
+ // kvset and snapshot will never be null.
+ // if tailSet can't find anything, SortedSet is empty (not null).
+ kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+ snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+ kvsetItRow = null;
+ snapshotItRow = null;
+
+ return seekInSubLists(key);
+ }
+
+
+ /**
+ * (Re)initialize the iterators after a seek or a reseek.
+ */
+ private synchronized boolean seekInSubLists(KeyValue key){
+ kvsetNextRow = getNext(kvsetIt);
+ snapshotNextRow = getNext(snapshotIt);
+
+ // Calculate the next value
+ theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+ // has data
+ return (theNext != null);
+ }
+
+
    /**
     * Move forward on the sub-lists set previously by seek.
     * @param key seek value (should be non-null)
     * @return true if there is at least one KV to read, false otherwise
     */
    @Override
    public synchronized boolean reseek(KeyValue key) {
      /*
      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
      This code is executed concurrently with flush and puts, without locks.
      Two points must be known when working on this code:
      1) It's not possible to use the 'kvTail' and 'snapshot'
       variables, as they are modified during a flush.
      2) The ideal implementation for performance would use the sub skip list
       implicitly pointed by the iterators 'kvsetIt' and
       'snapshotIt'. Unfortunately the Java API does not offer a method to
       get it. So we remember the last keys we iterated to and restore
       the reseeked set to at least that point.
      */

      // getHighest() keeps us from reseeking backwards past positions the
      // iterators have already passed.
      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();

      return seekInSubLists(key);
    }
+
+
+ @Override
+ public synchronized KeyValue peek() {
+ //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+ return theNext;
+ }
+
+ @Override
+ public synchronized KeyValue next() {
+ if (theNext == null) {
+ return null;
+ }
+
+ final KeyValue ret = theNext;
+
+ // Advance one of the iterators
+ if (theNext == kvsetNextRow) {
+ kvsetNextRow = getNext(kvsetIt);
+ } else {
+ snapshotNextRow = getNext(snapshotIt);
+ }
+
+ // Calculate the next value
+ theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+ //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+ // getLowest() + " threadpoint=" + readpoint);
+ return ret;
+ }
+
+ /*
+ * Returns the lower of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ private KeyValue getLowest(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare <= 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ /*
+ * Returns the higher of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ private KeyValue getHighest(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare > 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ // Detaches this scanner from the backing sets and releases any allocator
+ // references captured at creation time so the allocator's buffers can be
+ // reclaimed once no scanner holds them.
+ public synchronized void close() {
+ this.kvsetNextRow = null;
+ this.snapshotNextRow = null;
+
+ this.kvsetIt = null;
+ this.snapshotIt = null;
+
+ // decScannerCount drops this scanner's count on the allocator; the fields
+ // are nulled so a repeated close() cannot double-decrement.
+ if (allocatorAtCreation != null) {
+ this.allocatorAtCreation.decScannerCount();
+ this.allocatorAtCreation = null;
+ }
+ if (snapshotAllocatorAtCreation != null) {
+ this.snapshotAllocatorAtCreation.decScannerCount();
+ this.snapshotAllocatorAtCreation = null;
+ }
+
+ this.kvsetItRow = null;
+ this.snapshotItRow = null;
+ }
+
+ /**
+ * MemStoreScanner returns max value as sequence id because it will
+ * always have the latest data among all files.
+ * @return {@code Long.MAX_VALUE}, unconditionally
+ */
+ @Override
+ public long getSequenceID() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Delegates to shouldSeek(scan, oldestUnexpiredTS); the {@code columns}
+ * argument is not consulted for memstore data.
+ */
+ @Override
+ public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
+ long oldestUnexpiredTS) {
+ return shouldSeek(scan, oldestUnexpiredTS);
+ }
+
+ /**
+ * Seek scanner to the given key first. If it returns false(means
+ * peek()==null) or scanner's peek row is bigger than row of given key, seek
+ * the scanner to the previous row of given key
+ * @param key the seek target
+ * @return true if the scanner ends up positioned on a KeyValue at or before
+ * the given key's row; false if no such KeyValue exists
+ */
+ @Override
+ public synchronized boolean backwardSeek(KeyValue key) {
+ seek(key);
+ if (peek() == null || comparator.compareRows(peek(), key) > 0) {
+ return seekToPreviousRow(key);
+ }
+ return true;
+ }
+
+ /**
+ * Separately get the KeyValue before the specified key from kvset and
+ * snapshotset, and use the row of higher one as the previous row of
+ * specified key, then seek to the first KeyValue of previous row
+ * @param key the key whose previous row we want to land on
+ * @return false if there is no row at all before the given key's row
+ */
+ @Override
+ public synchronized boolean seekToPreviousRow(KeyValue key) {
+ KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
+ // headSet is exclusive of firstKeyOnRow, so .last() below is the last
+ // KeyValue strictly before the given key's row in each set.
+ SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
+ KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
+ SortedSet<KeyValue> snapshotHead = snapshotAtCreation
+ .headSet(firstKeyOnRow);
+ KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
+ .last();
+ KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
+ if (lastKVBeforeRow == null) {
+ // Nothing before this row in either set.
+ theNext = null;
+ return false;
+ }
+ KeyValue firstKeyOnPreviousRow = KeyValue
+ .createFirstOnRow(lastKVBeforeRow.getRow());
+ // NOTE(review): this flag presumably stops seek()'s KV-skipping at the
+ // row boundary so we don't overshoot the target row — confirm against
+ // the seek() implementation.
+ this.stopSkippingKVsIfNextRow = true;
+ seek(firstKeyOnPreviousRow);
+ this.stopSkippingKVsIfNextRow = false;
+ // If the seek landed past the candidate row (e.g. none of its KVs were
+ // returned), recurse to the row before it.
+ if (peek() == null
+ || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
+ return seekToPreviousRow(lastKVBeforeRow);
+ }
+ return true;
+ }
+
+ /**
+ * Positions the scanner at the first KeyValue of the last row present in
+ * either kvset or snapshot. If the forward seek to that position fails,
+ * falls back to seekToPreviousRow from the highest KeyValue.
+ * @return false if both sets are empty
+ */
+ @Override
+ public synchronized boolean seekToLastRow() {
+ KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
+ .last();
+ KeyValue second = snapshotAtCreation.isEmpty() ? null
+ : snapshotAtCreation.last();
+ KeyValue higherKv = getHighest(first, second);
+ if (higherKv == null) {
+ return false;
+ }
+ KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
+ if (seek(firstKvOnLastRow)) {
+ return true;
+ } else {
+ return seekToPreviousRow(higherKv);
+ }
+
+ }
+ }
+
+ // Shallow heap size of a DefaultMemStore instance: object header plus
+ // 10 reference fields and 3 longs, rounded up by ClassSize.align.
+ public final static long FIXED_OVERHEAD = ClassSize.align(
+ ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
+
+ // Deep heap size of an *empty* memstore: the shallow size plus the
+ // directly-owned structures (an AtomicLong, two TimeRangeTrackers, and two
+ // KeyValueSkipListSets with their backing ConcurrentSkipListMaps). The
+ // KeyValues themselves are accounted separately via heapSizeChange().
+ public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+ ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
+ (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
+
+ /*
+ * Calculate how the MemStore size has changed. Includes overhead of the
+ * backing Map.
+ * @param kv
+ * @param notpresent True if the kv was NOT present in the set.
+ * @return Size of the kv plus one skip-list map entry when newly inserted;
+ * 0 when the kv was already present (a replacement is not accounted, so
+ * this is an approximation if old and new values differ in size).
+ */
+ static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+ return notpresent ?
+ ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
+ 0;
+ }
+
+ /**
+ * @return heap attributable to the cell entries alone: total heap size
+ * minus the fixed DEEP_OVERHEAD of the empty memstore structures
+ */
+ private long keySize() {
+ return heapSize() - DEEP_OVERHEAD;
+ }
+
+ /**
+ * Get the entire heap usage for this MemStore not including keys in the
+ * snapshot.
+ * @return the current value of the running size accumulator
+ */
+ @Override
+ public long heapSize() {
+ return size.get();
+ }
+
+ /**
+ * @return the same value as {@link #heapSize()}
+ */
+ @Override
+ public long size() {
+ return heapSize();
+ }
+
+ /**
+ * Code to help figure if our approximation of object heap sizes is close
+ * enough. See hbase-900. Fills memstores then waits so user can heap
+ * dump and bring up resultant hprof in something like jprofiler which
+ * allows you get 'deep size' on objects.
+ * @param args main args
+ */
+ public static void main(String [] args) {
+ RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+ LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
+ runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
+ LOG.info("vmInputArguments=" + runtime.getInputArguments());
+ DefaultMemStore memstore1 = new DefaultMemStore();
+ // TODO: x32 vs x64
+ long size = 0;
+ final int count = 10000;
+ byte [] fam = Bytes.toBytes("col");
+ byte [] qf = Bytes.toBytes("umn");
+ byte [] empty = new byte[0];
+ for (int i = 0; i < count; i++) {
+ // Give each its own ts
+ size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ }
+ LOG.info("memstore1 estimated size=" + size);
+ // Second pass adds the same row/fam/qf/ts KeyValues again, to measure the
+ // accounting when entries collide with existing ones.
+ for (int i = 0; i < count; i++) {
+ size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ }
+ LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
+ // Make a variably sized memstore.
+ DefaultMemStore memstore2 = new DefaultMemStore();
+ for (int i = 0; i < count; i++) {
+ size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
+ new byte[i]));
+ }
+ LOG.info("memstore2 estimated size=" + size);
+ final int seconds = 30;
+ LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
+ // NOTE(review): Thread.sleep is commented out, so this loop completes
+ // almost instantly — the advertised 30-second heap-dump window does not
+ // actually happen. Restore the sleep if the pause is wanted.
+ for (int i = 0; i < seconds; i++) {
+ // Thread.sleep(1000);
+ }
+ LOG.info("Exiting.");
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -21,16 +21,12 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.util.StringUtils;
/**
@@ -45,21 +41,20 @@ public class DefaultStoreFlusher extends
}
@Override
- public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
+ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException {
ArrayList<Path> result = new ArrayList<Path>();
- if (snapshot.size() == 0) return result; // don't flush if there are no entries
+ int cellsCount = snapshot.getCellsCount();
+ if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
- long flushed = 0;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.
@@ -67,20 +62,19 @@ public class DefaultStoreFlusher extends
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(
- snapshot.size(), store.getFamily().getCompression(), false, true, true);
- writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+ cellsCount, store.getFamily().getCompression(), false, true, true);
+ writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
try {
- flushed = performFlush(scanner, writer, smallestReadPoint);
+ performFlush(scanner, writer, smallestReadPoint);
} finally {
finalizeWriter(writer, cacheFlushId, status);
}
}
} finally {
- flushedSize.set(flushed);
scanner.close();
}
LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
- + StringUtils.humanReadableInt(flushed) +
+ + StringUtils.humanReadableInt(snapshot.getSize()) +
", hasBloomFilter=" + writer.hasGeneralBloom() +
", into tmp file " + writer.getPath());
result.add(writer.getPath());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Mar 14 14:07:10 2014
@@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
-import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +38,6 @@ import java.util.concurrent.ExecutorComp
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -83,6 +81,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -116,6 +115,7 @@ import com.google.common.collect.Lists;
*/
@InterfaceAudience.Private
public class HStore implements Store {
+ private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -224,7 +224,9 @@ public class HStore implements Store {
// Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it?
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
- this.memstore = new MemStore(conf, this.comparator);
+ String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
+ this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+ Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
@@ -752,7 +754,7 @@ public class HStore implements Store {
/**
* Snapshot this stores memstore. Call before running
- * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
+ * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
* so it has some work to do.
*/
void snapshot() {
@@ -769,16 +771,11 @@ public class HStore implements Store {
* previously.
* @param logCacheFlushId flush sequence number
* @param snapshot
- * @param snapshotTimeRangeTracker
- * @param flushedSize The number of bytes flushed
* @param status
* @return The path name of the tmp file to which the store was flushed
* @throws IOException
*/
- protected List<Path> flushCache(final long logCacheFlushId,
- SortedSet<KeyValue> snapshot,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
+ protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
@@ -789,8 +786,7 @@ public class HStore implements Store {
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
- List<Path> pathNames = flusher.flushSnapshot(
- snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+ List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@@ -826,14 +822,11 @@ public class HStore implements Store {
/*
* @param path The pathname of the tmp file into which the store was flushed
* @param logCacheFlushId
+ * @param status
* @return StoreFile created.
* @throws IOException
*/
- private StoreFile commitFile(final Path path,
- final long logCacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status)
+ private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
throws IOException {
// Write-out finished successfully, move into the right spot
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
@@ -916,16 +909,16 @@ public class HStore implements Store {
/*
* Change storeFiles adding into place the Reader produced by this new flush.
* @param sfs Store files
- * @param set That was used to make the passed file.
+ * @param snapshotId
* @throws IOException
* @return Whether compaction is required.
*/
- private boolean updateStorefiles(
- final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
+ private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
+ throws IOException {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
- this.memstore.clearSnapshot(set);
+ this.memstore.clearSnapshot(snapshotId);
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1827,7 +1820,7 @@ public class HStore implements Store {
@Override
public long getMemStoreSize() {
- return this.memstore.heapSize();
+ return this.memstore.size();
}
@Override
@@ -1918,10 +1911,8 @@ public class HStore implements Store {
private class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum;
- private SortedSet<KeyValue> snapshot;
+ private MemStoreSnapshot snapshot;
private List<Path> tempFiles;
- private TimeRangeTracker snapshotTimeRangeTracker;
- private final AtomicLong flushedSize = new AtomicLong();
private StoreFlusherImpl(long cacheFlushSeqNum) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
@@ -1933,15 +1924,12 @@ public class HStore implements Store {
*/
@Override
public void prepare() {
- memstore.snapshot();
- this.snapshot = memstore.getSnapshot();
- this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
+ this.snapshot = memstore.snapshot();
}
@Override
public void flushCache(MonitoredTask status) throws IOException {
- tempFiles = HStore.this.flushCache(
- cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+ tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
}
@Override
@@ -1952,8 +1940,7 @@ public class HStore implements Store {
List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
for (Path storeFilePath : tempFiles) {
try {
- storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
- snapshotTimeRangeTracker, flushedSize, status));
+ storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
} catch (IOException ex) {
LOG.error("Failed to commit store file " + storeFilePath, ex);
// Try to delete the files we have committed before.
@@ -1976,7 +1963,7 @@ public class HStore implements Store {
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
- return HStore.this.updateStorefiles(storeFiles, snapshot);
+ return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
}
}