You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/02/11 19:39:11 UTC
[2/2] hbase git commit: HBASE-14919 Refactoring for in-memory flush and compaction
HBASE-14919 Refactoring for in-memory flush and compaction
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25dfc112
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25dfc112
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25dfc112
Branch: refs/heads/master
Commit: 25dfc112dd76134a9a3ce1f2e88c4075aef76557
Parents: a975408
Author: eshcar <es...@yahoo-inc.com>
Authored: Mon Feb 8 23:35:02 2016 +0200
Committer: stack <st...@apache.org>
Committed: Thu Feb 11 10:39:01 2016 -0800
----------------------------------------------------------------------
.../hbase/regionserver/AbstractMemStore.java | 497 +++++++++++
.../hadoop/hbase/regionserver/CellSet.java | 183 ++++
.../hbase/regionserver/CellSkipListSet.java | 185 ----
.../hbase/regionserver/DefaultMemStore.java | 859 +------------------
.../hadoop/hbase/regionserver/HStore.java | 22 +-
.../hbase/regionserver/ImmutableSegment.java | 72 ++
.../regionserver/ImmutableSegmentAdapter.java | 107 +++
.../hadoop/hbase/regionserver/MemStore.java | 16 +-
.../hbase/regionserver/MemStoreScanner.java | 348 ++++++++
.../hbase/regionserver/MemStoreSnapshot.java | 13 +-
.../regionserver/MutableCellSetSegment.java | 153 ++++
.../MutableCellSetSegmentScanner.java | 258 ++++++
.../hbase/regionserver/MutableSegment.java | 57 ++
.../hadoop/hbase/regionserver/Segment.java | 218 +++++
.../hbase/regionserver/SegmentFactory.java | 89 ++
.../hbase/regionserver/SegmentScanner.java | 152 ++++
.../hbase/regionserver/StoreFlushContext.java | 2 +-
.../apache/hadoop/hbase/io/TestHeapSize.java | 49 +-
.../hbase/regionserver/TestCellSkipListSet.java | 13 +-
.../hbase/regionserver/TestDefaultMemStore.java | 133 +--
.../hbase/regionserver/TestHMobStore.java | 29 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 150 ++--
.../regionserver/TestMemStoreChunkPool.java | 29 +-
.../hadoop/hbase/regionserver/TestStore.java | 11 +-
24 files changed, 2380 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
new file mode 100644
index 0000000..18d2f8a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -0,0 +1,497 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * An abstract class, which implements the behaviour shared by all concrete memstore instances.
+ * <p>
+ * A memstore is made of an <em>active</em> {@link MutableSegment}, which absorbs write
+ * operations, and a <em>snapshot</em> {@link ImmutableSegment}, which is made for the flusher.
+ * Subclasses decide what happens as the active segment grows ({@link #checkActiveSize()}) and
+ * which segments/scanners make up a read view ({@link #getListOfSegments()},
+ * {@link #getListOfScanners(long)}).
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMemStore implements MemStore {
+
+  /** Value of {@link #snapshotId} when no snapshot is outstanding. */
+  private static final long NO_SNAPSHOT_ID = -1;
+
+  private final Configuration conf;
+  private final CellComparator comparator;
+
+  // active segment absorbs write operations
+  private volatile MutableSegment active;
+  // Snapshot of memstore. Made for flusher.
+  private volatile ImmutableSegment snapshot;
+  protected volatile long snapshotId;
+  // Wall-clock time of the oldest edit since the last reset; Long.MAX_VALUE means "none".
+  // Used to track when to flush.
+  private volatile long timeOfOldestEdit;
+
+  // 4 references: conf, comparator, active, snapshot; 2 longs: snapshotId, timeOfOldestEdit.
+  public final static long FIXED_OVERHEAD = ClassSize.align(
+      ClassSize.OBJECT +
+      (4 * ClassSize.REFERENCE) +
+      (2 * Bytes.SIZEOF_LONG));
+
+  // NOTE(review): presumably accounts for the two segments (active and snapshot), each assumed
+  // to carry an AtomicLong size, a TimeRangeTracker and a cell set over a ConcurrentSkipListMap.
+  // Confirm against Segment's fields.
+  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+      2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
+      ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
+
+
+  /**
+   * @param conf configuration handed to the {@link SegmentFactory} when creating segments
+   * @param c comparator used to order cells in every segment
+   */
+  protected AbstractMemStore(final Configuration conf, final CellComparator c) {
+    this.conf = conf;
+    this.comparator = c;
+    // NOTE(review): resetCellSet() is overridable and is called before the subclass constructor
+    // has run; overrides must not rely on subclass state.
+    resetCellSet();
+    this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0);
+    this.snapshotId = NO_SNAPSHOT_ID;
+  }
+
+  /**
+   * Replaces the active segment with a new, empty mutable segment (initial size
+   * {@link #DEEP_OVERHEAD}) and marks the memstore as having no edits.
+   */
+  protected void resetCellSet() {
+    // Reset heap to not include any keys
+    this.active = SegmentFactory.instance().createMutableSegment(
+        conf, comparator, DEEP_OVERHEAD);
+    this.timeOfOldestEdit = Long.MAX_VALUE;
+  }
+
+  /**
+   * Calculates how the MemStore size changes when a cell is inserted. Includes the overhead of
+   * the backing map entry.
+   * @param cell the inserted cell
+   * @param notPresent true if the cell was NOT already present in the set
+   * @return the change in heap size; 0 when the cell was already present
+   */
+  static long heapSizeChange(final Cell cell, final boolean notPresent) {
+    return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
+        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+  }
+
+  /**
+   * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
+   * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
+   * only if it is greater than the previous sequence id
+   */
+  public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent);
+
+  /**
+   * Write an update.
+   * @param cell the cell to be added; it may be cloned by the active segment's allocator before
+   *     insertion, so the stored cell may be a different instance
+   * @return approximate size of the added cell
+   */
+  @Override
+  public long add(Cell cell) {
+    Cell toAdd = maybeCloneWithAllocator(cell);
+    return internalAdd(toAdd);
+  }
+
+  /**
+   * Update or insert the specified Cells.
+   * <p>
+   * For each Cell, insert into MemStore. This will atomically upsert the
+   * value for that row/family/qualifier. If a Cell did already exist,
+   * it will then be removed.
+   * <p>
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+   * be immediately visible. May want to change this so it is atomic across
+   * all Cells.
+   * <p>
+   * This is called under row lock, so Get operations will still see updates
+   * atomically. Scans will only see each Cell update as atomic.
+   *
+   * @param cells the cells to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in memstore size
+   */
+  @Override
+  public long upsert(Iterable<Cell> cells, long readpoint) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += upsert(cell, readpoint);
+    }
+    return size;
+  }
+
+  /**
+   * @return the wall-clock time (from {@link EnvironmentEdgeManager}) at which the oldest edit
+   *     since the last reset was applied, or {@link Long#MAX_VALUE} if there has been none
+   */
+  @Override
+  public long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+
+  /**
+   * Write a delete. The delete marker is added to the active segment like any other cell.
+   * @param deleteCell the cell to be deleted
+   * @return approximate size of the passed key and value.
+   */
+  @Override
+  public long delete(Cell deleteCell) {
+    Cell toAdd = maybeCloneWithAllocator(deleteCell);
+    long s = internalAdd(toAdd);
+    return s;
+  }
+
+  /**
+   * An override on snapshot so the no arg version of the method implies zero seq num,
+   * like for cases without wal
+   */
+  public MemStoreSnapshot snapshot() {
+    return snapshot(0);
+  }
+
+  /**
+   * The passed snapshot was successfully persisted; it can be let go.
+   * @param id Id of the snapshot to clean out.
+   * @throws UnexpectedStateException if {@code id} is not the id of the current snapshot
+   * @see MemStore#snapshot(long)
+   */
+  @Override
+  public void clearSnapshot(long id) throws UnexpectedStateException {
+    if (this.snapshotId != id) {
+      throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+          + id);
+    }
+    // OK. Passed in snapshot is same as current snapshot. If not-empty,
+    // create a new snapshot and let the old one go.
+    Segment oldSnapshot = this.snapshot;
+    if (!this.snapshot.isEmpty()) {
+      this.snapshot = SegmentFactory.instance().createImmutableSegment(
+          getComparator(), 0);
+    }
+    this.snapshotId = NO_SNAPSHOT_ID;
+    oldSnapshot.close();
+  }
+
+  /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot, i.e. the size of the active segment.
+   */
+  @Override
+  public long heapSize() {
+    return getActive().getSize();
+  }
+
+  /**
+   * On flush, how much memory we will clear from the active cell set.
+   *
+   * @return the snapshot size if the snapshot is non-empty, otherwise the key size of the
+   *     active segment
+   */
+  @Override
+  public long getFlushableSize() {
+    long snapshotSize = getSnapshot().getSize();
+    return snapshotSize > 0 ? snapshotSize : keySize();
+  }
+
+
+  /**
+   * @return a list containing a single memstore scanner.
+   */
+  @Override
+  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
+    return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt));
+  }
+
+  @Override
+  public long getSnapshotSize() {
+    return getSnapshot().getSize();
+  }
+
+  /**
+   * @return a description of every segment, numbered from 1, or the exception text if the
+   *     segment list cannot be obtained
+   */
+  @Override
+  public String toString() {
+    // Local, single-threaded buffer: StringBuilder avoids StringBuffer's needless locking.
+    StringBuilder buf = new StringBuilder();
+    int i = 1;
+    try {
+      for (Segment segment : getListOfSegments()) {
+        buf.append("Segment (").append(i).append(") ").append(segment.toString()).append("; ");
+        i++;
+      }
+    } catch (IOException e) {
+      return e.toString();
+    }
+    return buf.toString();
+  }
+
+  /**
+   * Removes the given cell from the snapshot segment, if present.
+   */
+  protected void rollbackInSnapshot(Cell cell) {
+    // If the key is in the snapshot, delete it. The result is ignored: the memstore's reported
+    // heap size covers only the active segment (see heapSize()), not the snapshot. The flush of
+    // this snapshot to disk has not yet started because Store.flush() waits for all rwcc
+    // transactions to commit before starting the flush to disk.
+    snapshot.rollback(cell);
+  }
+
+  /**
+   * Removes the given cell from the active segment, if present.
+   */
+  protected void rollbackInActive(Cell cell) {
+    // The segment tracks its own size. A non-zero result (presumably the size delta) means
+    // the active segment changed, so record that an edit has happened.
+    long sz = active.rollback(cell);
+    if (sz != 0) {
+      setOldestEditTimeToNow();
+    }
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  /** Dumps the contents of the active segment and then the snapshot to {@code log}. */
+  protected void dump(Log log) {
+    active.dump(log);
+    snapshot.dump(log);
+  }
+
+
+  /**
+   * Inserts the specified Cell into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified Cell.
+   * <p>
+   * First, the specified Cell is inserted into the Memstore.
+   * <p>
+   * If there are any existing Cell in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param cell the cell to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @return change in size of MemStore
+   */
+  private long upsert(Cell cell, long readpoint) {
+    // Add the Cell to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    long addedSize = internalAdd(cell);
+
+    // Get the Cells for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    Cell firstCell = KeyValueUtil.createFirstOnRow(
+        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    SortedSet<Cell> ss = active.tailSet(firstCell);
+    Iterator<Cell> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while (it.hasNext()) {
+      Cell cur = it.next();
+
+      if (cell == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail.
+      // NOTE(review): family is not compared; presumably every cell in this memstore belongs to
+      // the same column family.
+      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+            cur.getSequenceId() <= readpoint) {
+          if (versionsVisible >= 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // The cell being removed is present, so its full entry size is freed.
+            long delta = heapSizeChange(cur, true);
+            addedSize -= delta;
+            active.incSize(-delta);
+            it.remove();
+            setOldestEditTimeToNow();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
+    return addedSize;
+  }
+
+  /**
+   * Compares two cells by row only.
+   * @param a first cell, may be null
+   * @param b second cell, may be null
+   * @return the cell with the lower row (a on ties); the non-null one if only one is null;
+   *     null if both are null
+   */
+  protected Cell getLowest(final Cell a, final Cell b) {
+    if (a == null) {
+      return b;
+    }
+    if (b == null) {
+      return a;
+    }
+    return comparator.compareRows(a, b) <= 0 ? a : b;
+  }
+
+  /**
+   * Finds the first cell in {@code set} whose row is strictly greater than the row of
+   * {@code key}.
+   * @param key Find row that follows this one. If null, return the first cell of the set.
+   * @param set Set to look in for a row beyond <code>row</code>.
+   * @return Next row or null if none found. If one found, will be a new
+   *     KeyValue -- can be destroyed by subsequent calls to this method.
+   */
+  protected Cell getNextRow(final Cell key,
+      final NavigableSet<Cell> set) {
+    Cell result = null;
+    SortedSet<Cell> tail = key == null ? set : set.tailSet(key);
+    // Iterate until we fall into the next row; i.e. move off current row.
+    // With a null key every cell qualifies, so the first cell is returned (and compareRows is
+    // never called with a null argument).
+    for (Cell cell : tail) {
+      if (key != null && comparator.compareRows(cell, key) <= 0) {
+        continue;
+      }
+      // Note: Not suppressing deletes or expired cells. Needs to be handled
+      // by higher up functions.
+      result = cell;
+      break;
+    }
+    return result;
+  }
+
+  /**
+   * Given the specs of a column, update it, first by inserting a new record,
+   * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
+   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
+   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
+   * get the new value, or the old value and all readers will eventually only see the new
+   * value after the old was removed.
+   */
+  @VisibleForTesting
+  @Override
+  public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long newValue, long now) {
+    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
+    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+    Cell snc = snapshot.getFirstAfter(firstCell);
+    if (snc != null) {
+      // is there a matching Cell in the snapshot?
+      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
+        if (snc.getTimestamp() == now) {
+          now += 1;
+        }
+      }
+    }
+    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+    // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+    // so we cant add the new Cell w/o knowing what's there already, but we also
+    // want to take this chance to delete some cells. So two loops (sad): this one picks the
+    // timestamp, and upsert() below removes the superseded Puts.
+
+    SortedSet<Cell> ss = getActive().tailSet(firstCell);
+    for (Cell cell : ss) {
+      // if this isnt the row we are interested in, then bail:
+      if (!CellUtil.matchingColumn(cell, family, qualifier)
+          || !CellUtil.matchingRow(cell, firstCell)) {
+        break; // rows dont match, bail.
+      }
+
+      // If a newer Put exists for this column, advance 'now' so the new cell is not older.
+      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
+          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
+        now = cell.getTimestamp();
+      }
+    }
+
+    // create or update (upsert) a new Cell with
+    // 'now' and a 0 memstoreTS == immediately visible
+    List<Cell> cells = new ArrayList<Cell>(1);
+    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
+    return upsert(cells, 1L);
+  }
+
+  /** Delegates to the active segment, which may copy the cell using its allocator. */
+  private Cell maybeCloneWithAllocator(Cell cell) {
+    return active.maybeCloneWithAllocator(cell);
+  }
+
+  /**
+   * Internal version of add() that doesn't clone Cells with the
+   * allocator, and doesn't take the lock.
+   *
+   * Callers should ensure they already have the read lock taken
+   *
+   * @return the size reported by the active segment for this add
+   */
+  private long internalAdd(final Cell toAdd) {
+    long s = active.add(toAdd);
+    setOldestEditTimeToNow();
+    checkActiveSize();
+    return s;
+  }
+
+  /** Records the current time as the oldest edit time, unless one is already recorded. */
+  private void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
+    }
+  }
+
+  /** @return the active segment's size minus the fixed {@link #DEEP_OVERHEAD} */
+  protected long keySize() {
+    return heapSize() - DEEP_OVERHEAD;
+  }
+
+  protected CellComparator getComparator() {
+    return comparator;
+  }
+
+  protected MutableSegment getActive() {
+    return active;
+  }
+
+  protected ImmutableSegment getSnapshot() {
+    return snapshot;
+  }
+
+  /** Replaces the snapshot segment; returns {@code this} for chaining. */
+  protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
+    this.snapshot = snapshot;
+    return this;
+  }
+
+  protected void setSnapshotSize(long snapshotSize) {
+    getSnapshot().setSize(snapshotSize);
+  }
+
+  /**
+   * Check whether anything need to be done based on the current active set size
+   */
+  protected abstract void checkActiveSize();
+
+  /**
+   * Returns a list of Store segment scanners, one per each store segment
+   * @param readPt the version number required to initialize the scanners
+   * @return a list of Store segment scanners, one per each store segment
+   */
+  protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException;
+
+  /**
+   * Returns an ordered list of segments from most recent to oldest in memstore
+   * @return an ordered list of segments from most recent to oldest in memstore
+   */
+  protected abstract List<Segment> getListOfSegments() throws IOException;
+
+  /** @return the size of the active segment */
+  public long getActiveSize() {
+    return getActive().getSize();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
new file mode 100644
index 0000000..4433302
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -0,0 +1,183 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if it already
+ * exists in the set. The call to add returns true if there was no value in the backing map, or
+ * false if there was an entry with the same key (though the value may be different).
+ * The implementation is tolerant of concurrent get and set and won't throw
+ * ConcurrentModificationException when iterating.
+ * <p>
+ * Several optional operations (e.g. ceiling/floor/higher/lower, subSet, the bulk operations and
+ * toArray) are not implemented and throw {@link UnsupportedOperationException}.
+ */
+@InterfaceAudience.Private
+public class CellSet implements NavigableSet<Cell> {
+  // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
+  // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
+  // is not already present.", this implementation "Adds the specified element to this set EVEN
+  // if it is already present overwriting what was there previous".
+  // Otherwise, has same attributes as ConcurrentSkipListSet
+  // Each cell is stored as both key and value. A put over an existing key replaces the value
+  // but keeps the original key, so reads return the value to observe the latest cell.
+  private final ConcurrentNavigableMap<Cell, Cell> delegatee;
+
+  CellSet(final CellComparator c) {
+    this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
+  }
+
+  CellSet(final ConcurrentNavigableMap<Cell, Cell> m) {
+    this.delegatee = m;
+  }
+
+  @Override
+  public Cell ceiling(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Iterator<Cell> descendingIterator() {
+    return this.delegatee.descendingMap().values().iterator();
+  }
+
+  @Override
+  public NavigableSet<Cell> descendingSet() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell floor(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> headSet(final Cell toElement) {
+    return headSet(toElement, false);
+  }
+
+  @Override
+  public NavigableSet<Cell> headSet(final Cell toElement,
+      boolean inclusive) {
+    return new CellSet(this.delegatee.headMap(toElement, inclusive));
+  }
+
+  @Override
+  public Cell higher(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Iterator<Cell> iterator() {
+    return this.delegatee.values().iterator();
+  }
+
+  @Override
+  public Cell lower(Cell e) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell pollFirst() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public Cell pollLast() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public NavigableSet<Cell> subSet(Cell fromElement,
+      boolean fromInclusive, Cell toElement, boolean toInclusive) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public SortedSet<Cell> tailSet(Cell fromElement) {
+    return tailSet(fromElement, true);
+  }
+
+  @Override
+  public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
+    return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
+  }
+
+  /**
+   * @return the comparator ordering the backing map, or null if it uses natural ordering
+   */
+  @Override
+  public Comparator<? super Cell> comparator() {
+    return this.delegatee.comparator();
+  }
+
+  /**
+   * @return the first (lowest) cell
+   * @throws NoSuchElementException if the set is empty
+   */
+  @Override
+  public Cell first() {
+    // Read the key/value pair in one step: a firstKey() followed by get() can return null if the
+    // entry is removed concurrently in between.
+    Map.Entry<Cell, Cell> entry = this.delegatee.firstEntry();
+    if (entry == null) {
+      throw new NoSuchElementException();
+    }
+    return entry.getValue();
+  }
+
+  /**
+   * @return the last (highest) cell
+   * @throws NoSuchElementException if the set is empty
+   */
+  @Override
+  public Cell last() {
+    // Same single-step read as first(), for the same reason.
+    Map.Entry<Cell, Cell> entry = this.delegatee.lastEntry();
+    if (entry == null) {
+      throw new NoSuchElementException();
+    }
+    return entry.getValue();
+  }
+
+  /**
+   * Adds the cell, overwriting any existing entry with an equal key.
+   * @return true if no entry with an equal key was present
+   */
+  @Override
+  public boolean add(Cell e) {
+    return this.delegatee.put(e, e) == null;
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends Cell> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void clear() {
+    this.delegatee.clear();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    //noinspection SuspiciousMethodCalls
+    return this.delegatee.containsKey(o);
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.delegatee.isEmpty();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    return this.delegatee.remove(o) != null;
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * @return the stored cell whose key equals {@code kv}, or null if there is none
+   */
+  public Cell get(Cell kv) {
+    return this.delegatee.get(kv);
+  }
+
+  @Override
+  public int size() {
+    return this.delegatee.size();
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
deleted file mode 100644
index e9941b3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A {@link java.util.Set} of {@link Cell}s implemented on top of a
- * {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a
- * {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard:
- * An add will overwrite if already an entry for the added key. In other words,
- * where CSLS does "Adds the specified element to this set if it is not already
- * present.", this implementation "Adds the specified element to this set EVEN
- * if it is already present overwriting what was there previous". The call to
- * add returns true if no value in the backing map or false if there was an
- * entry with same key (though value may be different).
- * <p>Otherwise,
- * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
- * get and set and won't throw ConcurrentModificationException when iterating.
- */
-@InterfaceAudience.Private
-public class CellSkipListSet implements NavigableSet<Cell> {
- private final ConcurrentNavigableMap<Cell, Cell> delegatee;
-
- CellSkipListSet(final CellComparator c) {
- this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c);
- }
-
- CellSkipListSet(final ConcurrentNavigableMap<Cell, Cell> m) {
- this.delegatee = m;
- }
-
- public Cell ceiling(Cell e) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Iterator<Cell> descendingIterator() {
- return this.delegatee.descendingMap().values().iterator();
- }
-
- public NavigableSet<Cell> descendingSet() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Cell floor(Cell e) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public SortedSet<Cell> headSet(final Cell toElement) {
- return headSet(toElement, false);
- }
-
- public NavigableSet<Cell> headSet(final Cell toElement,
- boolean inclusive) {
- return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive));
- }
-
- public Cell higher(Cell e) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Iterator<Cell> iterator() {
- return this.delegatee.values().iterator();
- }
-
- public Cell lower(Cell e) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Cell pollFirst() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Cell pollLast() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public NavigableSet<Cell> subSet(Cell fromElement,
- boolean fromInclusive, Cell toElement, boolean toInclusive) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public SortedSet<Cell> tailSet(Cell fromElement) {
- return tailSet(fromElement, true);
- }
-
- public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
- return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive));
- }
-
- public Comparator<? super Cell> comparator() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Cell first() {
- return this.delegatee.get(this.delegatee.firstKey());
- }
-
- public Cell last() {
- return this.delegatee.get(this.delegatee.lastKey());
- }
-
- public boolean add(Cell e) {
- return this.delegatee.put(e, e) == null;
- }
-
- public boolean addAll(Collection<? extends Cell> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public void clear() {
- this.delegatee.clear();
- }
-
- public boolean contains(Object o) {
- //noinspection SuspiciousMethodCalls
- return this.delegatee.containsKey(o);
- }
-
- public boolean containsAll(Collection<?> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public boolean isEmpty() {
- return this.delegatee.isEmpty();
- }
-
- public boolean remove(Object o) {
- return this.delegatee.remove(o) != null;
- }
-
- public boolean removeAll(Collection<?> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public boolean retainAll(Collection<?> c) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public Cell get(Cell kv) {
- return this.delegatee.get(kv);
- }
-
- public int size() {
- return this.delegatee.size();
- }
-
- public Object[] toArray() {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- public <T> T[] toArray(T[] a) {
- throw new UnsupportedOperationException("Not implemented");
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index f61d871..82d40b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -19,35 +19,22 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.htrace.Trace;
/**
* The MemStore holds in-memory modifications to the Store. Modifications
@@ -66,40 +53,8 @@ import org.apache.htrace.Trace;
* in KV size.
*/
@InterfaceAudience.Private
-public class DefaultMemStore implements MemStore {
+public class DefaultMemStore extends AbstractMemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
- static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
- private static final boolean USEMSLAB_DEFAULT = true;
- static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
-
- private Configuration conf;
-
- // MemStore. Use a CellSkipListSet rather than SkipListSet because of the
- // better semantics. The Map will overwrite if passed a key it already had
- // whereas the Set will not add new Cell if key is same though value might be
- // different. Value is not important -- just make sure always same
- // reference passed.
- volatile CellSkipListSet cellSet;
-
- // Snapshot of memstore. Made for flusher.
- volatile CellSkipListSet snapshot;
-
- final CellComparator comparator;
-
- // Used to track own heapSize
- final AtomicLong size;
- private volatile long snapshotSize;
-
- // Used to track when to flush
- volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
- TimeRangeTracker timeRangeTracker;
- TimeRangeTracker snapshotTimeRangeTracker;
-
- volatile MemStoreLAB allocator;
- volatile MemStoreLAB snapshotAllocator;
- volatile long snapshotId;
- volatile boolean tagsPresent;
/**
* Default constructor. Used for tests.
@@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore {
* Constructor.
* @param c Comparator
*/
- public DefaultMemStore(final Configuration conf,
- final CellComparator c) {
- this.conf = conf;
- this.comparator = c;
- this.cellSet = new CellSkipListSet(c);
- this.snapshot = new CellSkipListSet(c);
- timeRangeTracker = new TimeRangeTracker();
- snapshotTimeRangeTracker = new TimeRangeTracker();
- this.size = new AtomicLong(DEEP_OVERHEAD);
- this.snapshotSize = 0;
- if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
- String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
- this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
- new Class[] { Configuration.class }, new Object[] { conf });
- } else {
- this.allocator = null;
- }
+ public DefaultMemStore(final Configuration conf, final CellComparator c) {
+ super(conf, c);
}
void dump() {
- for (Cell cell: this.cellSet) {
- LOG.info(cell);
- }
- for (Cell cell: this.snapshot) {
- LOG.info(cell);
- }
+ super.dump(LOG);
}
/**
* Creates a snapshot of the current memstore.
* Snapshot must be cleared by call to {@link #clearSnapshot(long)}
+ * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal
*/
@Override
- public MemStoreSnapshot snapshot() {
+ public MemStoreSnapshot snapshot(long flushOpSeqId) {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
+ if (!getSnapshot().isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
- this.snapshotSize = keySize();
- if (!this.cellSet.isEmpty()) {
- this.snapshot = this.cellSet;
- this.cellSet = new CellSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- this.snapshotAllocator = this.allocator;
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
- this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
- new Class[] { Configuration.class }, new Object[] { conf });
- }
- timeOfOldestEdit = Long.MAX_VALUE;
+ if (!getActive().isEmpty()) {
+ ImmutableSegment immutableSegment = SegmentFactory.instance().
+ createImmutableSegment(getConfiguration(), getActive());
+ setSnapshot(immutableSegment);
+ setSnapshotSize(keySize());
+ resetCellSet();
}
}
- MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
- this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
- this.tagsPresent);
- this.tagsPresent = false;
- return memStoreSnapshot;
- }
-
- /**
- * The passed snapshot was successfully persisted; it can be let go.
- * @param id Id of the snapshot to clean out.
- * @throws UnexpectedStateException
- * @see #snapshot()
- */
- @Override
- public void clearSnapshot(long id) throws UnexpectedStateException {
- MemStoreLAB tmpAllocator = null;
- if (this.snapshotId != id) {
- throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
- + id);
- }
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!this.snapshot.isEmpty()) {
- this.snapshot = new CellSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- this.snapshotSize = 0;
- this.snapshotId = -1;
- if (this.snapshotAllocator != null) {
- tmpAllocator = this.snapshotAllocator;
- this.snapshotAllocator = null;
- }
- if (tmpAllocator != null) {
- tmpAllocator.close();
- }
- }
-
- @Override
- public long getFlushableSize() {
- return this.snapshotSize > 0 ? this.snapshotSize : keySize();
- }
+ return new MemStoreSnapshot(this.snapshotId, getSnapshot());
- @Override
- public long getSnapshotSize() {
- return this.snapshotSize;
}
- /**
- * Write an update
- * @param cell
- * @return approximate size of the passed Cell.
- */
@Override
- public long add(Cell cell) {
- Cell toAdd = maybeCloneWithAllocator(cell);
- return internalAdd(toAdd);
+ protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
+ List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
+ list.add(0, getActive().getSegmentScanner(readPt));
+ list.add(1, getSnapshot().getSegmentScanner(readPt));
+ return list;
}
@Override
- public long timeOfOldestEdit() {
- return timeOfOldestEdit;
- }
-
- private boolean addToCellSet(Cell e) {
- boolean b = this.cellSet.add(e);
- // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
- // When we use ACL CP or Visibility CP which deals with Tags during
- // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
- // parse the byte[] to identify the tags length.
- if(e.getTagsLength() > 0) {
- tagsPresent = true;
- }
- setOldestEditTimeToNow();
- return b;
- }
-
- private boolean removeFromCellSet(Cell e) {
- boolean b = this.cellSet.remove(e);
- setOldestEditTimeToNow();
- return b;
- }
-
- void setOldestEditTimeToNow() {
- if (timeOfOldestEdit == Long.MAX_VALUE) {
- timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
- }
- }
-
- /**
- * Internal version of add() that doesn't clone Cells with the
- * allocator, and doesn't take the lock.
- *
- * Callers should ensure they already have the read lock taken
- */
- private long internalAdd(final Cell toAdd) {
- long s = heapSizeChange(toAdd, addToCellSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
- }
-
- private Cell maybeCloneWithAllocator(Cell cell) {
- if (allocator == null) {
- return cell;
- }
-
- int len = KeyValueUtil.length(cell);
- ByteRange alloc = allocator.allocateBytes(len);
- if (alloc == null) {
- // The allocation was too large, allocator decided
- // not to do anything with it.
- return cell;
- }
- assert alloc.getBytes() != null;
- KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
- KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
- newKv.setSequenceId(cell.getSequenceId());
- return newKv;
+ protected List<Segment> getListOfSegments() throws IOException {
+ List<Segment> list = new ArrayList<Segment>(2);
+ list.add(0, getActive());
+ list.add(1, getSnapshot());
+ return list;
}
/**
@@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public void rollback(Cell cell) {
- // If the key is in the snapshot, delete it. We should not update
- // this.size, because that tracks the size of only the memstore and
- // not the snapshot. The flush of this snapshot to disk has not
- // yet started because Store.flush() waits for all rwcc transactions to
- // commit before starting the flush to disk.
- Cell found = this.snapshot.get(cell);
- if (found != null && found.getSequenceId() == cell.getSequenceId()) {
- this.snapshot.remove(cell);
- long sz = heapSizeChange(cell, true);
- this.snapshotSize -= sz;
- }
- // If the key is in the memstore, delete it. Update this.size.
- found = this.cellSet.get(cell);
- if (found != null && found.getSequenceId() == cell.getSequenceId()) {
- removeFromCellSet(cell);
- long s = heapSizeChange(cell, true);
- this.size.addAndGet(-s);
- }
- }
-
- /**
- * Write a delete
- * @param deleteCell
- * @return approximate size of the passed key and value.
- */
- @Override
- public long delete(Cell deleteCell) {
- long s = 0;
- Cell toAdd = maybeCloneWithAllocator(deleteCell);
- s += heapSizeChange(toAdd, addToCellSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
+ rollbackInSnapshot(cell);
+ rollbackInActive(cell);
}
/**
@@ -342,604 +137,29 @@ public class DefaultMemStore implements MemStore {
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
- return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
- }
-
- /*
- * @param a
- * @param b
- * @return Return lowest of a or b or null if both a and b are null
- */
- private Cell getLowest(final Cell a, final Cell b) {
- if (a == null) {
- return b;
- }
- if (b == null) {
- return a;
- }
- return comparator.compareRows(a, b) <= 0? a: b;
+ return getLowest(
+ getNextRow(cell, getActive().getCellSet()),
+ getNextRow(cell, getSnapshot().getCellSet()));
}
- /*
- * @param key Find row that follows this one. If null, return first.
- * @param map Set to look in for a row beyond <code>row</code>.
- * @return Next row or null if none found. If one found, will be a new
- * KeyValue -- can be destroyed by subsequent calls to this method.
- */
- private Cell getNextRow(final Cell key,
- final NavigableSet<Cell> set) {
- Cell result = null;
- SortedSet<Cell> tail = key == null? set: set.tailSet(key);
- // Iterate until we fall into the next row; i.e. move off current row
- for (Cell cell: tail) {
- if (comparator.compareRows(cell, key) <= 0)
- continue;
- // Note: Not suppressing deletes or expired cells. Needs to be handled
- // by higher up functions.
- result = cell;
- break;
- }
- return result;
+ @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) {
}
/**
- * Only used by tests. TODO: Remove
- *
- * Given the specs of a column, update it, first by inserting a new record,
- * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
- * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
- * store will ensure that the insert/delete each are atomic. A scanner/reader will either
- * get the new value, or the old value and all readers will eventually only see the new
- * value after the old was removed.
- *
- * @param row
- * @param family
- * @param qualifier
- * @param newValue
- * @param now
- * @return Timestamp
+ * @return Total memory occupied by this MemStore.
*/
@Override
- public long updateColumnValue(byte[] row,
- byte[] family,
- byte[] qualifier,
- long newValue,
- long now) {
- Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
- // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
- if (!snSs.isEmpty()) {
- Cell snc = snSs.first();
- // is there a matching Cell in the snapshot?
- if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
- if (snc.getTimestamp() == now) {
- // poop,
- now += 1;
- }
- }
- }
-
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new Cell w/o knowing what's there already, but we also
- // want to take this chance to delete some cells. So two loops (sad)
-
- SortedSet<Cell> ss = cellSet.tailSet(firstCell);
- for (Cell cell : ss) {
- // if this isnt the row we are interested in, then bail:
- if (!CellUtil.matchingColumn(cell, family, qualifier)
- || !CellUtil.matchingRow(cell, firstCell)) {
- break; // rows dont match, bail.
- }
-
- // if the qualifier matches and it's a put, just RM it out of the cellSet.
- if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
- cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
- now = cell.getTimestamp();
- }
- }
-
- // create or update (upsert) a new Cell with
- // 'now' and a 0 memstoreTS == immediately visible
- List<Cell> cells = new ArrayList<Cell>(1);
- cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
- return upsert(cells, 1L);
- }
-
- /**
- * Update or insert the specified KeyValues.
- * <p>
- * For each KeyValue, insert into MemStore. This will atomically upsert the
- * value for that row/family/qualifier. If a KeyValue did already exist,
- * it will then be removed.
- * <p>
- * This is called under row lock, so Get operations will still see updates
- * atomically. Scans will only see each KeyValue update as atomic.
- *
- * @param readpoint readpoint below which we can safely remove duplicate KVs
- * @return change in memstore size
- */
- @Override
- public long upsert(Iterable<Cell> cells, long readpoint) {
- long size = 0;
- for (Cell cell : cells) {
- size += upsert(cell, readpoint);
- }
- return size;
- }
-
- /**
- * Inserts the specified KeyValue into MemStore and deletes any existing
- * versions of the same row/family/qualifier as the specified KeyValue.
- * <p>
- * First, the specified KeyValue is inserted into the Memstore.
- * <p>
- * If there are any existing KeyValues in this MemStore with the same row,
- * family, and qualifier, they are removed.
- * <p>
- * Callers must hold the read lock.
- * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells.
- * @return change in size of MemStore
- */
- private long upsert(Cell cell, long readpoint) {
- // Add the Cell to the MemStore
- // Use the internalAdd method here since we (a) already have a lock
- // and (b) cannot safely use the MSLAB here without potentially
- // hitting OOME - see TestMemStore.testUpsertMSLAB for a
- // test that triggers the pathological case if we don't avoid MSLAB
- // here.
- long addedSize = internalAdd(cell);
-
- // Get the Cells for the row/family/qualifier regardless of timestamp.
- // For this case we want to clean up any other puts
- Cell firstCell = KeyValueUtil.createFirstOnRow(
- cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
- cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- SortedSet<Cell> ss = cellSet.tailSet(firstCell);
- Iterator<Cell> it = ss.iterator();
- // Versions visible to oldest scanner.
- int versionsVisible = 0;
- while ( it.hasNext() ) {
- Cell cur = it.next();
-
- if (cell == cur) {
- // ignore the one just put in
- continue;
- }
- // check that this is the row and column we are interested in, otherwise bail
- if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
- // only remove Puts that concurrent scanners cannot possibly see
- if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
- cur.getSequenceId() <= readpoint) {
- if (versionsVisible >= 1) {
- // if we get here we have seen at least one version visible to the oldest scanner,
- // which means we can prove that no scanner will see this version
-
- // false means there was a change, so give us the size.
- long delta = heapSizeChange(cur, true);
- addedSize -= delta;
- this.size.addAndGet(-delta);
- it.remove();
- setOldestEditTimeToNow();
- } else {
- versionsVisible++;
- }
- }
- } else {
- // past the row or column, done
- break;
- }
- }
- return addedSize;
- }
-
- /**
- * @return scanner on memstore and snapshot in this order.
- */
- @Override
- public List<KeyValueScanner> getScanners(long readPt) {
- return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
- }
-
- /**
- * Check if this memstore may contain the required keys
- * @param scan scan
- * @param store holds reference to cf
- * @param oldestUnexpiredTS
- * @return False if the key definitely does not exist in this Memstore
- */
- public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
- byte[] cf = store.getFamily().getName();
- TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
- if (timeRange == null) {
- timeRange = scan.getTimeRange();
- }
- return (timeRangeTracker.includesTimeRange(timeRange) ||
- snapshotTimeRangeTracker.includesTimeRange(timeRange))
- && (Math.max(timeRangeTracker.getMaximumTimestamp(),
- snapshotTimeRangeTracker.getMaximumTimestamp()) >=
- oldestUnexpiredTS);
- }
-
- /*
- * MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore -- both current
- * map and snapshot.
- * This behaves as if it were a real scanner but does not maintain position.
- */
- protected class MemStoreScanner extends NonLazyKeyValueScanner {
- // Next row information for either cellSet or snapshot
- private Cell cellSetNextRow = null;
- private Cell snapshotNextRow = null;
-
- // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
- private Cell cellSetItRow = null;
- private Cell snapshotItRow = null;
-
- // iterator based scanning.
- private Iterator<Cell> cellSetIt;
- private Iterator<Cell> snapshotIt;
-
- // The cellSet and snapshot at the time of creating this scanner
- private CellSkipListSet cellSetAtCreation;
- private CellSkipListSet snapshotAtCreation;
-
- // the pre-calculated Cell to be returned by peek() or next()
- private Cell theNext;
-
- // The allocator and snapshot allocator at the time of creating this scanner
- volatile MemStoreLAB allocatorAtCreation;
- volatile MemStoreLAB snapshotAllocatorAtCreation;
-
- // A flag represents whether could stop skipping Cells for MVCC
- // if have encountered the next row. Only used for reversed scan
- private boolean stopSkippingCellsIfNextRow = false;
-
- private long readPoint;
-
- /*
- Some notes...
-
- So memstorescanner is fixed at creation time. this includes pointers/iterators into
- existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
- snapshot is moved. since kvset is null there is no point on reseeking on both,
- we can save us the trouble. During the snapshot->hfile transition, the memstore
- scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
- potentially do something smarter by adjusting the existing memstore scanner.
-
- But there is a greater problem here, that being once a scanner has progressed
- during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
- if a scan lasts a little while, there is a chance for new entries in kvset to
- become available but we will never see them. This needs to be handled at the
- StoreScanner level with coordination with MemStoreScanner.
-
- Currently, this problem is only partly managed: during the small amount of time
- when the StoreScanner has not yet created a new MemStoreScanner, we will miss
- the adds to kvset in the MemStoreScanner.
- */
-
- MemStoreScanner(long readPoint) {
- super();
-
- this.readPoint = readPoint;
- cellSetAtCreation = cellSet;
- snapshotAtCreation = snapshot;
- if (allocator != null) {
- this.allocatorAtCreation = allocator;
- this.allocatorAtCreation.incScannerCount();
- }
- if (snapshotAllocator != null) {
- this.snapshotAllocatorAtCreation = snapshotAllocator;
- this.snapshotAllocatorAtCreation.incScannerCount();
- }
- if (Trace.isTracing() && Trace.currentSpan() != null) {
- Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
- }
- }
-
- /**
- * Lock on 'this' must be held by caller.
- * @param it
- * @return Next Cell
- */
- private Cell getNext(Iterator<Cell> it) {
- Cell startCell = theNext;
- Cell v = null;
- try {
- while (it.hasNext()) {
- v = it.next();
- if (v.getSequenceId() <= this.readPoint) {
- return v;
- }
- if (stopSkippingCellsIfNextRow && startCell != null
- && comparator.compareRows(v, startCell) > 0) {
- return null;
- }
- }
-
- return null;
- } finally {
- if (v != null) {
- // in all cases, remember the last Cell iterated to
- if (it == snapshotIt) {
- snapshotItRow = v;
- } else {
- cellSetItRow = v;
- }
- }
- }
- }
-
- /**
- * Set the scanner at the seek key.
- * Must be called only once: there is no thread safety between the scanner
- * and the memStore.
- * @param key seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public synchronized boolean seek(Cell key) {
- if (key == null) {
- close();
- return false;
- }
- // kvset and snapshot will never be null.
- // if tailSet can't find anything, SortedSet is empty (not null).
- cellSetIt = cellSetAtCreation.tailSet(key).iterator();
- snapshotIt = snapshotAtCreation.tailSet(key).iterator();
- cellSetItRow = null;
- snapshotItRow = null;
-
- return seekInSubLists(key);
- }
-
-
- /**
- * (Re)initialize the iterators after a seek or a reseek.
- */
- private synchronized boolean seekInSubLists(Cell key){
- cellSetNextRow = getNext(cellSetIt);
- snapshotNextRow = getNext(snapshotIt);
-
- // Calculate the next value
- theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
- // has data
- return (theNext != null);
- }
-
-
- /**
- * Move forward on the sub-lists set previously by seek.
- * @param key seek value (should be non-null)
- * @return true if there is at least one KV to read, false otherwise
- */
- @Override
- public synchronized boolean reseek(Cell key) {
- /*
- See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
- This code is executed concurrently with flush and puts, without locks.
- Two points must be known when working on this code:
- 1) It's not possible to use the 'kvTail' and 'snapshot'
- variables, as they are modified during a flush.
- 2) The ideal implementation for performance would use the sub skip list
- implicitly pointed by the iterators 'kvsetIt' and
- 'snapshotIt'. Unfortunately the Java API does not offer a method to
- get it. So we remember the last keys we iterated to and restore
- the reseeked set to at least that point.
- */
- cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
- snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
- return seekInSubLists(key);
- }
-
-
- @Override
- public synchronized Cell peek() {
- //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
- return theNext;
- }
-
- @Override
- public synchronized Cell next() {
- if (theNext == null) {
- return null;
- }
-
- final Cell ret = theNext;
-
- // Advance one of the iterators
- if (theNext == cellSetNextRow) {
- cellSetNextRow = getNext(cellSetIt);
- } else {
- snapshotNextRow = getNext(snapshotIt);
- }
-
- // Calculate the next value
- theNext = getLowest(cellSetNextRow, snapshotNextRow);
-
- //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
- // getLowest() + " threadpoint=" + readpoint);
- return ret;
- }
-
- /*
- * Returns the lower of the two key values, or null if they are both null.
- * This uses comparator.compare() to compare the KeyValue using the memstore
- * comparator.
- */
- private Cell getLowest(Cell first, Cell second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare <= 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- /*
- * Returns the higher of the two cells, or null if they are both null.
- * This uses comparator.compare() to compare the Cell using the memstore
- * comparator.
- */
- private Cell getHighest(Cell first, Cell second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare > 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- public synchronized void close() {
- this.cellSetNextRow = null;
- this.snapshotNextRow = null;
-
- this.cellSetIt = null;
- this.snapshotIt = null;
-
- if (allocatorAtCreation != null) {
- this.allocatorAtCreation.decScannerCount();
- this.allocatorAtCreation = null;
- }
- if (snapshotAllocatorAtCreation != null) {
- this.snapshotAllocatorAtCreation.decScannerCount();
- this.snapshotAllocatorAtCreation = null;
- }
-
- this.cellSetItRow = null;
- this.snapshotItRow = null;
- }
-
- /**
- * MemStoreScanner returns max value as sequence id because it will
- * always have the latest data among all files.
- */
- @Override
- public long getSequenceID() {
- return Long.MAX_VALUE;
- }
-
- @Override
- public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
- return shouldSeek(scan, store, oldestUnexpiredTS);
- }
-
- /**
- * Seek scanner to the given key first. If it returns false(means
- * peek()==null) or scanner's peek row is bigger than row of given key, seek
- * the scanner to the previous row of given key
- */
- @Override
- public synchronized boolean backwardSeek(Cell key) {
- seek(key);
- if (peek() == null || comparator.compareRows(peek(), key) > 0) {
- return seekToPreviousRow(key);
- }
- return true;
- }
-
- /**
- * Separately get the KeyValue before the specified key from kvset and
- * snapshotset, and use the row of higher one as the previous row of
- * specified key, then seek to the first KeyValue of previous row
- */
- @Override
- public synchronized boolean seekToPreviousRow(Cell originalKey) {
- boolean keepSeeking = false;
- Cell key = originalKey;
- do {
- Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
- SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
- Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
- SortedSet<Cell> snapshotHead = snapshotAtCreation
- .headSet(firstKeyOnRow);
- Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
- .last();
- Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
- if (lastCellBeforeRow == null) {
- theNext = null;
- return false;
- }
- Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
- this.stopSkippingCellsIfNextRow = true;
- seek(firstKeyOnPreviousRow);
- this.stopSkippingCellsIfNextRow = false;
- if (peek() == null
- || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
- keepSeeking = true;
- key = firstKeyOnPreviousRow;
- continue;
- } else {
- keepSeeking = false;
- }
- } while (keepSeeking);
- return true;
- }
-
- @Override
- public synchronized boolean seekToLastRow() {
- Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
- .last();
- Cell second = snapshotAtCreation.isEmpty() ? null
- : snapshotAtCreation.last();
- Cell higherCell = getHighest(first, second);
- if (higherCell == null) {
- return false;
- }
- Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
- if (seek(firstCellOnLastRow)) {
- return true;
- } else {
- return seekToPreviousRow(higherCell);
- }
-
- }
- }
-
- public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
- + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
-
- public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
- (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
- /*
- * Calculate how the MemStore size has changed. Includes overhead of the
- * backing Map.
- * @param cell
- * @param notpresent True if the cell was NOT present in the set.
- * @return Size
- */
- static long heapSizeChange(final Cell cell, final boolean notpresent) {
- return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
- + CellUtil.estimatedHeapSizeOf(cell)) : 0;
- }
-
- private long keySize() {
- return heapSize() - DEEP_OVERHEAD;
+ public long size() {
+ return heapSize();
}
/**
- * Get the entire heap usage for this MemStore not including keys in the
- * snapshot.
+ * Check whether anything need to be done based on the current active set size
+ * Nothing need to be done for the DefaultMemStore
*/
@Override
- public long heapSize() {
- return size.get();
- }
-
- @Override
- public long size() {
- return heapSize();
+ protected void checkActiveSize() {
+ return;
}
/**
@@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore {
LOG.info("memstore2 estimated size=" + size);
final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
- for (int i = 0; i < seconds; i++) {
- // Thread.sleep(1000);
- }
LOG.info("Exiting.");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c65326a..5c29fb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -18,6 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
@@ -91,13 +98,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
* or more StoreFiles, which stretch backwards over time.
@@ -1636,7 +1636,7 @@ public class HStore implements Store {
this.lock.readLock().unlock();
}
- LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : ""));
this.region.reportCompactionRequestStart(request.isMajor());
@@ -1990,8 +1990,6 @@ public class HStore implements Store {
}
/**
- * Used in tests. TODO: Remove
- *
* Updates the value for the given row/family/qualifier. This function will always be seen as
* atomic by other readers because it only puts a single KV to memstore. Thus no read/write
* control necessary.
@@ -2002,6 +2000,7 @@ public class HStore implements Store {
* @return memstore size delta
* @throws IOException
*/
+ @VisibleForTesting
public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue)
throws IOException {
@@ -2055,7 +2054,8 @@ public class HStore implements Store {
*/
@Override
public void prepare() {
- this.snapshot = memstore.snapshot();
+ // passing the current sequence number of the wal - to allow bookkeeping in the memstore
+ this.snapshot = memstore.snapshot(cacheFlushSeqNum);
this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getSize();
committedFiles = new ArrayList<Path>(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
new file mode 100644
index 0000000..cfcd81e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}
+ * with functionality that a {@link MutableSegment} does not need. Specifically, the method
+ * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the
+ * {@link MemStoreSnapshot} object.
+ * In addition, this class overrides, with conservative defaults, methods that an immutable
+ * segment is unlikely to support efficiently: {@link Segment#rollback(Cell)} is a no-op and
+ * {@link Segment#getCellSet()} throws. Subclasses that can support them cheaply (for example
+ * {@link ImmutableSegmentAdapter}) may override these defaults.
+ */
+@InterfaceAudience.Private
+public abstract class ImmutableSegment extends Segment {
+
+ /**
+ * Creates an immutable segment from the given segment by delegating to the
+ * {@code Segment(Segment)} constructor.
+ * @param segment the segment this immutable segment is built from
+ */
+ public ImmutableSegment(Segment segment) {
+ super(segment);
+ }
+
+ /**
+ * Would remove the given cell from this segment. By default an immutable segment cannot
+ * roll back, so this implementation removes nothing and reports no size change.
+ * Subclasses where rollback is known to be supported (e.g. {@link ImmutableSegmentAdapter})
+ * override this; such overrides are intended to be invoked only in specific cases, e.g. tests.
+ * @param cell the cell to remove; ignored by this default implementation
+ * @return always 0 in this default implementation
+ */
+ @Override
+ public long rollback(Cell cell) {
+ return 0;
+ }
+
+ /**
+ * Returns a set of all the cells in the segment.
+ * The implementation of this method might be very inefficient for some immutable segments
+ * that do not maintain a cell set, so by default this method is not supported.
+ * Subclasses where it is known to be supported (e.g. {@link ImmutableSegmentAdapter})
+ * override it; such overrides are intended to be invoked only in specific cases, e.g. tests.
+ * @return never returns normally in this default implementation
+ * @throws NotImplementedException always, in this default implementation
+ */
+ @Override
+ public CellSet getCellSet() {
+ throw new NotImplementedException("Immutable Segment does not support this operation by " +
+ "default");
+ }
+
+ /**
+ * Builds a special scanner for the MemStoreSnapshot object that may be different from the
+ * general segment scanner.
+ * @return a special scanner for the MemStoreSnapshot object
+ */
+ public abstract KeyValueScanner getKeyValueScanner();
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
new file mode 100644
index 0000000..058865a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+
+/**
+ * This segment adapts a mutable segment, turning it into an immutable segment.
+ * It is used when a mutable segment becomes a snapshot or is pushed into a compaction
+ * pipeline, which consists only of immutable segments.
+ * The compaction may generate different types of immutable segments.
+ * Every operation below is delegated to the wrapped {@link MutableSegment}.
+ */
+@InterfaceAudience.Private
+public class ImmutableSegmentAdapter extends ImmutableSegment {
+
+ // The wrapped mutable segment; all calls are forwarded to it.
+ final private MutableSegment adaptee;
+
+ /**
+ * @param segment the mutable segment to expose as immutable; it is also passed to the
+ * {@code ImmutableSegment(Segment)} constructor
+ */
+ public ImmutableSegmentAdapter(MutableSegment segment) {
+ super(segment);
+ this.adaptee = segment;
+ }
+
+ /**
+ * @return a new {@link CollectionBackedScanner} over the adaptee's cell set, built with the
+ * adaptee's comparator
+ */
+ @Override
+ public KeyValueScanner getKeyValueScanner() {
+ return new CollectionBackedScanner(adaptee.getCellSet(), adaptee.getComparator());
+ }
+
+ /** Delegates to the adaptee's segment scanner for the given read point. */
+ @Override
+ public SegmentScanner getSegmentScanner(long readPoint) {
+ return adaptee.getSegmentScanner(readPoint);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return adaptee.isEmpty();
+ }
+
+ @Override
+ public int getCellsCount() {
+ return adaptee.getCellsCount();
+ }
+
+ /**
+ * Delegates to the adaptee.
+ * NOTE(review): this still mutates the underlying segment even though the adapter is exposed
+ * as immutable; presumably intentional for existing callers — confirm.
+ */
+ @Override
+ public long add(Cell cell) {
+ return adaptee.add(cell);
+ }
+
+ @Override
+ public Cell getFirstAfter(Cell cell) {
+ return adaptee.getFirstAfter(cell);
+ }
+
+ @Override
+ public void close() {
+ adaptee.close();
+ }
+
+ @Override
+ public Cell maybeCloneWithAllocator(Cell cell) {
+ return adaptee.maybeCloneWithAllocator(cell);
+ }
+
+ /**
+ * Sets the size on the adaptee.
+ * @return this adapter (not the adaptee), so chained calls stay on the adapter
+ */
+ @Override
+ public Segment setSize(long size) {
+ adaptee.setSize(size);
+ return this;
+ }
+
+ @Override
+ public long getSize() {
+ return adaptee.getSize();
+ }
+
+ /**
+ * Overrides the no-op default of {@link ImmutableSegment} by delegating to the adaptee, so
+ * rollback is supported for adapted segments.
+ */
+ @Override
+ public long rollback(Cell cell) {
+ return adaptee.rollback(cell);
+ }
+
+ /**
+ * Overrides the throwing default of {@link ImmutableSegment}; the adaptee maintains a cell set,
+ * so this simply returns it.
+ */
+ @Override
+ public CellSet getCellSet() {
+ return adaptee.getCellSet();
+ }
+
+ @Override
+ public void dump(Log log) {
+ adaptee.dump(log);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index e9f8103..a10ccd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -17,10 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
/**
@@ -41,10 +42,19 @@ public interface MemStore extends HeapSize {
MemStoreSnapshot snapshot();
/**
+ * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+ * {@link #clearSnapshot(long)}.
+ * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed
+ * segment
+ * @return {@link MemStoreSnapshot}
+ */
+ MemStoreSnapshot snapshot(long flushOpSeqId);
+
+ /**
* Clears the current snapshot of the Memstore.
* @param id
* @throws UnexpectedStateException
- * @see #snapshot()
+ * @see #snapshot(long)
*/
void clearSnapshot(long id) throws UnexpectedStateException;
@@ -128,7 +138,7 @@ public interface MemStore extends HeapSize {
* @return scanner over the memstore. This might include scanner over the snapshot when one is
* present.
*/
- List<KeyValueScanner> getScanners(long readPt);
+ List<KeyValueScanner> getScanners(long readPt) throws IOException;
/**
* @return Total memory occupied by this MemStore.