You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/15 18:43:35 UTC

[21/26] hbase git commit: HBASE-14895 Seek only to the newly flushed file on scanner reset on flush (Ram)

HBASE-14895 Seek only to the newly flushed file on scanner reset on flush
(Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/555d9b70
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/555d9b70
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/555d9b70

Branch: refs/heads/hbase-12439
Commit: 555d9b70bd650a0df0ed9e382de449c337274493
Parents: 676ce01
Author: ramkrishna <ra...@gmail.com>
Authored: Mon Dec 14 10:09:41 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Mon Dec 14 10:13:53 2015 +0530

----------------------------------------------------------------------
 .../regionserver/ChangedReadersObserver.java    |   3 +-
 .../hadoop/hbase/regionserver/HStore.java       |  30 ++++-
 .../regionserver/ReversedStoreScanner.java      |   6 +-
 .../apache/hadoop/hbase/regionserver/Store.java |  19 +++
 .../hbase/regionserver/StoreFileScanner.java    |   3 +
 .../hadoop/hbase/regionserver/StoreScanner.java | 116 +++++++++-------
 .../client/TestBlockEvictionFromClient.java     | 132 +++++++++++++++++++
 .../hbase/regionserver/TestStoreScanner.java    |   4 +-
 .../hbase/regionserver/TestWideScanner.java     |   2 +-
 9 files changed, 258 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 36b7559..0bc75e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
    * Notify observers.
    * @throws IOException e
    */
-  void updateReaders() throws IOException;
+  void updateReaders(List<StoreFile> sfs) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 49b6c50..badbd65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1042,7 +1042,7 @@ public class HStore implements Store {
       this.lock.writeLock().unlock();
     }
     // notify to be called here - only in case of flushes
-    notifyChangedReadersObservers();
+    notifyChangedReadersObservers(sfs);
     if (LOG.isTraceEnabled()) {
       long totalSize = 0;
       for (StoreFile sf : sfs) {
@@ -1060,9 +1060,9 @@ public class HStore implements Store {
    * Notify all observers that set of Readers has changed.
    * @throws IOException
    */
-  private void notifyChangedReadersObservers() throws IOException {
+  private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
-      o.updateReaders();
+      o.updateReaders(sfs);
     }
   }
 
@@ -1102,6 +1102,30 @@ public class HStore implements Store {
   }
 
   @Override
+  public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+    List<KeyValueScanner> memStoreScanners = null;
+    if (includeMemstoreScanner) {
+      this.lock.readLock().lock();
+      try {
+        memStoreScanners = this.memstore.getScanners(readPt);
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
+    scanners.addAll(sfScanners);
+    // Then the memstore scanners
+    if (memStoreScanners != null) {
+      scanners.addAll(memStoreScanners);
+    }
+    return scanners;
+  }
+
+  @Override
   public void addChangedReaderObserver(ChangedReadersObserver o) {
     this.changedReaderObservers.add(o);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 0e1d90f..41c13f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,13 +123,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   @Override
   public boolean seekToPreviousRow(Cell key) throws IOException {
-    checkReseek();
+    boolean flushed = checkFlushed();
+    checkReseek(flushed);
     return this.heap.seekToPreviousRow(key);
   }
   
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
-    checkReseek();
+    boolean flushed = checkFlushed();
+    checkReseek(flushed);
     return this.heap.backwardSeek(key);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f137a8e..8bb10f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -105,6 +105,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
     byte[] stopRow,
     long readPt
   ) throws IOException;
+
+  /**
+   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
+   * (that happens further down the line).
+   * @param files the list of files on which the scanners have to be created
+   * @param cacheBlocks cache the blocks or not
+   * @param isGet true if it is get, false if not
+   * @param usePread true to use pread, false if not
+   * @param isCompaction true if the scanner is created for compaction
+   * @param matcher the scan query matcher
+   * @param startRow the start row
+   * @param stopRow the stop row
+   * @param readPt the read point of the current scan
+   * @param includeMemstoreScanner true if memstore has to be included
+   * @return scanners on the given files and on the memstore if specified
+   */
+   List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
+          boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+          byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
   
   ScanInfo getScanInfo();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c864733..d752e17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -48,6 +48,7 @@ public class StoreFileScanner implements KeyValueScanner {
   private final StoreFile.Reader reader;
   private final HFileScanner hfs;
   private Cell cur = null;
+  private boolean closed = false;
 
   private boolean realSeekDone;
   private boolean delayedReseek;
@@ -246,11 +247,13 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   public void close() {
+    if (closed) return;
     cur = null;
     this.hfs.close();
     if (this.reader != null) {
       this.reader.decrementRefCount();
     }
+    closed = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 44f07f7..987a3f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -126,6 +127,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private boolean scanUsePread = false;
   // Indicates whether there was flush during the course of the scan
   protected volatile boolean flushed = false;
+  // generally we get one file from a flush
+  protected List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+  // The current list of scanners
+  protected List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+  // flush update lock
+  private ReentrantLock flushLock = new ReentrantLock();
   
   protected final long readPt;
 
@@ -170,6 +177,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
      }
   }
 
+  protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+    this.currentScanners.addAll(scanners);
+  }
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
    * are not in a compaction.
@@ -207,7 +217,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     // set rowOffset
     this.storeOffset = scan.getRowOffsetPerColumnFamily();
-
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
     resetKVHeap(scanners, store.getComparator());
   }
@@ -264,7 +274,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
-
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
     resetKVHeap(scanners, store.getComparator());
   }
@@ -303,6 +313,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
+    addCurrentScanners(scanners);
     resetKVHeap(scanners, scanInfo.getComparator());
   }
 
