You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/05/12 11:43:31 UTC
hbase git commit: HBASE-17887 Row-level consistency is broken for read
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 5cbc041b9 -> 72edf521c
HBASE-17887 Row-level consistency is broken for read
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72edf521
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72edf521
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72edf521
Branch: refs/heads/branch-1.3
Commit: 72edf521c1effe3afe6ce6b39aaf843b8651a4a6
Parents: 5cbc041
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Fri May 12 19:42:45 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri May 12 19:42:45 2017 +0800
----------------------------------------------------------------------
.../regionserver/ChangedReadersObserver.java | 9 +-
.../hbase/regionserver/DefaultMemStore.java | 300 ++++++++++---------
.../hadoop/hbase/regionserver/HStore.java | 9 +-
.../hadoop/hbase/regionserver/StoreScanner.java | 44 ++-
.../apache/hadoop/hbase/io/TestHeapSize.java | 3 +-
.../hbase/regionserver/TestDefaultMemStore.java | 66 ++--
.../hadoop/hbase/regionserver/TestHRegion.java | 4 +-
.../regionserver/TestMemStoreChunkPool.java | 12 +-
.../hadoop/hbase/regionserver/TestStore.java | 151 +++++++++-
.../hbase/regionserver/TestStoreScanner.java | 5 +-
.../hbase/regionserver/TestWideScanner.java | 3 +-
11 files changed, 398 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 0bc75e7..4421ac5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -30,9 +30,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public interface ChangedReadersObserver {
+
+ /**
+ * @return the read point of the current scan
+ */
+ long getReadPoint();
/**
* Notify observers.
+ * @param sfs The new files
+ * @param memStoreScanners scanner of current memstore
* @throws IOException e
*/
- void updateReaders(List<StoreFile> sfs) throws IOException;
+ void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 70f5a12..884ef29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -70,38 +70,26 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class DefaultMemStore implements MemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
+ @VisibleForTesting
static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
private static final boolean USEMSLAB_DEFAULT = true;
- static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
+ private static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private Configuration conf;
- // MemStore. Use a CellSkipListSet rather than SkipListSet because of the
- // better semantics. The Map will overwrite if passed a key it already had
- // whereas the Set will not add new Cell if key is same though value might be
- // different. Value is not important -- just make sure always same
- // reference passed.
- volatile CellSkipListSet cellSet;
-
- // Snapshot of memstore. Made for flusher.
- volatile CellSkipListSet snapshot;
-
+ @VisibleForTesting
final KeyValue.KVComparator comparator;
- // Used to track own heapSize
- final AtomicLong size;
- private volatile long snapshotSize;
-
// Used to track when to flush
- volatile long timeOfOldestEdit = Long.MAX_VALUE;
+ private volatile long timeOfOldestEdit = Long.MAX_VALUE;
- TimeRangeTracker timeRangeTracker;
- TimeRangeTracker snapshotTimeRangeTracker;
+ private volatile long snapshotId;
+ private volatile boolean tagsPresent;
- volatile MemStoreLAB allocator;
- volatile MemStoreLAB snapshotAllocator;
- volatile long snapshotId;
- volatile boolean tagsPresent;
+ @VisibleForTesting
+ volatile Section activeSection;
+ @VisibleForTesting
+ volatile Section snapshotSection;
/**
* Default constructor. Used for tests.
@@ -118,28 +106,8 @@ public class DefaultMemStore implements MemStore {
final KeyValue.KVComparator c) {
this.conf = conf;
this.comparator = c;
- this.cellSet = new CellSkipListSet(c);
- this.snapshot = new CellSkipListSet(c);
- timeRangeTracker = new TimeRangeTracker();
- snapshotTimeRangeTracker = new TimeRangeTracker();
- this.size = new AtomicLong(DEEP_OVERHEAD);
- this.snapshotSize = 0;
- if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
- String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
- this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
- new Class[] { Configuration.class }, new Object[] { conf });
- } else {
- this.allocator = null;
- }
- }
-
- void dump() {
- for (Cell cell: this.cellSet) {
- LOG.info(cell);
- }
- for (Cell cell: this.snapshot) {
- LOG.info(cell);
- }
+ this.activeSection = Section.newActiveSection(comparator, conf);
+ this.snapshotSection = Section.newSnapshotSection(comparator);
}
/**
@@ -150,31 +118,22 @@ public class DefaultMemStore implements MemStore {
public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
+ if (!snapshotSection.getCellSkipListSet().isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
- this.snapshotSize = keySize();
- if (!this.cellSet.isEmpty()) {
- this.snapshot = this.cellSet;
- this.cellSet = new CellSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- this.snapshotAllocator = this.allocator;
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
- this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
- new Class[] { Configuration.class }, new Object[] { conf });
- }
+ if (!activeSection.getCellSkipListSet().isEmpty()) {
+ snapshotSection = activeSection;
+ activeSection = Section.newActiveSection(comparator, conf);
+ snapshotSection.getHeapSize().addAndGet(-DEEP_OVERHEAD);
timeOfOldestEdit = Long.MAX_VALUE;
}
}
- MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
- this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
+ MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId,
+ snapshotSection.getCellSkipListSet().size(), snapshotSection.getHeapSize().get(),
+ snapshotSection.getTimeRangeTracker(),
+ new CollectionBackedScanner(snapshotSection.getCellSkipListSet(), this.comparator),
this.tagsPresent);
this.tagsPresent = false;
return memStoreSnapshot;
@@ -188,37 +147,29 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public void clearSnapshot(long id) throws UnexpectedStateException {
- MemStoreLAB tmpAllocator = null;
if (this.snapshotId == -1) return; // already cleared
if (this.snapshotId != id) {
throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+ id);
}
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!this.snapshot.isEmpty()) {
- this.snapshot = new CellSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- this.snapshotSize = 0;
- this.snapshotId = -1;
- if (this.snapshotAllocator != null) {
- tmpAllocator = this.snapshotAllocator;
- this.snapshotAllocator = null;
- }
+ // OK. Passed in snapshot is same as current snapshot.
+ MemStoreLAB tmpAllocator = snapshotSection.getMemStoreLAB();
+ snapshotSection = Section.newSnapshotSection(comparator);
if (tmpAllocator != null) {
tmpAllocator.close();
}
+ this.snapshotId = -1;
}
@Override
public long getFlushableSize() {
- return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+ long snapshotSize = snapshotSection.getHeapSize().get();
+ return snapshotSize > 0 ? snapshotSize : keySize();
}
@Override
public long getSnapshotSize() {
- return this.snapshotSize;
+ return snapshotSection.getHeapSize().get();
}
/**
@@ -239,7 +190,7 @@ public class DefaultMemStore implements MemStore {
}
private boolean addToCellSet(Cell e) {
- boolean b = this.cellSet.add(e);
+ boolean b = this.activeSection.getCellSkipListSet().add(e);
// In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
// When we use ACL CP or Visibility CP which deals with Tags during
// mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
@@ -252,7 +203,7 @@ public class DefaultMemStore implements MemStore {
}
private boolean removeFromCellSet(Cell e) {
- boolean b = this.cellSet.remove(e);
+ boolean b = this.activeSection.getCellSkipListSet().remove(e);
setOldestEditTimeToNow();
return b;
}
@@ -281,8 +232,8 @@ public class DefaultMemStore implements MemStore {
if (!notPresent && mslabUsed) {
s += getCellLength(toAdd);
}
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
+ activeSection.getTimeRangeTracker().includeTimestamp(toAdd);
+ activeSection.getHeapSize().addAndGet(s);
return s;
}
@@ -295,12 +246,12 @@ public class DefaultMemStore implements MemStore {
}
private Cell maybeCloneWithAllocator(Cell cell) {
- if (allocator == null) {
+ if (activeSection.getMemStoreLAB() == null) {
return cell;
}
int len = getCellLength(cell);
- ByteRange alloc = allocator.allocateBytes(len);
+ ByteRange alloc = activeSection.getMemStoreLAB().allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
@@ -328,18 +279,18 @@ public class DefaultMemStore implements MemStore {
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
- Cell found = this.snapshot.get(cell);
+ Cell found = snapshotSection.getCellSkipListSet().get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
- this.snapshot.remove(cell);
+ snapshotSection.getCellSkipListSet().remove(cell);
long sz = heapSizeChange(cell, true);
- this.snapshotSize -= sz;
+ snapshotSection.getHeapSize().addAndGet(-sz);
}
// If the key is in the memstore, delete it. Update this.size.
- found = this.cellSet.get(cell);
+ found = activeSection.getCellSkipListSet().get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
removeFromCellSet(cell);
- long s = heapSizeChange(cell, true);
- this.size.addAndGet(-s);
+ long sz = heapSizeChange(found, true);
+ activeSection.getHeapSize().addAndGet(-sz);
}
}
@@ -361,7 +312,8 @@ public class DefaultMemStore implements MemStore {
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
- return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
+ return getLowest(getNextRow(cell, activeSection.getCellSkipListSet()),
+ getNextRow(cell, snapshotSection.getCellSkipListSet()));
}
/*
@@ -406,8 +358,8 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
- getRowKeyAtOrBefore(cellSet, state);
- getRowKeyAtOrBefore(snapshot, state);
+ getRowKeyAtOrBefore(activeSection.getCellSkipListSet(), state);
+ getRowKeyAtOrBefore(snapshotSection.getCellSkipListSet(), state);
}
/*
@@ -505,7 +457,7 @@ public class DefaultMemStore implements MemStore {
long now) {
Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
// Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
+ SortedSet<Cell> snSs = snapshotSection.getCellSkipListSet().tailSet(firstCell);
if (!snSs.isEmpty()) {
Cell snc = snSs.first();
// is there a matching Cell in the snapshot?
@@ -523,7 +475,7 @@ public class DefaultMemStore implements MemStore {
// so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad)
- SortedSet<Cell> ss = cellSet.tailSet(firstCell);
+ SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier)
@@ -601,7 +553,7 @@ public class DefaultMemStore implements MemStore {
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- SortedSet<Cell> ss = cellSet.tailSet(firstCell);
+ SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
Iterator<Cell> it = ss.iterator();
// versions visible to oldest scanner
int versionsVisible = 0;
@@ -624,7 +576,7 @@ public class DefaultMemStore implements MemStore {
// false means there was a change, so give us the size.
long delta = heapSizeChange(cur, true);
addedSize -= delta;
- this.size.addAndGet(-delta);
+ activeSection.getHeapSize().addAndGet(-delta);
it.remove();
setOldestEditTimeToNow();
} else {
@@ -680,7 +632,8 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) {
- return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
+ return Collections.<KeyValueScanner> singletonList(
+ new MemStoreScanner(activeSection, snapshotSection, readPt, comparator));
}
/**
@@ -691,14 +644,29 @@ public class DefaultMemStore implements MemStore {
* @return False if the key definitely does not exist in this Memstore
*/
public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
+ return shouldSeek(activeSection.getTimeRangeTracker(),
+ snapshotSection.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
+ }
+
+ /**
+ * Check if this memstore may contain the required keys
+ * @param activeTimeRangeTracker the tracker of active data
+ * @param snapshotTimeRangeTracker the tracker of snapshot data
+ * @param scan scan
+ * @param store holds reference to cf
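+ * @param oldestUnexpiredTS the oldest timestamp that is not expired; the memstore is skipped if its newest timestamp is older than this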
+ * @return False if the key definitely does not exist in this Memstore
+ */
+ private static boolean shouldSeek(TimeRangeTracker activeTimeRangeTracker,
+ TimeRangeTracker snapshotTimeRangeTracker, Scan scan, Store store, long oldestUnexpiredTS) {
byte[] cf = store.getFamily().getName();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
if (timeRange == null) {
timeRange = scan.getTimeRange();
}
- return (timeRangeTracker.includesTimeRange(timeRange) ||
+ return (activeTimeRangeTracker.includesTimeRange(timeRange) ||
snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
- (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
+ (Math.max(activeTimeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
}
/*
@@ -707,7 +675,7 @@ public class DefaultMemStore implements MemStore {
* map and snapshot.
* This behaves as if it were a real scanner but does not maintain position.
*/
- protected class MemStoreScanner extends NonLazyKeyValueScanner {
+ protected static class MemStoreScanner extends NonLazyKeyValueScanner {
// Next row information for either cellSet or snapshot
private Cell cellSetNextRow = null;
private Cell snapshotNextRow = null;
@@ -721,22 +689,18 @@ public class DefaultMemStore implements MemStore {
private Iterator<Cell> snapshotIt;
// The cellSet and snapshot at the time of creating this scanner
- private CellSkipListSet cellSetAtCreation;
- private CellSkipListSet snapshotAtCreation;
+ private final Section activeAtCreation;
+ private final Section snapshotAtCreation;
// the pre-calculated Cell to be returned by peek() or next()
private Cell theNext;
- // The allocator and snapshot allocator at the time of creating this scanner
- volatile MemStoreLAB allocatorAtCreation;
- volatile MemStoreLAB snapshotAllocatorAtCreation;
-
// A flag represents whether could stop skipping Cells for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingCellsIfNextRow = false;
- private long readPoint;
-
+ private final long readPoint;
+ private final KeyValue.KVComparator comparator;
/*
Some notes...
@@ -758,19 +722,16 @@ public class DefaultMemStore implements MemStore {
the adds to kvset in the MemStoreScanner.
*/
- MemStoreScanner(long readPoint) {
- super();
-
+ MemStoreScanner(Section activeSection, Section snapshotSection, long readPoint, final KeyValue.KVComparator c) {
this.readPoint = readPoint;
- cellSetAtCreation = cellSet;
- snapshotAtCreation = snapshot;
- if (allocator != null) {
- this.allocatorAtCreation = allocator;
- this.allocatorAtCreation.incScannerCount();
+ this.comparator = c;
+ activeAtCreation = activeSection;
+ snapshotAtCreation = snapshotSection;
+ if (activeAtCreation.getMemStoreLAB() != null) {
+ activeAtCreation.getMemStoreLAB().incScannerCount();
}
- if (snapshotAllocator != null) {
- this.snapshotAllocatorAtCreation = snapshotAllocator;
- this.snapshotAllocatorAtCreation.incScannerCount();
+ if (snapshotAtCreation.getMemStoreLAB() != null) {
+ snapshotAtCreation.getMemStoreLAB().incScannerCount();
}
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
@@ -825,8 +786,8 @@ public class DefaultMemStore implements MemStore {
}
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
- cellSetIt = cellSetAtCreation.tailSet(key).iterator();
- snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+ cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(key).iterator();
+ snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(key).iterator();
cellSetItRow = null;
snapshotItRow = null;
@@ -868,8 +829,8 @@ public class DefaultMemStore implements MemStore {
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
- cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
- snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
+ cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(getHighest(key, cellSetItRow)).iterator();
+ snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(getHighest(key, snapshotItRow)).iterator();
return seekInSubLists(key);
}
@@ -937,20 +898,19 @@ public class DefaultMemStore implements MemStore {
return (first != null ? first : second);
}
+ @Override
public synchronized void close() {
this.cellSetNextRow = null;
this.snapshotNextRow = null;
this.cellSetIt = null;
this.snapshotIt = null;
-
- if (allocatorAtCreation != null) {
- this.allocatorAtCreation.decScannerCount();
- this.allocatorAtCreation = null;
+
+ if (activeAtCreation != null && activeAtCreation.getMemStoreLAB() != null) {
+ activeAtCreation.getMemStoreLAB().decScannerCount();
}
- if (snapshotAllocatorAtCreation != null) {
- this.snapshotAllocatorAtCreation.decScannerCount();
- this.snapshotAllocatorAtCreation = null;
+ if (snapshotAtCreation != null && snapshotAtCreation.getMemStoreLAB() != null) {
+ snapshotAtCreation.getMemStoreLAB().decScannerCount();
}
this.cellSetItRow = null;
@@ -968,7 +928,8 @@ public class DefaultMemStore implements MemStore {
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
- return shouldSeek(scan, store, oldestUnexpiredTS);
+ return shouldSeek(activeAtCreation.getTimeRangeTracker(),
+ snapshotAtCreation.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
}
/**
@@ -997,9 +958,9 @@ public class DefaultMemStore implements MemStore {
do {
Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
key.getRowLength());
- SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
+ SortedSet<Cell> cellHead = activeAtCreation.getCellSkipListSet().headSet(firstKeyOnRow);
Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
- SortedSet<Cell> snapshotHead = snapshotAtCreation
+ SortedSet<Cell> snapshotHead = snapshotAtCreation.getCellSkipListSet()
.headSet(firstKeyOnRow);
Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
.last();
@@ -1027,10 +988,10 @@ public class DefaultMemStore implements MemStore {
@Override
public synchronized boolean seekToLastRow() {
- Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
- .last();
- Cell second = snapshotAtCreation.isEmpty() ? null
- : snapshotAtCreation.last();
+ Cell first = activeAtCreation.getCellSkipListSet().isEmpty() ? null
+ : activeAtCreation.getCellSkipListSet().last();
+ Cell second = snapshotAtCreation.getCellSkipListSet().isEmpty() ? null
+ : snapshotAtCreation.getCellSkipListSet().last();
Cell higherCell = getHighest(first, second);
if (higherCell == null) {
return false;
@@ -1047,10 +1008,10 @@ public class DefaultMemStore implements MemStore {
}
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
- + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
+ + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
+ (2 * ClassSize.ATOMIC_LONG) + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/*
@@ -1075,7 +1036,7 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public long heapSize() {
- return size.get();
+ return activeSection.getHeapSize().get();
}
@Override
@@ -1125,4 +1086,63 @@ public class DefaultMemStore implements MemStore {
LOG.info("Exiting.");
}
+ /**
+ * Contains the fields which are useful to MemStoreScanner.
+ */
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ static class Section {
+ /**
+ * MemStore. Use a CellSkipListSet rather than SkipListSet because of the
+ * better semantics. The Map will overwrite if passed a key it already had
+ * whereas the Set will not add new Cell if key is same though value might be
+ * different. Value is not important -- just make sure always same reference passed.
+ */
+ private final CellSkipListSet cellSet;
+ private final TimeRangeTracker tracker = new TimeRangeTracker();
+ /**
+ * Used to track own heapSize.
+ */
+ private final AtomicLong heapSize;
+ private final MemStoreLAB allocator;
+
+ static Section newSnapshotSection(final KeyValue.KVComparator c) {
+ return new Section(c, null, 0);
+ }
+
+ static Section newActiveSection(final KeyValue.KVComparator c,
+ final Configuration conf) {
+ return new Section(c, conf, DEEP_OVERHEAD);
+ }
+
+ private Section(final KeyValue.KVComparator c,
+ final Configuration conf, long initHeapSize) {
+ this.cellSet = new CellSkipListSet(c);
+ this.heapSize = new AtomicLong(initHeapSize);
+ if (conf != null && conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+ String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+ this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+ new Class[]{Configuration.class}, new Object[]{conf});
+ } else {
+ this.allocator = null;
+ }
+ }
+
+ CellSkipListSet getCellSkipListSet() {
+ return cellSet;
+ }
+
+ TimeRangeTracker getTimeRangeTracker() {
+ return tracker;
+ }
+
+ AtomicLong getHeapSize() {
+ return heapSize;
+ }
+
+ MemStoreLAB getMemStoreLAB() {
+ return allocator;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index e7e33d6..24ec480 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1129,7 +1129,14 @@ public class HStore implements Store {
*/
private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
- o.updateReaders(sfs);
+ List<KeyValueScanner> memStoreScanners;
+ this.lock.readLock().lock();
+ try {
+ memStoreScanners = this.memstore.getScanners(o.getReadPoint());
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ o.updateReaders(sfs, memStoreScanners);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index fba312f..c967071 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.util.CollectionUtils;
/**
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@@ -124,9 +125,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Indicates whether there was flush during the course of the scan
private volatile boolean flushed = false;
// generally we get one file from a flush
- private List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+ private final List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+ // generally we get one memstore scanner from a flush
+ private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
// The current list of scanners
- private List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+ private final List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
// flush update lock
private ReentrantLock flushLock = new ReentrantLock();
@@ -434,6 +437,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public void close() {
if (this.closing) return;
this.closing = true;
+ clearAndClose(memStoreScannersAfterFlush);
// Under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@@ -741,13 +745,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return qcode;
}
+ @Override
+ public long getReadPoint() {
+ return readPt;
+ }
+
+ private static void clearAndClose(List<KeyValueScanner> scanners) {
+ for (KeyValueScanner s : scanners) {
+ s.close();
+ }
+ scanners.clear();
+ }
+
// Implementation of ChangedReadersObserver
@Override
- public void updateReaders(List<StoreFile> sfs) throws IOException {
- flushed = true;
+ public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException {
+ if (CollectionUtils.isEmpty(sfs)
+ && CollectionUtils.isEmpty(memStoreScanners)) {
+ return;
+ }
flushLock.lock();
try {
+ flushed = true;
flushedStoreFiles.addAll(sfs);
+ if (!CollectionUtils.isEmpty(memStoreScanners)) {
+ clearAndClose(memStoreScannersAfterFlush);
+ memStoreScannersAfterFlush.addAll(memStoreScanners);
+ }
} finally {
flushLock.unlock();
}
@@ -807,12 +831,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
List<KeyValueScanner> scanners = null;
+ flushLock.lock();
try {
- flushLock.lock();
- scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
- isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+ List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
+ allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+ isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
+ allScanners.addAll(memStoreScannersAfterFlush);
+ scanners = selectScannersFrom(allScanners);
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
+ memStoreScannersAfterFlush.clear();
} finally {
flushLock.unlock();
}
@@ -822,7 +850,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
- currentScanners.remove(i);
+ currentScanners.remove(i).close();
break;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 07ca2b9..aa1febb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -303,13 +303,14 @@ public class TestHeapSize {
// DefaultMemStore Deep Overhead
actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
- expected += ClassSize.estimateBase(AtomicLong.class, false);
+ expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
+ ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 938ecfb..f11f7cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -86,8 +86,8 @@ public class TestDefaultMemStore extends TestCase {
byte [] other = Bytes.toBytes("somethingelse");
KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
this.memstore.add(samekey);
- Cell found = this.memstore.cellSet.first();
- assertEquals(1, this.memstore.cellSet.size());
+ Cell found = this.memstore.activeSection.getCellSkipListSet().first();
+ assertEquals(1, this.memstore.activeSection.getCellSkipListSet().size());
assertTrue(Bytes.toString(found.getValue()), CellUtil.matchingValue(samekey, found));
}
@@ -98,13 +98,13 @@ public class TestDefaultMemStore extends TestCase {
long sizeChangeForSecondCell = this.memstore.add(kv);
// make sure memstore size increase won't double-count MSLAB chunk size
assertEquals(DefaultMemStore.heapSizeChange(kv, true), sizeChangeForFirstCell);
- if (this.memstore.allocator != null) {
+ if (this.memstore.activeSection.getMemStoreLAB() != null) {
// make sure memstore size increased when using MSLAB
assertEquals(memstore.getCellLength(kv), sizeChangeForSecondCell);
// make sure chunk size increased even when writing the same cell, if using MSLAB
- if (this.memstore.allocator instanceof HeapMemStoreLAB) {
+ if (this.memstore.activeSection.getMemStoreLAB() instanceof HeapMemStoreLAB) {
assertEquals(2 * memstore.getCellLength(kv),
- ((HeapMemStoreLAB) this.memstore.allocator).getCurrentChunk().getNextFreeOffset());
+ ((HeapMemStoreLAB) this.memstore.activeSection.getMemStoreLAB()).getCurrentChunk().getNextFreeOffset());
}
} else {
// make sure no memstore size change w/o MSLAB
@@ -492,7 +492,7 @@ public class TestDefaultMemStore extends TestCase {
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memstore);
runSnapshot(this.memstore);
- assertEquals("History not being cleared", 0, this.memstore.snapshot.size());
+ assertEquals("History not being cleared", 0, this.memstore.snapshotSection.getCellSkipListSet().size());
}
}
@@ -513,7 +513,7 @@ public class TestDefaultMemStore extends TestCase {
m.add(key2);
assertTrue("Expected memstore to hold 3 values, actually has " +
- m.cellSet.size(), m.cellSet.size() == 3);
+ m.activeSection.getCellSkipListSet().size(), m.activeSection.getCellSkipListSet().size() == 3);
}
//////////////////////////////////////////////////////////////////////////////
@@ -587,12 +587,12 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam ,qf3, val));
//Creating a snapshot
memstore.snapshot();
- assertEquals(3, memstore.snapshot.size());
+ assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
//Adding value to "new" memstore
- assertEquals(0, memstore.cellSet.size());
+ assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam ,qf4, val));
memstore.add(new KeyValue(row, fam ,qf5, val));
- assertEquals(2, memstore.cellSet.size());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
}
//////////////////////////////////////////////////////////////////////////////
@@ -614,7 +614,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
- assertEquals(3, memstore.cellSet.size());
+ assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
memstore.delete(del2);
@@ -625,9 +625,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put2);
expected.add(put1);
- assertEquals(4, memstore.cellSet.size());
+ assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
- for(Cell cell : memstore.cellSet) {
+ for(Cell cell : memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@@ -648,7 +648,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
- assertEquals(3, memstore.cellSet.size());
+ assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
KeyValue del2 =
new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@@ -661,9 +661,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put1);
- assertEquals(4, memstore.cellSet.size());
+ assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
- for (Cell cell: memstore.cellSet) {
+ for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@@ -701,9 +701,9 @@ public class TestDefaultMemStore extends TestCase {
- assertEquals(5, memstore.cellSet.size());
+ assertEquals(5, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
- for (Cell cell: memstore.cellSet) {
+ for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@@ -717,8 +717,8 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
- assertEquals(2, memstore.cellSet.size());
- assertEquals(delete, memstore.cellSet.first());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+ assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteVersion() throws IOException {
@@ -730,8 +730,8 @@ public class TestDefaultMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
- assertEquals(2, memstore.cellSet.size());
- assertEquals(delete, memstore.cellSet.first());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+ assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteColumn() throws IOException {
// add a put to memstore
@@ -742,8 +742,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
- assertEquals(2, memstore.cellSet.size());
- assertEquals(delete, memstore.cellSet.first());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+ assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteFamily() throws IOException {
// add a put to memstore
@@ -754,8 +754,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
- assertEquals(2, memstore.cellSet.size());
- assertEquals(delete, memstore.cellSet.first());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
+ assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
////////////////////////////////////
@@ -855,7 +855,7 @@ public class TestDefaultMemStore extends TestCase {
public void testUpsertMemstoreSize() throws Exception {
Configuration conf = HBaseConfiguration.create();
memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
- long oldSize = memstore.size.get();
+ long oldSize = memstore.activeSection.getHeapSize().get();
List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -866,18 +866,18 @@ public class TestDefaultMemStore extends TestCase {
l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2);// readpoint is 2
- long newSize = this.memstore.size.get();
+ long newSize = this.memstore.activeSection.getHeapSize().get();
assert(newSize > oldSize);
//The kv1 should be removed.
- assert(memstore.cellSet.size() == 2);
+ assert(memstore.activeSection.getCellSkipListSet().size() == 2);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setSequenceId(1);
l.clear(); l.add(kv4);
this.memstore.upsert(l, 3);
- assertEquals(newSize, this.memstore.size.get());
+ assertEquals(newSize, this.memstore.activeSection.getHeapSize().get());
//The kv2 should be removed.
- assert(memstore.cellSet.size() == 2);
+ assert(memstore.activeSection.getCellSkipListSet().size() == 2);
//this.memstore = null;
}
@@ -1038,10 +1038,10 @@ public class TestDefaultMemStore extends TestCase {
private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
// Save off old state.
- int oldHistorySize = hmc.snapshot.size();
+ int oldHistorySize = hmc.snapshotSection.getCellSkipListSet().size();
MemStoreSnapshot snapshot = hmc.snapshot();
// Make some assertions about what just happened.
- assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size());
+ assertTrue("History size has not increased", oldHistorySize < hmc.snapshotSection.getCellSkipListSet().size());
long t = memstore.timeOfOldestEdit();
assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
hmc.clearSnapshot(snapshot.getId());
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 3c0e147..2b0a768 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -2480,10 +2480,10 @@ public class TestHRegion {
// This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis();
DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
- Cell firstCell = memstore.cellSet.first();
+ Cell firstCell = memstore.activeSection.getCellSkipListSet().first();
assertTrue(firstCell.getTimestamp() <= now);
now = firstCell.getTimestamp();
- for (Cell cell : memstore.cellSet) {
+ for (Cell cell : memstore.activeSection.getCellSkipListSet()) {
assertTrue(cell.getTimestamp() <= now);
now = cell.getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 4f8287c..ae10d05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -115,13 +115,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
- assertEquals(3, memstore.snapshot.size());
+ assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
// Adding value to "new" memstore
- assertEquals(0, memstore.cellSet.size());
+ assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val));
- assertEquals(2, memstore.cellSet.size());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@@ -152,13 +152,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
- assertEquals(3, memstore.snapshot.size());
+ assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
// Adding value to "new" memstore
- assertEquals(0, memstore.cellSet.size());
+ assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val));
- assertEquals(2, memstore.cellSet.size());
+ assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
// opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 414c663..4cebf1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -19,24 +19,31 @@
package org.apache.hadoop.hbase.regionserver;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -57,9 +64,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -70,15 +77,16 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Assert;
@@ -162,9 +170,14 @@ public class TestStore {
init(methodName, conf, htd, hcd);
}
- @SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd) throws IOException {
+ return init(methodName, conf, htd, hcd, null);
+ }
+
+ @SuppressWarnings("deprecation")
+ private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -185,8 +198,11 @@ public class TestStore {
final WALFactory wals = new WALFactory(walConf, null, methodName);
HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
info.getTable().getNamespace()), fs, conf, info, htd, null);
-
- store = new HStore(region, hcd, conf);
+ if (hook == null) {
+ store = new HStore(region, hcd, conf);
+ } else {
+ store = new MyStore(region, hcd, conf, hook);
+ }
return store;
}
@@ -552,7 +568,7 @@ public class TestStore {
this.store.snapshot();
flushStore(store, id++);
Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
- Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
+ Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
}
private void assertCheck() {
@@ -597,7 +613,7 @@ public class TestStore {
flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size());
// from the one we inserted up there, and a new one
- Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
+ Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
// how many key/values for this row are there?
Get get = new Get(row);
@@ -666,7 +682,7 @@ public class TestStore {
}
long computedSize=0;
- for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
+ for (Cell cell : ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet()) {
long kvsize = DefaultMemStore.heapSizeChange(cell, true);
//System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
computedSize += kvsize;
@@ -698,7 +714,7 @@ public class TestStore {
// then flush.
flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size());
- Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
+ Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
// now increment again:
newValue += 1;
@@ -1112,4 +1128,113 @@ public class TestStore {
//ensure that replaceStoreFiles is not called if files are not refreshed
verify(spiedStore, times(0)).replaceStoreFiles(null, null);
}
-}
+
+ private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+ Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+ CellUtil.setSequenceId(c, sequenceId);
+ return c;
+ }
+
+ @Test
+ public void testScanWithDoubleFlush() throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ // Initialize region
+ MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+ @Override
+ public void hook(final MyStore store) throws IOException {
+ final long tmpId = id++;
+ ExecutorService s = Executors.newSingleThreadExecutor();
+ s.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Flush the store before StoreScanner updates the scanners from the store.
+ // The current data will be flushed into files and the memstore will
+ // be cleared.
+ // -- phase (4/4)
+ flushStore(store, tmpId);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ s.shutdown();
+ try {
+ // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
+ s.awaitTermination(500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ }
+ }
+ });
+ byte[] oldValue = Bytes.toBytes("oldValue");
+ byte[] currentValue = Bytes.toBytes("currentValue");
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ // older data which shouldn't be "seen" by client
+ myStore.add(createCell(qf1, ts, seqId, oldValue));
+ myStore.add(createCell(qf2, ts, seqId, oldValue));
+ myStore.add(createCell(qf3, ts, seqId, oldValue));
+ long snapshotId = id++;
+ // push older data into snapshot -- phase (1/4)
+ StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+ storeFlushCtx.prepare();
+
+ // insert current data into active -- phase (2/4)
+ myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue));
+ myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue));
+ myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue));
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+ try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+ new Scan(new Get(row)), quals, seqId + 1)) {
+ // complete the flush -- phase (3/4)
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+ List<Cell> results = new ArrayList<>();
+ scanner.next(results);
+ assertEquals(3, results.size());
+ for (Cell c : results) {
+ byte[] actualValue = CellUtil.cloneValue(c);
+ assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+ + ", actual:" + Bytes.toStringBinary(actualValue),
+ Bytes.equals(actualValue, currentValue));
+ }
+ }
+
+ }
+
+ private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(5);
+ return (MyStore) init(methodName, conf, htd, hcd, hook);
+ }
+
+ private static class MyStore extends HStore {
+
+ private final MyScannerHook hook;
+
+ MyStore(final HRegion region, final HColumnDescriptor family,
+ final Configuration confParam, MyScannerHook hook) throws IOException {
+ super(region, family, confParam);
+ this.hook = hook;
+ }
+
+ @Override
+ public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+ boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+ hook.hook(this);
+ return super.getScanners(files, cacheBlocks, isGet, usePread,
+ isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner);
+ }
+ }
+
+ private interface MyScannerHook {
+
+ void hook(MyStore store) throws IOException;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index ba4ad3c..00dd9e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -452,9 +453,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
- scan.updateReaders(new ArrayList<StoreFile>());
+ scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
- scan.updateReaders(new ArrayList<StoreFile>());
+ scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
scan.peek();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72edf521/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index 7e86632..e379e85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@@ -125,7 +126,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
- ss.updateReaders(new ArrayList<StoreFile>());
+ ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}
} while (more);