You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/15 18:43:35 UTC
[21/26] hbase git commit: HBASE-14895 Seek only to the newly flushed
file on scanner reset on flush (Ram)
HBASE-14895 Seek only to the newly flushed file on scanner reset on flush
(Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/555d9b70
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/555d9b70
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/555d9b70
Branch: refs/heads/hbase-12439
Commit: 555d9b70bd650a0df0ed9e382de449c337274493
Parents: 676ce01
Author: ramkrishna <ra...@gmail.com>
Authored: Mon Dec 14 10:09:41 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Mon Dec 14 10:13:53 2015 +0530
----------------------------------------------------------------------
.../regionserver/ChangedReadersObserver.java | 3 +-
.../hadoop/hbase/regionserver/HStore.java | 30 ++++-
.../regionserver/ReversedStoreScanner.java | 6 +-
.../apache/hadoop/hbase/regionserver/Store.java | 19 +++
.../hbase/regionserver/StoreFileScanner.java | 3 +
.../hadoop/hbase/regionserver/StoreScanner.java | 116 +++++++++-------
.../client/TestBlockEvictionFromClient.java | 132 +++++++++++++++++++
.../hbase/regionserver/TestStoreScanner.java | 4 +-
.../hbase/regionserver/TestWideScanner.java | 2 +-
9 files changed, 258 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 36b7559..0bc75e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
* Notify observers.
* @throws IOException e
*/
- void updateReaders() throws IOException;
+ void updateReaders(List<StoreFile> sfs) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 49b6c50..badbd65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1042,7 +1042,7 @@ public class HStore implements Store {
this.lock.writeLock().unlock();
}
// notify to be called here - only in case of flushes
- notifyChangedReadersObservers();
+ notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
long totalSize = 0;
for (StoreFile sf : sfs) {
@@ -1060,9 +1060,9 @@ public class HStore implements Store {
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
- private void notifyChangedReadersObservers() throws IOException {
+ private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
- o.updateReaders();
+ o.updateReaders(sfs);
}
}
@@ -1102,6 +1102,30 @@ public class HStore implements Store {
}
@Override
+ public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+ boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+ List<KeyValueScanner> memStoreScanners = null;
+ if (includeMemstoreScanner) {
+ this.lock.readLock().lock();
+ try {
+ memStoreScanners = this.memstore.getScanners(readPt);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
+ scanners.addAll(sfScanners);
+ // Then the memstore scanners
+ if (memStoreScanners != null) {
+ scanners.addAll(memStoreScanners);
+ }
+ return scanners;
+ }
+
+ @Override
public void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 0e1d90f..41c13f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,13 +123,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
- checkReseek();
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
- checkReseek();
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
return this.heap.backwardSeek(key);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f137a8e..8bb10f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -105,6 +105,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
byte[] stopRow,
long readPt
) throws IOException;
+
+ /**
+ * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
+ * (that happens further down the line).
+ * @param files the list of files on which the scanners have to be created
+ * @param cacheBlocks cache the blocks or not
+ * @param isGet true if it is get, false if not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param stopRow the stop row
+ * @param readPt the read point of the current scan
+ * @param includeMemstoreScanner true if memstore has to be included
+ * @return scanners on the given files and on the memstore if specified
+ */
+ List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
+ boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+ byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
ScanInfo getScanInfo();
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c864733..d752e17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -48,6 +48,7 @@ public class StoreFileScanner implements KeyValueScanner {
private final StoreFile.Reader reader;
private final HFileScanner hfs;
private Cell cur = null;
+ private boolean closed = false;
private boolean realSeekDone;
private boolean delayedReseek;
@@ -246,11 +247,13 @@ public class StoreFileScanner implements KeyValueScanner {
}
public void close() {
+ if (closed) return;
cur = null;
this.hfs.close();
if (this.reader != null) {
this.reader.decrementRefCount();
}
+ closed = true;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 44f07f7..987a3f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -126,6 +127,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private boolean scanUsePread = false;
// Indicates whether there was flush during the course of the scan
protected volatile boolean flushed = false;
+ // generally we get one file from a flush
+ protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+ // The current list of scanners
+ protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+ // flush update lock
+ private ReentrantLock flushLock = new ReentrantLock();
protected final long readPt;
@@ -170,6 +177,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
}
+ protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+ this.currentScanners.addAll(scanners);
+ }
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
* are not in a compaction.
@@ -207,7 +217,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily();
-
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@@ -264,7 +274,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
-
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@@ -303,6 +313,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
+ addCurrentScanners(scanners);
resetKVHeap(scanners, scanInfo.getComparator());
}
@@ -403,7 +414,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
- checkResetHeap();
+ checkFlushed();
if (this.heap == null) {
return this.lastTop;
}
@@ -435,11 +446,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heapsForDelayedClose.clear();
if (this.heap != null) {
this.heap.close();
+ this.currentScanners.clear();
this.heap = null; // CLOSED!
}
} else {
if (this.heap != null) {
this.heapsForDelayedClose.add(this.heap);
+ this.currentScanners.clear();
this.heap = null;
}
}
@@ -448,9 +461,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean seek(Cell key) throws IOException {
- checkResetHeap();
+ boolean flushed = checkFlushed();
// reset matcher state, in case that underlying store changed
- checkReseek();
+ checkReseek(flushed);
return this.heap.seek(key);
}
@@ -470,8 +483,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
- checkResetHeap();
- if (checkReseek()) {
+ boolean flushed = checkFlushed();
+ if (checkReseek(flushed)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@@ -665,36 +678,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Implementation of ChangedReadersObserver
@Override
- public void updateReaders() throws IOException {
+ public void updateReaders(List<StoreFile> sfs) throws IOException {
flushed = true;
+ flushLock.lock();
+ try {
+ flushedStoreFiles.addAll(sfs);
+ } finally {
+ flushLock.unlock();
+ }
// Let the next() call handle re-creating and seeking
}
- protected void nullifyCurrentHeap() {
- if (this.closing) return;
- // All public synchronized API calls will call 'checkReseek' which will cause
- // the scanner stack to reseek if this.heap==null && this.lastTop != null.
- // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
- // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
- // which is NOT what we want, not to mention could cause an NPE. So we early out here.
- if (this.heap == null) return;
- // this could be null.
- this.lastTop = this.heap.peek();
-
- //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
-
- // close scanners to old obsolete Store files
- this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
- this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
- }
-
/**
+ * @param flushed indicates if there was a flush
* @return true if top of heap has changed (and KeyValueHeap has to try the
* next KV)
* @throws IOException
*/
- protected boolean checkReseek() throws IOException {
- if (this.heap == null && this.lastTop != null) {
+ protected boolean checkReseek(boolean flushed) throws IOException {
+ if (flushed && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@@ -710,21 +712,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
- if (heap != null) {
- throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
- }
-
/* When we have the scan object, should we not pass it to getScanners()
* to get a limited set of scanners? We did so in the constructor and we
- * could have done it now by storing the scan object from the constructor */
- List<KeyValueScanner> scanners = getScannersNoCompaction();
+ * could have done it now by storing the scan object from the constructor
+ */
- // Seek all scanners to the initial key
- seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+ final boolean isCompaction = false;
+ boolean usePread = get || scanUsePread;
+ List<KeyValueScanner> scanners = null;
+ try {
+ flushLock.lock();
+ scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+ isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+ // Clear the current set of flushed store files so that they don't get added again
+ flushedStoreFiles.clear();
+ } finally {
+ flushLock.unlock();
+ }
+ // Seek the new scanners to the last key
+ seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+ // remove the older memstore scanner
+ for (int i = 0; i < currentScanners.size(); i++) {
+ if (!currentScanners.get(i).isFileScanner()) {
+ currentScanners.remove(i);
+ break;
+ }
+ }
+ // add the newly created scanners on the flushed files and the current active memstore scanner
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
- resetKVHeap(scanners, store.getComparator());
-
+ resetKVHeap(this.currentScanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@@ -771,34 +789,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
- checkResetHeap();
+ boolean flushed = checkFlushed();
// Heap will not be null, if this is called from next() which.
// If called from RegionScanner.reseek(...) make sure the scanner
// stack is reset if needed.
- checkReseek();
+ checkReseek(flushed);
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
}
- protected void checkResetHeap() {
+ protected boolean checkFlushed() {
// check the var without any lock. Suppose even if we see the old
// value here still it is ok to continue because we will not be resetting
// the heap but will continue with the referenced memstore's snapshot. For compactions
// any way we don't need the updateReaders at all to happen as we still continue with
// the older files
if (flushed) {
- // If the 'flushed' is found to be true then there is a need to ensure
- // that the current scanner updates the heap that it has and then proceed
- // with the scan and ensure to reset the flushed inside the lock
- // One thing can be sure that the same store scanner cannot be in reseek and
- // next at the same time ie. within the same store scanner it is always single
- // threaded
- nullifyCurrentHeap();
+ // If there is a flush and the current scan is notified on the flush ensure that the
+ // scan's heap gets reset and we do a seek on the newly flushed file.
+ if(!this.closing) {
+ this.lastTop = this.heap.peek();
+ } else {
+ return false;
+ }
// reset the flag
flushed = false;
+ return true;
}
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index 6dedee2..a812623 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -921,6 +921,138 @@ public class TestBlockEvictionFromClient {
}
@Test
+ public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
+ throws IOException, InterruptedException {
+ // do flush and scan in parallel
+ HTable table = null;
+ try {
+ latch = new CountDownLatch(1);
+ compactionLatch = new CountDownLatch(1);
+ TableName tableName =
+ TableName.valueOf("testBlockEvictionAfterHBASE13082WithCompactionAndFlush");
+ // Create a table with block size as 1024
+ table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
+ CustomInnerRegionObserverWrapper.class.getName());
+ // get the block cache and region
+ RegionLocator locator = table.getRegionLocator();
+ String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+ Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
+ regionName);
+ Store store = region.getStores().iterator().next();
+ CacheConfig cacheConf = store.getCacheConfig();
+ cacheConf.setCacheDataOnWrite(true);
+ cacheConf.setEvictOnClose(true);
+ BlockCache cache = cacheConf.getBlockCache();
+
+ // insert data. 2 Rows are added
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, data);
+ table.put(put);
+ put = new Put(ROW1);
+ put.addColumn(FAMILY, QUALIFIER, data);
+ table.put(put);
+ assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+ // Should create one Hfile with 2 blocks
+ region.flush(true);
+ // read the data and expect same blocks, one new hit, no misses
+ int refCount = 0;
+ // Check how this miss is happening
+ // insert a second column, read the row, no new blocks, 3 new hits
+ byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+ byte[] data2 = Bytes.add(data, data);
+ put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER2, data2);
+ table.put(put);
+ // flush, one new block
+ System.out.println("Flushing cache");
+ region.flush(true);
+ Iterator<CachedBlock> iterator = cache.iterator();
+ iterateBlockCache(cache, iterator);
+ // Create three sets of scan
+ ScanThread[] scanThreads = initiateScan(table, false);
+ Thread.sleep(100);
+ iterator = cache.iterator();
+ boolean usedBlocksFound = false;
+ while (iterator.hasNext()) {
+ CachedBlock next = iterator.next();
+ BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+ if (cache instanceof BucketCache) {
+ refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ } else if (cache instanceof CombinedBlockCache) {
+ refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ } else {
+ continue;
+ }
+ if (refCount != 0) {
+ // Blocks will be with count 3
+ assertEquals(NO_OF_THREADS, refCount);
+ usedBlocksFound = true;
+ }
+ }
+ // Make a put and do a flush
+ QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+ data2 = Bytes.add(data, data);
+ put = new Put(ROW1);
+ put.addColumn(FAMILY, QUALIFIER2, data2);
+ table.put(put);
+ // flush, one new block
+ System.out.println("Flushing cache");
+ region.flush(true);
+ assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+ usedBlocksFound = false;
+ System.out.println("Compacting");
+ assertEquals(3, store.getStorefilesCount());
+ store.triggerMajorCompaction();
+ region.compact(true);
+ waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
+ assertEquals(1, store.getStorefilesCount());
+ // Even after compaction is done we will have some blocks that cannot
+ // be evicted; this is because the scan is still referencing them
+ iterator = cache.iterator();
+ while (iterator.hasNext()) {
+ CachedBlock next = iterator.next();
+ BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+ if (cache instanceof BucketCache) {
+ refCount = ((BucketCache) cache).getRefCount(cacheKey);
+ } else if (cache instanceof CombinedBlockCache) {
+ refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+ } else {
+ continue;
+ }
+ if (refCount != 0) {
+ // Blocks will be with count 3 as they are not yet cleared
+ assertEquals(NO_OF_THREADS, refCount);
+ usedBlocksFound = true;
+ }
+ }
+ assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+ // Should not throw exception
+ compactionLatch.countDown();
+ latch.countDown();
+ for (ScanThread thread : scanThreads) {
+ thread.join();
+ }
+ // by this time all blocks should have been evicted
+ iterator = cache.iterator();
+ // Since a flush and compaction happened after a scan started
+ // we need to ensure that all the original blocks of the compacted file
+ // are also removed.
+ iterateBlockCache(cache, iterator);
+ Result r = table.get(new Get(ROW));
+ assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+ assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+ // The gets would be working on new blocks
+ iterator = cache.iterator();
+ iterateBlockCache(cache, iterator);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+
+ @Test
public void testScanWithException() throws IOException, InterruptedException {
HTable table = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 5b7e9cc..728029f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -454,9 +454,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
- scan.updateReaders();
+ scan.updateReaders(new ArrayList<StoreFile>());
- scan.updateReaders();
+ scan.updateReaders(new ArrayList<StoreFile>());
scan.peek();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index ca7b3b1..f598a8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
- ss.updateReaders();
+ ss.updateReaders(new ArrayList<StoreFile>());
}
} while (more);