@@ -403,7 +414,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public Cell peek() {
-    checkResetHeap();
+    checkFlushed();
     if (this.heap == null) {
       return this.lastTop;
     }
@@ -435,11 +446,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.heapsForDelayedClose.clear();
       if (this.heap != null) {
         this.heap.close();
+        this.currentScanners.clear();
         this.heap = null; // CLOSED!
       }
     } else {
       if (this.heap != null) {
         this.heapsForDelayedClose.add(this.heap);
+        this.currentScanners.clear();
         this.heap = null;
       }
     }
@@ -448,9 +461,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    checkResetHeap();
+    boolean flushed = checkFlushed();
     // reset matcher state, in case that underlying store changed
-    checkReseek();
+    checkReseek(flushed);
     return this.heap.seek(key);
   }
 
@@ -470,8 +483,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
-    checkResetHeap();
-    if (checkReseek()) {
+    boolean flushed = checkFlushed();
+    if (checkReseek(flushed)) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
 
@@ -665,36 +678,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   // Implementation of ChangedReadersObserver
   @Override
-  public void updateReaders() throws IOException {
+  public void updateReaders(List<StoreFile> sfs) throws IOException {
     flushed = true;
+    flushLock.lock();
+    try {
+      flushedStoreFiles.addAll(sfs);
+    } finally {
+      flushLock.unlock();
+    }
     // Let the next() call handle re-creating and seeking
   }
 
-  protected void nullifyCurrentHeap() {
-    if (this.closing) return;
-    // All public synchronized API calls will call 'checkReseek' which will cause
-    // the scanner stack to reseek if this.heap==null && this.lastTop != null.
-    // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
-    // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
-    // which is NOT what we want, not to mention could cause an NPE. So we early out here.
-    if (this.heap == null) return;
-    // this could be null.
-    this.lastTop = this.heap.peek();
-
-    //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
-
-    // close scanners to old obsolete Store files
-    this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
-    this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-  }
-
   /**
+   * @param flushed indicates if there was a flush
    * @return true if top of heap has changed (and KeyValueHeap has to try the
    *         next KV)
    * @throws IOException
    */
-  protected boolean checkReseek() throws IOException {
-    if (this.heap == null && this.lastTop != null) {
+  protected boolean checkReseek(boolean flushed) throws IOException {
+    if (flushed && this.lastTop != null) {
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@@ -710,21 +712,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   protected void resetScannerStack(Cell lastTopKey) throws IOException {
-    if (heap != null) {
-      throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
-    }
-
     /* When we have the scan object, should we not pass it to getScanners()
      * to get a limited set of scanners? We did so in the constructor and we
-     * could have done it now by storing the scan object from the constructor */
-    List<KeyValueScanner> scanners = getScannersNoCompaction();
+     * could have done it now by storing the scan object from the constructor
+     */
 
-    // Seek all scanners to the initial key
-    seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+    final boolean isCompaction = false;
+    boolean usePread = get || scanUsePread;
+    List<KeyValueScanner> scanners = null;
+    try {
+      flushLock.lock();
+      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+      // Clear the current set of flushed store files so that they don't get added again
+      flushedStoreFiles.clear();
+    } finally {
+      flushLock.unlock();
+    }
 
+    // Seek the new scanners to the last key
+    seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+    // remove the older memstore scanner
+    for (int i = 0; i < currentScanners.size(); i++) {
+      if (!currentScanners.get(i).isFileScanner()) {
+        currentScanners.remove(i);
+        break;
+      }
+    }
+    // add the newly created scanners on the flushed files and the current active memstore scanner
+    addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
-    resetKVHeap(scanners, store.getComparator());
-
+    resetKVHeap(this.currentScanners, store.getComparator());
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the
     // query matcher if scanning intra-row.
@@ -771,34 +789,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
-    checkResetHeap();
+    boolean flushed = checkFlushed();
     // Heap will not be null, if this is called from next() which.
     // If called from RegionScanner.reseek(...) make sure the scanner
     // stack is reset if needed.
-    checkReseek();
+    checkReseek(flushed);
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       return heap.requestSeek(kv, true, useRowColBloom);
     }
     return heap.reseek(kv);
   }
 
-  protected void checkResetHeap() {
+  protected boolean checkFlushed() {
     // check the var without any lock. Suppose even if we see the old
     // value here still it is ok to continue because we will not be resetting
     // the heap but will continue with the referenced memstore's snapshot. For compactions
     // any way we don't need the updateReaders at all to happen as we still continue with 
     // the older files
     if (flushed) {
-      // If the 'flushed' is found to be true then there is a need to ensure
-      // that the current scanner updates the heap that it has and then proceed
-      // with the scan and ensure to reset the flushed inside the lock
-      // One thing can be sure that the same store scanner cannot be in reseek and
-      // next at the same time ie. within the same store scanner it is always single
-      // threaded
-      nullifyCurrentHeap();
+      // If there is a flush and the current scan is notified on the flush ensure that the 
+      // scan's heap gets reset and we do a seek on the newly flushed file.
+      if(!this.closing) {
+        this.lastTop = this.heap.peek();
+      } else {
+        return false;
+      }
       // reset the flag
       flushed = false;
+      return true;
     }
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index 6dedee2..a812623 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -921,6 +921,138 @@ public class TestBlockEvictionFromClient {
   }
 
   @Test
+  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
+      throws IOException, InterruptedException {
+    // do flush and scan in parallel
+    HTable table = null;
+    try {
+      latch = new CountDownLatch(1);
+      compactionLatch = new CountDownLatch(1);
+      TableName tableName =
+          TableName.valueOf("testBlockEvictionAfterHBASE13082WithCompactionAndFlush");
+      // Create a table with block size as 1024
+      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
+          CustomInnerRegionObserverWrapper.class.getName());
+      // get the block cache and region
+      RegionLocator locator = table.getRegionLocator();
+      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
+          regionName);
+      Store store = region.getStores().iterator().next();
+      CacheConfig cacheConf = store.getCacheConfig();
+      cacheConf.setCacheDataOnWrite(true);
+      cacheConf.setEvictOnClose(true);
+      BlockCache cache = cacheConf.getBlockCache();
+
+      // insert data. 2 Rows are added
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+      // Should create one Hfile with 2 blocks
+      region.flush(true);
+      // read the data and expect same blocks, one new hit, no misses
+      int refCount = 0;
+      // Check how this miss is happening
+      // insert a second column, read the row, no new blocks, 3 new hits
+      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+      byte[] data2 = Bytes.add(data, data);
+      put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER2, data2);
+      table.put(put);
+      // flush, one new block
+      System.out.println("Flushing cache");
+      region.flush(true);
+      Iterator<CachedBlock> iterator = cache.iterator();
+      iterateBlockCache(cache, iterator);
+      // Create three sets of scan
+      ScanThread[] scanThreads = initiateScan(table, false);
+      Thread.sleep(100);
+      iterator = cache.iterator();
+      boolean usedBlocksFound = false;
+      while (iterator.hasNext()) {
+        CachedBlock next = iterator.next();
+        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+        if (cache instanceof BucketCache) {
+          refCount = ((BucketCache) cache).getRefCount(cacheKey);
+        } else if (cache instanceof CombinedBlockCache) {
+          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+        } else {
+          continue;
+        }
+        if (refCount != 0) {
+          // Blocks will have a ref count of 3
+          assertEquals(NO_OF_THREADS, refCount);
+          usedBlocksFound = true;
+        }
+      }
+      // Make a put and do a flush
+      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+      data2 = Bytes.add(data, data);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER2, data2);
+      table.put(put);
+      // flush, one new block
+      System.out.println("Flushing cache");
+      region.flush(true);
+      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+      usedBlocksFound = false;
+      System.out.println("Compacting");
+      assertEquals(3, store.getStorefilesCount());
+      store.triggerMajorCompaction();
+      region.compact(true);
+      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
+      assertEquals(1, store.getStorefilesCount());
+      // Even after compaction is done we will have some blocks that cannot
+      // be evicted, because the scan is still referencing them
+      iterator = cache.iterator();
+      while (iterator.hasNext()) {
+        CachedBlock next = iterator.next();
+        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+        if (cache instanceof BucketCache) {
+          refCount = ((BucketCache) cache).getRefCount(cacheKey);
+        } else if (cache instanceof CombinedBlockCache) {
+          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
+        } else {
+          continue;
+        }
+        if (refCount != 0) {
+          // Blocks will have a ref count of 3 as they are not yet cleared
+          assertEquals(NO_OF_THREADS, refCount);
+          usedBlocksFound = true;
+        }
+      }
+      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
+      // Should not throw exception
+      compactionLatch.countDown();
+      latch.countDown();
+      for (ScanThread thread : scanThreads) {
+        thread.join();
+      }
+      // by this time all blocks should have been evicted
+      iterator = cache.iterator();
+      // Since a flush and compaction happened after a scan started
+      // we need to ensure that all the original blocks of the compacted file
+      // are also removed.
+      iterateBlockCache(cache, iterator);
+      Result r = table.get(new Get(ROW));
+      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+      // The gets would be working on new blocks
+      iterator = cache.iterator();
+      iterateBlockCache(cache, iterator);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+
+  @Test
   public void testScanWithException() throws IOException, InterruptedException {
     HTable table = null;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 5b7e9cc..728029f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -454,9 +454,9 @@ public class TestStoreScanner extends TestCase {
     // normally cause an NPE because scan.store is null.  So as long as we get through these
     // two calls we are good and the bug was quashed.
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
     scan.peek();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/555d9b70/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index ca7b3b1..f598a8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
           StoreScanner ss = (StoreScanner)scanners.next();
-          ss.updateReaders();
+          ss.updateReaders(new ArrayList<StoreFile>());
         }
       } while (more);