Posted to commits@hbase.apache.org by zh...@apache.org on 2017/04/19 01:48:23 UTC

[1/3] hbase git commit: HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction

Repository: hbase
Updated Branches:
  refs/heads/master 719a30b11 -> 66b616d7a
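
In short, the compaction path no longer clones each StoreFile and opens a private reader per clone; scanners are now created directly for the files under compaction. A minimal sketch in Java, using only identifiers that appear in the Compactor.java hunks below (the helper name openCompactionScanners is hypothetical, and parameter semantics are otherwise assumptions):

  // Sketch: new scanner setup for compaction ("request" is the CompactionRequest).
  List<StoreFileScanner> openCompactionScanners(CompactionRequest request,
      long smallestReadPoint, boolean useDropBehind) throws IOException {
    // Replaces the removed clone-per-StoreFile path:
    //   StoreFile cloned = f.cloneForReader(); cloned.createReader(); ... cloned.closeReader(true);
    return StoreFileScanner.getScannersForCompaction(request.getFiles(), useDropBehind,
        smallestReadPoint);
  }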


http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index f2d00b3..b839fc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -753,9 +753,6 @@ public class TestStripeCompactionPolicy {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
       anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.cloneForReader()).thenReturn(sf);
     return sf;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 54f310d..17ab004 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -153,7 +153,7 @@ public class TestHBaseFsckEncryption {
 
   private byte[] extractHFileKey(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
-      new CacheConfig(conf), conf);
+      new CacheConfig(conf), true, conf);
     try {
       reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();
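
The change in this hunk recurs throughout the patch: HFile.createReader now takes an extra boolean before the Configuration. A hedged sketch of the new call shape (the flag is assumed to mark a primary-replica reader; the old four-argument form is what it replaces):

    // uses org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig}
    // Old call shape: HFile.createReader(fs, path, new CacheConfig(conf), conf);
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf),
        true /* assumed: primary-replica reader */, conf);
    try {
      reader.loadFileInfo();
      // ... inspect file info, trailer, or scanner ...
    } finally {
      reader.close();
    }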

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index 795ce6d..d2b707e 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -390,7 +390,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
-        new CacheConfig(config), config)
+        new CacheConfig(config), true, config)
       assert(reader.getCompressionAlgorithm.getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
@@ -400,7 +400,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs, f2FileList(i).getPath,
-        new CacheConfig(config), config)
+        new CacheConfig(config), true, config)
       assert(reader.getCompressionAlgorithm.getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
@@ -869,7 +869,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
-        new CacheConfig(config), config)
+        new CacheConfig(config), true, config)
       assert(reader.getCompressionAlgorithm.getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
@@ -879,7 +879,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs, f2FileList(i).getPath,
-        new CacheConfig(config), config)
+        new CacheConfig(config), true, config)
       assert(reader.getCompressionAlgorithm.getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }


[2/3] hbase git commit: HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d72529a..0ba500a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import com.google.common.io.Closeables;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -59,8 +60,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
-import com.google.common.io.Closeables;
-
 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
@@ -216,15 +215,9 @@ public abstract class Compactor<T extends CellSink> {
    * @param filesToCompact Files.
    * @return Scanners.
    */
-  protected List<StoreFileScanner> createFileScanners(
-      final Collection<StoreFile> filesToCompact,
-      long smallestReadPoint,
-      boolean useDropBehind) throws IOException {
-    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
-        /* cache blocks = */ false,
-        /* use pread = */ false,
-        /* is compaction */ true,
-        /* use Drop Behind */ useDropBehind,
+  protected List<StoreFileScanner> createFileScanners(Collection<StoreFile> filesToCompact,
+      long smallestReadPoint, boolean useDropBehind) throws IOException {
+    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
       smallestReadPoint);
   }
 
@@ -281,8 +274,6 @@ public abstract class Compactor<T extends CellSink> {
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();
 
-    List<StoreFileScanner> scanners;
-    Collection<StoreFile> readersToClose;
     T writer = null;
     boolean dropCache;
     if (request.isMajor() || request.isAllFiles()) {
@@ -291,22 +282,8 @@ public abstract class Compactor<T extends CellSink> {
       dropCache = this.dropCacheMinor;
     }
 
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
-      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
-      // HFiles, and their readers
-      readersToClose = new ArrayList<>(request.getFiles().size());
-      for (StoreFile f : request.getFiles()) {
-        StoreFile clonedStoreFile = f.cloneForReader();
-        // create the reader after the store file is cloned in case
-        // the sequence id is used for sorting in scanners
-        clonedStoreFile.createReader();
-        readersToClose.add(clonedStoreFile);
-      }
-      scanners = createFileScanners(readersToClose, smallestReadPoint, dropCache);
-    } else {
-      readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
-    }
+    List<StoreFileScanner> scanners =
+        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
     InternalScanner scanner = null;
     boolean finished = false;
     try {
@@ -336,13 +313,6 @@ public abstract class Compactor<T extends CellSink> {
       }
     } finally {
       Closeables.close(scanner, true);
-      for (StoreFile f : readersToClose) {
-        try {
-          f.closeReader(true);
-        } catch (IOException e) {
-          LOG.warn("Exception closing " + f, e);
-        }
-      }
       if (!finished && writer != null) {
         abortWriter(writer);
       }
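
With the private-readers branch gone, the compact() flow above shrinks to the outline below; only the merged scanner needs closing in the finally block. A simplified sketch of the resulting structure, not the full method:

    // Simplified outline of Compactor.compact() after this change
    // (uses com.google.common.io.Closeables, now imported at the top of the file).
    List<StoreFileScanner> scanners =
        createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      // build the InternalScanner over "scanners", write out the compacted file(s) ...
      finished = true;
    } finally {
      Closeables.close(scanner, true);
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }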

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index ace45ec..7b745ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -133,7 +133,7 @@ public class CompressionTest {
     writer.appendFileInfo(Bytes.toBytes("compressioninfokey"), Bytes.toBytes("compressioninfoval"));
     writer.close();
     Cell cc = null;
-    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
     try {
       reader.loadFileInfo();
       HFileScanner scanner = reader.getScanner(false, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 4eab62b..dca02e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -849,8 +849,8 @@ public class HBaseFsck extends Configured implements Closeable {
             FileStatus[] storeFiles = fs.listStatus(file.getPath());
             // For all the stores in this column family.
             for (FileStatus storeFile : storeFiles) {
-              HFile.Reader reader = HFile.createReader(fs, storeFile.getPath(), new CacheConfig(
-                  getConf()), getConf());
+              HFile.Reader reader = HFile.createReader(fs, storeFile.getPath(),
+                new CacheConfig(getConf()), true, getConf());
               if ((reader.getFirstKey() != null)
                   && ((storeFirstKey == null) || (comparator.compare(storeFirstKey,
                       ((KeyValue.KeyOnlyKeyValue) reader.getFirstKey()).getKey()) > 0))) {
@@ -954,7 +954,7 @@ public class HBaseFsck extends Configured implements Closeable {
         HFile.Reader hf = null;
         try {
           CacheConfig cacheConf = new CacheConfig(getConf());
-          hf = HFile.createReader(fs, hfile.getPath(), cacheConf, getConf());
+          hf = HFile.createReader(fs, hfile.getPath(), cacheConf, true, getConf());
           hf.loadFileInfo();
           Cell startKv = hf.getFirstKey();
           start = CellUtil.cloneRow(startKv);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
index 82200bd..e46e43b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
@@ -98,7 +98,7 @@ public class HFileCorruptionChecker {
   protected void checkHFile(Path p) throws IOException {
     HFile.Reader r = null;
     try {
-      r = HFile.createReader(fs, p, cacheConf, conf);
+      r = HFile.createReader(fs, p, cacheConf, true, conf);
     } catch (CorruptHFileException che) {
       LOG.warn("Found corrupt HFile " + p, che);
       corrupted.add(p);
@@ -230,7 +230,7 @@ public class HFileCorruptionChecker {
   protected void checkMobFile(Path p) throws IOException {
     HFile.Reader r = null;
     try {
-      r = HFile.createReader(fs, p, cacheConf, conf);
+      r = HFile.createReader(fs, p, cacheConf, true, conf);
     } catch (CorruptHFileException che) {
       LOG.warn("Found corrupt mob file " + p, che);
       corruptedMobFiles.add(p);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
index 562630a..37ca56b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
@@ -403,7 +403,7 @@ public class HFilePerformanceEvaluation {
 
     @Override
     void setUp() throws Exception {
-      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), this.conf);
+      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
       this.reader.loadFileInfo();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index b1a0d3c..7668aa2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5174,6 +5174,7 @@ public class TestFromClientSide {
       assertEquals(2, store.getStorefilesCount());
       store.triggerMajorCompaction();
       region.compact(true);
+      store.closeAndArchiveCompactedFiles();
       waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
       assertEquals(1, store.getStorefilesCount());
       expectedBlockCount -= 2; // evicted two blocks, cached none

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
index 6a0921f..0fd3cdb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +50,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
 public class TestHalfStoreFileReader {
   private static HBaseTestingUtility TEST_UTIL;
 
@@ -64,19 +65,14 @@ public class TestHalfStoreFileReader {
   }
 
   /**
-   * Test the scanner and reseek of a half hfile scanner. The scanner API
-   * demands that seekTo and reseekTo() only return < 0 if the key lies
-   * before the start of the file (with no position on the scanner). Returning
-   * 0 if perfect match (rare), and return > 1 if we got an imperfect match.
-   *
-   * The latter case being the most common, we should generally be returning 1,
-   * and if we do, there may or may not be a 'next' in the scanner/file.
-   *
-   * A bug in the half file scanner was returning -1 at the end of the bottom
-   * half, and that was causing the infrastructure above to go null causing NPEs
-   * and other problems.  This test reproduces that failure, and also tests
-   * both the bottom and top of the file while we are at it.
-   *
+   * Test the scanner and reseek of a half hfile scanner. The scanner API demands that seekTo and
+   * reseekTo() only return < 0 if the key lies before the start of the file (with no position on
+   * the scanner). Returning 0 if perfect match (rare), and return > 1 if we got an imperfect match.
+   * The latter case being the most common, we should generally be returning 1, and if we do, there
+   * may or may not be a 'next' in the scanner/file. A bug in the half file scanner was returning -1
+   * at the end of the bottom half, and that was causing the infrastructure above to go null causing
+   * NPEs and other problems. This test reproduces that failure, and also tests both the bottom and
+   * top of the file while we are at it.
    * @throws IOException
    */
   @Test
@@ -88,10 +84,8 @@ public class TestHalfStoreFileReader {
     FileSystem fs = FileSystem.get(conf);
     CacheConfig cacheConf = new CacheConfig(conf);
     HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
-    HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
-        .withPath(fs, p)
-        .withFileContext(meta)
-        .create();
+    HFile.Writer w =
+        HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create();
 
     // write some things.
     List<KeyValue> items = genSomeKeys();
@@ -100,12 +94,12 @@ public class TestHalfStoreFileReader {
     }
     w.close();
 
-    HFile.Reader r = HFile.createReader(fs, p, cacheConf, conf);
+    HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
     r.loadFileInfo();
     Cell midKV = r.midkey();
     byte[] midkey = CellUtil.cloneRow(midKV);
 
-    //System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
+    // System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
 
     Reference bottom = new Reference(midkey, Reference.Range.bottom);
     doTestOfScanAndReseek(p, fs, bottom, cacheConf);
@@ -116,11 +110,10 @@ public class TestHalfStoreFileReader {
     r.close();
   }
 
-  private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom,
-      CacheConfig cacheConf)
+  private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
       throws IOException {
-    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
-      cacheConf, bottom, TEST_UTIL.getConfiguration());
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConf, bottom, true,
+        new AtomicInteger(0), true, TEST_UTIL.getConfiguration());
     halfreader.loadFileInfo();
     final HFileScanner scanner = halfreader.getScanner(false, false);
 
@@ -128,110 +121,103 @@ public class TestHalfStoreFileReader {
     Cell curr;
     do {
       curr = scanner.getCell();
-      KeyValue reseekKv =
-          getLastOnCol(curr);
+      KeyValue reseekKv = getLastOnCol(curr);
       int ret = scanner.reseekTo(reseekKv);
       assertTrue("reseek to returned: " + ret, ret > 0);
-      //System.out.println(curr + ": " + ret);
+      // System.out.println(curr + ": " + ret);
     } while (scanner.next());
 
     int ret = scanner.reseekTo(getLastOnCol(curr));
-    //System.out.println("Last reseek: " + ret);
-    assertTrue( ret > 0 );
+    // System.out.println("Last reseek: " + ret);
+    assertTrue(ret > 0);
 
     halfreader.close(true);
   }
 
-
   // Tests the scanner on an HFile that is backed by HalfStoreFiles
   @Test
   public void testHalfScanner() throws IOException {
-      String root_dir = TEST_UTIL.getDataTestDir().toString();
-      Path p = new Path(root_dir, "test");
-      Configuration conf = TEST_UTIL.getConfiguration();
-      FileSystem fs = FileSystem.get(conf);
-      CacheConfig cacheConf = new CacheConfig(conf);
-      HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
-      HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
-              .withPath(fs, p)
-              .withFileContext(meta)
-              .create();
-
-      // write some things.
-      List<KeyValue> items = genSomeKeys();
-      for (KeyValue kv : items) {
-          w.append(kv);
-      }
-      w.close();
+    String root_dir = TEST_UTIL.getDataTestDir().toString();
+    Path p = new Path(root_dir, "test");
+    Configuration conf = TEST_UTIL.getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    CacheConfig cacheConf = new CacheConfig(conf);
+    HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
+    HFile.Writer w =
+        HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create();
 
+    // write some things.
+    List<KeyValue> items = genSomeKeys();
+    for (KeyValue kv : items) {
+      w.append(kv);
+    }
+    w.close();
 
-      HFile.Reader r = HFile.createReader(fs, p, cacheConf, conf);
-      r.loadFileInfo();
-      Cell midKV = r.midkey();
-      byte[] midkey = CellUtil.cloneRow(midKV);
+    HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
+    r.loadFileInfo();
+    Cell midKV = r.midkey();
+    byte[] midkey = CellUtil.cloneRow(midKV);
 
-      Reference bottom = new Reference(midkey, Reference.Range.bottom);
-      Reference top = new Reference(midkey, Reference.Range.top);
+    Reference bottom = new Reference(midkey, Reference.Range.bottom);
+    Reference top = new Reference(midkey, Reference.Range.top);
 
-      // Ugly code to get the item before the midkey
-      KeyValue beforeMidKey = null;
-      for (KeyValue item : items) {
-          if (CellComparator.COMPARATOR.compare(item, midKV) >= 0) {
-              break;
-          }
-          beforeMidKey = item;
+    // Ugly code to get the item before the midkey
+    KeyValue beforeMidKey = null;
+    for (KeyValue item : items) {
+      if (CellComparator.COMPARATOR.compare(item, midKV) >= 0) {
+        break;
       }
-      System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
-      System.out.println("beforeMidKey: " + beforeMidKey);
-
+      beforeMidKey = item;
+    }
+    System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
+    System.out.println("beforeMidKey: " + beforeMidKey);
 
-      // Seek on the splitKey, should be in top, not in bottom
-      Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf);
-      assertEquals(beforeMidKey, foundKeyValue);
+    // Seek on the splitKey, should be in top, not in bottom
+    Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf);
+    assertEquals(beforeMidKey, foundKeyValue);
 
-      // Seek tot the last thing should be the penultimate on the top, the one before the midkey on the bottom.
-      foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf);
-      assertEquals(items.get(items.size() - 2), foundKeyValue);
+    // Seek tot the last thing should be the penultimate on the top, the one before the midkey on
+    // the bottom.
+    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf);
+    assertEquals(items.get(items.size() - 2), foundKeyValue);
 
-      foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf);
-      assertEquals(beforeMidKey, foundKeyValue);
+    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf);
+    assertEquals(beforeMidKey, foundKeyValue);
 
-      // Try and seek before something that is in the bottom.
-      foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf);
-      assertNull(foundKeyValue);
+    // Try and seek before something that is in the bottom.
+    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf);
+    assertNull(foundKeyValue);
 
-      // Try and seek before the first thing.
-      foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
-      assertNull(foundKeyValue);
+    // Try and seek before the first thing.
+    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
+    assertNull(foundKeyValue);
 
-      // Try and seek before the second thing in the top and bottom.
-      foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
-      assertNull(foundKeyValue);
+    // Try and seek before the second thing in the top and bottom.
+    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
+    assertNull(foundKeyValue);
 
-      foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
-      assertEquals(items.get(0), foundKeyValue);
+    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
+    assertEquals(items.get(0), foundKeyValue);
 
-      // Try to seek before the splitKey in the top file
-      foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
-      assertNull(foundKeyValue);
-    }
+    // Try to seek before the splitKey in the top file
+    foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
+    assertNull(foundKeyValue);
+  }
 
   private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
-                                        CacheConfig cacheConfig)
-            throws IOException {
-      final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
-              cacheConfig, bottom, TEST_UTIL.getConfiguration());
-      halfreader.loadFileInfo();
-      final HFileScanner scanner = halfreader.getScanner(false, false);
-      scanner.seekBefore(seekBefore);
-      return scanner.getCell();
+      CacheConfig cacheConfig) throws IOException {
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConfig, bottom, true,
+        new AtomicInteger(0), true, TEST_UTIL.getConfiguration());
+    halfreader.loadFileInfo();
+    final HFileScanner scanner = halfreader.getScanner(false, false);
+    scanner.seekBefore(seekBefore);
+    return scanner.getCell();
   }
 
   private KeyValue getLastOnCol(Cell curr) {
-    return KeyValueUtil.createLastOnRow(
-        curr.getRowArray(), curr.getRowOffset(), curr.getRowLength(),
-        curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
-        curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
+    return KeyValueUtil.createLastOnRow(curr.getRowArray(), curr.getRowOffset(),
+      curr.getRowLength(), curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
+      curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
   }
 
   static final int SIZE = 1000;
@@ -244,18 +230,10 @@ public class TestHalfStoreFileReader {
     List<KeyValue> ret = new ArrayList<>(SIZE);
     for (int i = 0; i < SIZE; i++) {
       KeyValue kv =
-          new KeyValue(
-              _b(String.format("row_%04d", i)),
-              _b("family"),
-              _b("qualifier"),
-              1000, // timestamp
+          new KeyValue(_b(String.format("row_%04d", i)), _b("family"), _b("qualifier"), 1000, // timestamp
               _b("value"));
       ret.add(kv);
     }
     return ret;
   }
-
-
-
 }
-
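
For reference, the HalfStoreFileReader constructor used in this test also gained arguments; a minimal sketch of the new call (the extra booleans and the AtomicInteger are assumed to cover a primary-replica flag, a shared reference count, and shared-reader bookkeeping):

    // uses org.apache.hadoop.hbase.io.HalfStoreFileReader and java.util.concurrent.atomic.AtomicInteger
    HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConf, bottom,
        true, new AtomicInteger(0), true, conf);
    halfreader.loadFileInfo();
    HFileScanner scanner = halfreader.getScanner(false, false);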

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 3315b6f..49807a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -248,7 +248,7 @@ public class TestCacheOnWrite {
   }
 
   private void readStoreFile(boolean useTags) throws IOException {
-    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
     LOG.info("HFile information: " + reader);
     HFileContext meta = new HFileContextBuilder().withCompression(compress)
       .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 4db459a..d209430 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -161,7 +161,7 @@ public class TestHFile  {
     Writer w =
         HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).withFileContext(context).create();
     w.close();
-    Reader r = HFile.createReader(fs, f, cacheConf, conf);
+    Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
     r.loadFileInfo();
     assertNull(r.getFirstKey());
     assertNull(r.getLastKey());
@@ -178,7 +178,7 @@ public class TestHFile  {
     fsos.close();
 
     try {
-      Reader r = HFile.createReader(fs, f, cacheConf, conf);
+      Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
     } catch (CorruptHFileException che) {
       // Expected failure
       return;
@@ -218,7 +218,7 @@ public class TestHFile  {
     truncateFile(fs, w.getPath(), trunc);
 
     try {
-      Reader r = HFile.createReader(fs, trunc, cacheConf, conf);
+      Reader r = HFile.createReader(fs, trunc, cacheConf, true, conf);
     } catch (CorruptHFileException che) {
       // Expected failure
       return;
@@ -453,7 +453,7 @@ public class TestHFile  {
       writer.append(kv);
       writer.close();
       fout.close();
-      Reader reader = HFile.createReader(fs, mFile, cacheConf, conf);
+      Reader reader = HFile.createReader(fs, mFile, cacheConf, true, conf);
       reader.loadFileInfo();
       assertNull(reader.getMetaBlock("non-existant", false));
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 28930db..2052c1d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -565,7 +565,7 @@ public class TestHFileBlockIndex {
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
 
    // Read the HFile
-   HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, conf);
+   HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, true, conf);
 
    boolean hasArrayIndexOutOfBoundsException = false;
    try {
@@ -644,7 +644,7 @@ public class TestHFileBlockIndex {
       }
 
       // Read the HFile
-      HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, conf);
+      HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, true, conf);
       assertEquals(expectedNumLevels,
           reader.getTrailer().getNumDataIndexLevels());
 
@@ -774,7 +774,7 @@ public class TestHFileBlockIndex {
     }
     hfw.close();
 
-    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf);
+    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf);
     // Scanner doesn't do Cells yet.  Fix.
     HFileScanner scanner = reader.getScanner(true, true);
     for (int i = 0; i < keys.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 3264558..40e9ab7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -178,7 +178,7 @@ public class TestHFileEncryption {
     }
 
     // read it back in and validate correct crypto metadata
-    HFile.Reader reader = HFile.createReader(fs, path, cacheConf, conf);
+    HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
     try {
       reader.loadFileInfo();
       FixedFileTrailer trailer = reader.getTrailer();
@@ -230,7 +230,7 @@ public class TestHFileEncryption {
         LOG.info("Reading with " + fileContext);
         int i = 0;
         HFileScanner scanner = null;
-        HFile.Reader reader = HFile.createReader(fs, path, cacheConf, conf);
+        HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
         try {
           reader.loadFileInfo();
           FixedFileTrailer trailer = reader.getTrailer();
@@ -252,7 +252,7 @@ public class TestHFileEncryption {
 
         // Test random seeks with pread
         LOG.info("Random seeking with " + fileContext);
-        reader = HFile.createReader(fs, path, cacheConf, conf);
+        reader = HFile.createReader(fs, path, cacheConf, true, conf);
         try {
           scanner = reader.getScanner(false, true);
           assertTrue("Initial seekTo failed", scanner.seekTo());

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
index f1528c2..686024d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
@@ -77,7 +77,7 @@ public class TestHFileInlineToRootChunkConversion {
     }
     hfw.close();
 
-    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf);
+    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf);
     // Scanner doesn't do Cells yet.  Fix.
     HFileScanner scanner = reader.getScanner(true, true);
     for (int i = 0; i < keys.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 4c3db03..dfa5ee8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -81,7 +81,7 @@ public class TestPrefetch {
 
   private void readStoreFile(Path storeFilePath) throws Exception {
     // Open the file
-    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
 
     while (!reader.prefetchComplete()) {
       // Sleep for a bit

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
index a9ecf7b..b3cd8ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
@@ -109,8 +109,8 @@ public class TestReseekTo {
     writer.close();
     fout.close();
 
-    HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(),
-        ncTFile, cacheConf, TEST_UTIL.getConfiguration());
+    HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), ncTFile, cacheConf,
+      true, TEST_UTIL.getConfiguration());
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java
index d46af4a..f4309ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java
@@ -136,7 +136,7 @@ public class TestSeekBeforeWithInlineBlocks {
           }
   
           // Read the HFile
-          HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, conf);
+          HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, true, conf);
           
           // Sanity check the HFile index level
           assertEquals(expectedNumLevels, reader.getTrailer().getNumDataIndexLevels());

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
index 6531d2c..b268f0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
@@ -147,7 +147,7 @@ public class TestSeekTo {
     Path p = makeNewFile(tagUsage);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
     assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
@@ -206,7 +206,7 @@ public class TestSeekTo {
     Path p = makeNewFile(tagUsage);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
     assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
@@ -300,7 +300,7 @@ public class TestSeekTo {
     Path p = makeNewFile(tagUsage);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     assertEquals(2, reader.getDataBlockIndexReader().getRootBlockCount());
     HFileScanner scanner = reader.getScanner(false, true);
@@ -338,7 +338,7 @@ public class TestSeekTo {
     Path p = makeNewFile(tagUsage);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileBlockIndex.BlockIndexReader blockIndexReader =
       reader.getDataBlockIndexReader();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 20fc992..274a76e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -365,8 +365,8 @@ public class TestHFileOutputFormat2  {
       FileStatus[] file = fs.listStatus(sub1[0].getPath());
 
       // open as HFile Reader and pull out TIMERANGE FileInfo.
-      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
-          new CacheConfig(conf), conf);
+      HFile.Reader rd =
+          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
       Map<byte[],byte[]> finfo = rd.loadFileInfo();
       byte[] range = finfo.get("TIMERANGE".getBytes());
       assertNotNull(range);
@@ -458,8 +458,8 @@ public class TestHFileOutputFormat2  {
       RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
       while(iterator.hasNext()) {
         LocatedFileStatus keyFileStatus = iterator.next();
-        HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf),
-            conf);
+        HFile.Reader reader =
+            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
         HFileScanner scanner = reader.getScanner(false, false, false);
         scanner.seekTo();
         Cell cell = scanner.getCell();
@@ -1043,7 +1043,7 @@ public class TestHFileOutputFormat2  {
         // verify that the compression on this file matches the configured
         // compression
         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
-        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
+        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
         Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 
         byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
index b8d973b..8967ac7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
@@ -480,7 +480,7 @@ public class TestImportTSVWithVisibilityLabels implements Configurable {
    */
   private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
     Configuration conf = util.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index b7d5c6f..efcf91e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -556,7 +556,7 @@ public class TestImportTsv implements Configurable {
    */
   private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
     Configuration conf = util.getConfiguration();
-    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 7ae5afc..7f1723c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -620,8 +620,8 @@ public class TestLoadIncrementalHFiles {
 
   private int verifyHFile(Path p) throws IOException {
     Configuration conf = util.getConfiguration();
-    HFile.Reader reader = HFile.createReader(
-        p.getFileSystem(conf), p, new CacheConfig(conf), conf);
+    HFile.Reader reader =
+        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
     reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
index 84a2ba7..6647ffe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@ -60,8 +60,8 @@ public class TestMobFile extends TestCase {
     String caseName = getName();
     MobTestUtil.writeStoreFile(writer, caseName);
 
-    MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
-        conf, cacheConf, BloomType.NONE));
+    MobFile mobFile =
+        new MobFile(new StoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true));
     byte[] family = Bytes.toBytes(caseName);
     byte[] qualify = Bytes.toBytes(caseName);
 
@@ -112,8 +112,8 @@ public class TestMobFile extends TestCase {
             .build();
     MobTestUtil.writeStoreFile(writer, getName());
 
-    MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
-        conf, cacheConf, BloomType.NONE));
+    MobFile mobFile =
+        new MobFile(new StoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true));
     assertNotNull(mobFile.getScanner());
     assertTrue(mobFile.getScanner() instanceof StoreFileScanner);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 83936aa..47a1c24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -772,9 +772,7 @@ public class TestMobCompactor {
     ResultScanner results = table.getScanner(scan);
     int count = 0;
     for (Result res : results) {
-      for (Cell cell : res.listCells()) {
-        count++;
-      }
+      count += res.size();
     }
     results.close();
     return count;
@@ -817,8 +815,9 @@ public class TestMobCompactor {
       Path path = files[0].getPath();
       CacheConfig cacheConf = new CacheConfig(conf);
       StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
-        BloomType.NONE);
-      HFile.Reader reader = sf.createReader().getHFileReader();
+        BloomType.NONE, true);
+      sf.initReader();
+      HFile.Reader reader = sf.getReader().getHFileReader();
       byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
       Assert.assertTrue(null != encryptionKey);
       Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
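
The StoreFile usage here shows the other half of the API change: the constructor takes an extra boolean and the reader is initialized explicitly instead of via createReader(). A minimal sketch (the boolean is assumed to mark a primary-replica store file):

    // uses org.apache.hadoop.hbase.regionserver.{StoreFile, StoreFileReader, BloomType}
    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE,
        true /* assumed: primary replica */);
    sf.initReader();                           // replaces the old sf.createReader()
    StoreFileReader reader = sf.getReader();   // reader is now obtained separately
    HFile.Reader hfileReader = reader.getHFileReader();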

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 290e6f4..f65e224 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -515,10 +515,11 @@ public class TestPartitionedMobCompactor {
       try {
         for (CompactionDelPartition delPartition : request.getDelPartitions()) {
           for (Path newDelPath : delPartition.listDelFiles()) {
-            StoreFile sf = new StoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE);
-            // pre-create reader of a del file to avoid race condition when opening the reader in each
-            // partition.
-            sf.createReader();
+            StoreFile sf =
+                new StoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE, true);
+            // pre-create reader of a del file to avoid race condition when opening the reader in
+            // each partition.
+            sf.initReader();
             delPartition.addStoreFile(sf);
           }
         }
@@ -768,7 +769,6 @@ public class TestPartitionedMobCompactor {
    * @param delPartitions all del partitions
    */
   private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
-    int i = 0;
     Map<Path, Path> delMap = new HashMap<>();
     for (CompactionDelPartition delPartition : delPartitions) {
       for (Path f : delPartition.listDelFiles()) {
@@ -850,12 +850,12 @@ public class TestPartitionedMobCompactor {
   private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
     List<StoreFile> sfs = new ArrayList<>();
     int size = 0;
-    for(Path path : paths) {
-      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    for (Path path : paths) {
+      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
       sfs.add(sf);
     }
-    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-        false, false, HConstants.LATEST_TIMESTAMP);
+    List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
+      false, true, false, false, HConstants.LATEST_TIMESTAMP));
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());
     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
index a074a9a..e36d16f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
@@ -592,10 +592,9 @@ public class DataBlockEncodingTool {
     Path path = new Path(hfilePath);
     CacheConfig cacheConf = new CacheConfig(conf);
     FileSystem fs = FileSystem.get(conf);
-    StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
-      BloomType.NONE);
-
-    StoreFileReader reader = hsf.createReader();
+    StoreFile hsf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    StoreFileReader reader = hsf.getReader();
     reader.loadFileInfo();
     KeyValueScanner scanner = reader.getStoreFileScanner(true, true, false, 0, 0, false);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
index eb77c28..f47fc4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
@@ -60,9 +60,9 @@ public class EncodedSeekPerformanceTest {
 
     // read all of the key values
     StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
-        path, configuration, cacheConf, BloomType.NONE);
-
-    StoreFileReader reader = storeFile.createReader();
+        path, configuration, cacheConf, BloomType.NONE, true);
+    storeFile.initReader();
+    StoreFileReader reader = storeFile.getReader();
     StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);
     Cell current;
 
@@ -90,11 +90,11 @@ public class EncodedSeekPerformanceTest {
       List<Cell> seeks) throws IOException {
     // read all of the key values
     StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
-      path, configuration, cacheConf, BloomType.NONE);
-
+      path, configuration, cacheConf, BloomType.NONE, true);
+    storeFile.initReader();
     long totalSize = 0;
 
-    StoreFileReader reader = storeFile.createReader();
+    StoreFileReader reader = storeFile.getReader();
     StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);
 
     long startReadingTime = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index 1169434..d52c6c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -48,7 +48,7 @@ public class MockStoreFile extends StoreFile {
   MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
       long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
     super(testUtil.getTestFileSystem(), testPath, testUtil.getConfiguration(),
-      new CacheConfig(testUtil.getConfiguration()), BloomType.NONE);
+        new CacheConfig(testUtil.getConfiguration()), BloomType.NONE, true);
     this.length = length;
     this.isRef = isRef;
     this.ageInDisk = ageInDisk;
@@ -126,6 +126,11 @@ public class MockStoreFile extends StoreFile {
   }
 
   @Override
+  public boolean isCompactedAway() {
+    return compactedAway;
+  }
+
+  @Override
   public long getModificationTimeStamp() {
     return modificationTime;
   }
@@ -136,11 +141,22 @@ public class MockStoreFile extends StoreFile {
   }
 
   @Override
+  public void initReader() throws IOException {
+  }
+
+  @Override
+  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
+      boolean pread, boolean isCompaction, long readPt, long scannerOrder,
+      boolean canOptimizeForNonNullColumn) throws IOException {
+    return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder,
+      canOptimizeForNonNullColumn);
+  }
+
+  @Override
   public StoreFileReader getReader() {
     final long len = this.length;
     final TimeRangeTracker timeRangeTracker = this.timeRangeTracker;
     final long entries = this.entryCount;
-    final boolean compactedAway = this.compactedAway;
     return new StoreFileReader() {
       @Override
       public long length() {
@@ -158,11 +174,6 @@ public class MockStoreFile extends StoreFile {
       }
 
       @Override
-      public boolean isCompactedAway() {
-        return compactedAway;
-      }
-
-      @Override
       public void close(boolean evictOnClose) throws IOException {
         // no-op
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 9fed202..efe0605 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -218,9 +218,9 @@ public class TestCacheOnWriteInSchema {
   private void readStoreFile(Path path) throws IOException {
     CacheConfig cacheConf = store.getCacheConfig();
     BlockCache cache = cacheConf.getBlockCache();
-    StoreFile sf = new StoreFile(fs, path, conf, cacheConf,
-      BloomType.ROWCOL);
-    HFile.Reader reader = sf.createReader().getHFileReader();
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.ROWCOL, true);
+    sf.initReader();
+    HFile.Reader reader = sf.getReader().getHFileReader();
     try {
       // Open a scanner with (on read) caching disabled
       HFileScanner scanner = reader.getScanner(false, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index 7154511..58dbe8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -40,15 +40,12 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.experimental.categories.Category;
 
-@Category(SmallTests.class)
 public class TestCompactionPolicy {
   private final static Log LOG = LogFactory.getLog(TestCompactionPolicy.class);
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
index dfea761..57a5f59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
@@ -200,8 +200,9 @@ public class TestCompoundBloomFilter {
 
   private void readStoreFile(int t, BloomType bt, List<KeyValue> kvs,
       Path sfPath) throws IOException {
-    StoreFile sf = new StoreFile(fs, sfPath, conf, cacheConf, bt);
-    StoreFileReader r = sf.createReader();
+    StoreFile sf = new StoreFile(fs, sfPath, conf, cacheConf, bt, true);
+    sf.initReader();
+    StoreFileReader r = sf.getReader();
     final boolean pread = true; // does not really matter
     StoreFileScanner scanner = r.getStoreFileScanner(true, pread, false, 0, 0, false);
 
@@ -285,7 +286,7 @@ public class TestCompoundBloomFilter {
 
   private boolean isInBloom(StoreFileScanner scanner, byte[] row,
       byte[] qualifier) {
-    Scan scan = new Scan(row, row);
+    Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
     scan.addColumn(Bytes.toBytes(RandomKeyValueUtil.COLUMN_FAMILY_NAME), qualifier);
     Store store = mock(Store.class);
     HColumnDescriptor hcd = mock(HColumnDescriptor.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index b34c307..3e7477d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -280,7 +280,7 @@ public class TestEncryptionKeyRotation {
 
   private static byte[] extractHFileKey(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
-      new CacheConfig(conf), conf);
+      new CacheConfig(conf), true, conf);
     try {
       reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
index 2b0ab7b..3d8eeed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
@@ -66,7 +66,7 @@ public class TestEncryptionRandomKeying {
 
   private static byte[] extractHFileKey(Path path) throws Exception {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
-      new CacheConfig(conf), conf);
+      new CacheConfig(conf), true, conf);
     try {
       reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 9f0975d..e231b60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -94,10 +94,10 @@ public class TestFSErrorsExposed {
     TestStoreFile.writeStoreFile(
         writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
 
-    StoreFile sf = new StoreFile(fs, writer.getPath(),
-      util.getConfiguration(), cacheConf, BloomType.NONE);
-
-    StoreFileReader reader = sf.createReader();
+    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
+    sf.initReader();
+    StoreFileReader reader = sf.getReader();
     HFileScanner scanner = reader.getScanner(false, true);
 
     FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
@@ -144,8 +144,8 @@ public class TestFSErrorsExposed {
     TestStoreFile.writeStoreFile(
         writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
 
-    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
-      cacheConf, BloomType.NONE);
+    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
 
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
         Collections.singletonList(sf), false, true, false, false,

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
index a50dc42..1997b31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
@@ -293,8 +293,9 @@ public class TestMobStoreCompaction {
     if (fs.exists(mobDirPath)) {
       FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
       for (FileStatus file : files) {
-        StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
-        Map<byte[], byte[]> fileInfo = sf.createReader().loadFileInfo();
+        StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
+        sf.initReader();
+        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
         byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
         assertTrue(count != null);
         mobCellsCount += Bytes.toLong(count);
@@ -407,7 +408,7 @@ public class TestMobStoreCompaction {
     int size = 0;
     if (fs.exists(mobDirPath)) {
       for (FileStatus f : fs.listStatus(mobDirPath)) {
-        StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE);
+        StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true);
         sfs.add(sf);
         if (StoreFileInfo.isDelFile(sf.getPath())) {
           numDelfiles++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index ecb808e..bf0fb05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -108,8 +108,8 @@ public class TestReversibleScanners {
           .withFileContext(hFileContext).build();
       writeStoreFile(writer);
 
-      StoreFile sf = new StoreFile(fs, writer.getPath(),
-          TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
+      StoreFile sf = new StoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+          BloomType.NONE, true);
 
       List<StoreFileScanner> scanners = StoreFileScanner
           .getScannersForStoreFiles(Collections.singletonList(sf),
@@ -162,11 +162,11 @@ public class TestReversibleScanners {
     writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
         writer2 });
 
-    StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
-        TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
+    StoreFile sf1 = new StoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
 
-    StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
-        TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
+    StoreFile sf2 = new StoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
     /**
      * Test without MVCC
      */
@@ -252,11 +252,11 @@ public class TestReversibleScanners {
     writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
         writer2 });
 
-    StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
-        TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
+    StoreFile sf1 = new StoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
 
-    StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
-        TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
+    StoreFile sf2 = new StoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
+        BloomType.NONE, true);
 
     ScanType scanType = ScanType.USER_SCAN;
     ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
@@ -272,7 +272,7 @@ public class TestReversibleScanners {
     // Case 2.Test reversed scan with a specified start row
     int startRowNum = ROWSIZE / 2;
     byte[] startRow = ROWS[startRowNum];
-    scan.setStartRow(startRow);
+    scan.withStartRow(startRow);
     storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
         scanType, scanInfo, MAXMVCC);
     verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
@@ -354,21 +354,21 @@ public class TestReversibleScanners {
 
     // Case5: Case4 + specify start row
     int startRowNum = ROWSIZE * 3 / 4;
-    scan.setStartRow(ROWS[startRowNum]);
+    scan.withStartRow(ROWS[startRowNum]);
     scanner = region.getScanner(scan);
     verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
         false);
 
     // Case6: Case4 + specify stop row
     int stopRowNum = ROWSIZE / 4;
-    scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
-    scan.setStopRow(ROWS[stopRowNum]);
+    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
+    scan.withStopRow(ROWS[stopRowNum]);
     scanner = region.getScanner(scan);
     verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
         - stopRowNum - 1), false);
 
     // Case7: Case4 + specify start row + specify stop row
-    scan.setStartRow(ROWS[startRowNum]);
+    scan.withStartRow(ROWS[startRowNum]);
     scanner = region.getScanner(scan);
     verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
         (startRowNum - stopRowNum), false);
@@ -595,9 +595,6 @@ public class TestReversibleScanners {
 
       // Case2: seek to the previous row in backwardSeek
     int seekRowNum = ROWSIZE - 3;
-    KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
-      expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
-          readPoint);
     res = false;
     for (KeyValueScanner scanner : scanners) {
       res |= scanner.backwardSeek(expectedKey);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 0d339b1..76bf1cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -276,7 +276,7 @@ public class TestStore {
     writer.close();
 
     // Verify that compression and encoding settings are respected
-    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
     Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
     Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
     reader.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index 7e4ebd8..d1444c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -116,8 +117,7 @@ public class TestStoreFile extends HBaseTestCase {
     writeStoreFile(writer);
 
     Path sfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
-    StoreFile sf = new StoreFile(this.fs, sfPath, conf, cacheConf,
-      BloomType.NONE);
+    StoreFile sf = new StoreFile(this.fs, sfPath, conf, cacheConf, BloomType.NONE, true);
     checkHalfHFile(regionFs, sf);
   }
 
@@ -169,9 +169,9 @@ public class TestStoreFile extends HBaseTestCase {
     writeStoreFile(writer);
 
     Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
-    StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf,
-      BloomType.NONE);
-    StoreFileReader reader = hsf.createReader();
+    StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    StoreFileReader reader = hsf.getReader();
     // Split on a row, not in middle of row.  Midkey returned by reader
     // may be in middle of row.  Create new one with empty column and
     // timestamp.
@@ -184,11 +184,11 @@ public class TestStoreFile extends HBaseTestCase {
     // Make a reference
     HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow);
     Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
-    StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf,
-      BloomType.NONE);
+    StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf, BloomType.NONE, true);
+    refHsf.initReader();
     // Now confirm that I can read from the reference and that it only gets
     // keys from top half of the file.
-    HFileScanner s = refHsf.createReader().getScanner(false, false);
+    HFileScanner s = refHsf.getReader().getScanner(false, false);
     for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
       ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
       kv = KeyValueUtil.createKeyValueFromKey(bb);
@@ -242,13 +242,14 @@ public class TestStoreFile extends HBaseTestCase {
 
     // Try to open store file from link
     StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
-    StoreFile hsf = new StoreFile(this.fs, storeFileInfo, testConf, cacheConf,
-      BloomType.NONE);
+    StoreFile hsf =
+        new StoreFile(this.fs, storeFileInfo, testConf, cacheConf, BloomType.NONE, true);
     assertTrue(storeFileInfo.isLink());
+    hsf.initReader();
 
     // Now confirm that I can read from the link
     int count = 1;
-    HFileScanner s = hsf.createReader().getScanner(false, false);
+    HFileScanner s = hsf.getReader().getScanner(false, false);
     s.seekTo();
     while (s.next()) {
       count++;
@@ -295,8 +296,8 @@ public class TestStoreFile extends HBaseTestCase {
     // <root>/clone/splitB/<cf>/<reftohfilelink>
     HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
     HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
-    StoreFile f = new StoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE);
-    f.createReader();
+    StoreFile f = new StoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE, true);
+    f.initReader();
     Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
     Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
     f.closeReader(true);
@@ -307,12 +308,12 @@ public class TestStoreFile extends HBaseTestCase {
     // reference to a hfile link.  This code in StoreFile that handles this case.
 
     // Try to open store file from link
-    StoreFile hsfA = new StoreFile(this.fs, pathA, testConf, cacheConf,
-      BloomType.NONE);
+    StoreFile hsfA = new StoreFile(this.fs, pathA, testConf, cacheConf, BloomType.NONE, true);
+    hsfA.initReader();
 
     // Now confirm that I can read from the ref to link
     int count = 1;
-    HFileScanner s = hsfA.createReader().getScanner(false, false);
+    HFileScanner s = hsfA.getReader().getScanner(false, false);
     s.seekTo();
     while (s.next()) {
       count++;
@@ -320,11 +321,11 @@ public class TestStoreFile extends HBaseTestCase {
     assertTrue(count > 0); // read some rows here
 
     // Try to open store file from link
-    StoreFile hsfB = new StoreFile(this.fs, pathB, testConf, cacheConf,
-      BloomType.NONE);
+    StoreFile hsfB = new StoreFile(this.fs, pathB, testConf, cacheConf, BloomType.NONE, true);
+    hsfB.initReader();
 
     // Now confirm that I can read from the ref to link
-    HFileScanner sB = hsfB.createReader().getScanner(false, false);
+    HFileScanner sB = hsfB.getReader().getScanner(false, false);
     sB.seekTo();
 
     //count++ as seekTo() will advance the scanner
@@ -339,7 +340,8 @@ public class TestStoreFile extends HBaseTestCase {
 
   private void checkHalfHFile(final HRegionFileSystem regionFs, final StoreFile f)
       throws IOException {
-    Cell midkey = f.createReader().midkey();
+    f.initReader();
+    Cell midkey = f.getReader().midkey();
     KeyValue midKV = (KeyValue)midkey;
     byte [] midRow = CellUtil.cloneRow(midKV);
     // Create top split.
@@ -351,10 +353,12 @@ public class TestStoreFile extends HBaseTestCase {
         midRow, null);
     Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
     // Make readers on top and bottom.
-    StoreFileReader top = new StoreFile(
-      this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
-    StoreFileReader bottom = new StoreFile(
-      this.fs, bottomPath, conf, cacheConf, BloomType.NONE).createReader();
+    StoreFile topF = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
+    topF.initReader();
+    StoreFileReader top = topF.getReader();
+    StoreFile bottomF = new StoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
+    bottomF.initReader();
+    StoreFileReader bottom = bottomF.getReader();
     ByteBuffer previous = null;
     LOG.info("Midkey: " + midKV.toString());
     ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midKV.getKey());
@@ -412,7 +416,9 @@ public class TestStoreFile extends HBaseTestCase {
 
       assertNull(bottomPath);
 
-      top = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
+      topF = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE, true);
+      topF.initReader();
+      top = topF.getReader();
       // Now read from the top.
       first = true;
       topScanner = top.getScanner(false, false);
@@ -449,8 +455,10 @@ public class TestStoreFile extends HBaseTestCase {
       topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
       bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
       assertNull(topPath);
-      bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
-        BloomType.NONE).createReader();
+
+      bottomF = new StoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE, true);
+      bottomF.initReader();
+      bottom = bottomF.getReader();
       first = true;
       bottomScanner = bottom.getScanner(false, false);
       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
@@ -502,7 +510,8 @@ public class TestStoreFile extends HBaseTestCase {
     }
     writer.close();
 
-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@@ -590,7 +599,8 @@ public class TestStoreFile extends HBaseTestCase {
     }
     writer.close();
 
-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();
 
@@ -635,7 +645,8 @@ public class TestStoreFile extends HBaseTestCase {
     writeStoreFile(writer);
     writer.close();
 
-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
 
     // Now do reseek with empty KV to position to the beginning of the file
 
@@ -695,7 +706,8 @@ public class TestStoreFile extends HBaseTestCase {
       }
       writer.close();
 
-      StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+      StoreFileReader reader =
+          new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
       reader.loadFileInfo();
       reader.loadBloomfilter();
       StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@@ -844,12 +856,13 @@ public class TestStoreFile extends HBaseTestCase {
     writer.close();
 
     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
-      BloomType.NONE);
+      BloomType.NONE, true);
     Store store = mock(Store.class);
     HColumnDescriptor hcd = mock(HColumnDescriptor.class);
     when(hcd.getName()).thenReturn(family);
     when(store.getFamily()).thenReturn(hcd);
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader();
+    StoreFileReader reader = hsf.getReader();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
     TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
     columns.add(qualifier);
@@ -901,11 +914,12 @@ public class TestStoreFile extends HBaseTestCase {
     Path pathCowOff = new Path(baseDir, "123456789");
     StoreFileWriter writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
-      BloomType.NONE);
+      BloomType.NONE, true);
     LOG.debug(hsf.getPath().toString());
 
     // Read this file, we should see 3 misses
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader();
+    StoreFileReader reader = hsf.getReader();
     reader.loadFileInfo();
     StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
@@ -923,10 +937,11 @@ public class TestStoreFile extends HBaseTestCase {
     Path pathCowOn = new Path(baseDir, "123456788");
     writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
     hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
-      BloomType.NONE);
+      BloomType.NONE, true);
 
     // Read this file, we should see 3 hits
-    reader = hsf.createReader();
+    hsf.initReader();
+    reader = hsf.getReader();
     scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
     while (scanner.next() != null);
@@ -938,15 +953,15 @@ public class TestStoreFile extends HBaseTestCase {
     reader.close(cacheConf.shouldEvictOnClose());
 
     // Let's read back the two files to ensure the blocks exactly match
-    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
-      BloomType.NONE);
-    StoreFileReader readerOne = hsf.createReader();
+    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    StoreFileReader readerOne = hsf.getReader();
     readerOne.loadFileInfo();
     StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
     scannerOne.seek(KeyValue.LOWESTKEY);
-    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
-      BloomType.NONE);
-    StoreFileReader readerTwo = hsf.createReader();
+    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    StoreFileReader readerTwo = hsf.getReader();
     readerTwo.loadFileInfo();
     StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
     scannerTwo.seek(KeyValue.LOWESTKEY);
@@ -977,9 +992,9 @@ public class TestStoreFile extends HBaseTestCase {
     // Let's close the first file with evict on close turned on
     conf.setBoolean("hbase.rs.evictblocksonclose", true);
     cacheConf = new CacheConfig(conf);
-    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
-      BloomType.NONE);
-    reader = hsf.createReader();
+    hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    reader = hsf.getReader();
     reader.close(cacheConf.shouldEvictOnClose());
 
     // We should have 3 new evictions but the evict count stat should not change. Eviction because
@@ -991,9 +1006,9 @@ public class TestStoreFile extends HBaseTestCase {
     // Let's close the second file with evict on close turned off
     conf.setBoolean("hbase.rs.evictblocksonclose", false);
     cacheConf = new CacheConfig(conf);
-    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
-      BloomType.NONE);
-    reader = hsf.createReader();
+    hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf, BloomType.NONE, true);
+    hsf.initReader();
+    reader = hsf.getReader();
     reader.close(cacheConf.shouldEvictOnClose());
 
     // We expect no changes
@@ -1078,9 +1093,10 @@ public class TestStoreFile extends HBaseTestCase {
             .build();
     writer.close();
 
-    StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,
-      cacheConf, BloomType.NONE);
-    StoreFileReader reader = storeFile.createReader();
+    StoreFile storeFile =
+        new StoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
+    storeFile.initReader();
+    StoreFileReader reader = storeFile.getReader();
 
     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
index d628dc8..3d3c79c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
@@ -23,23 +23,24 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,7 +75,8 @@ public class TestStoreFileScannerWithTagCompression {
     writeStoreFile(writer);
     writer.close();
 
-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false);
     try {
       // Now do reseek with empty KV to position to the beginning of the file

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index dff6919..170fba2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -67,9 +67,6 @@ public class TestCompactor {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
       anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    when(sf.cloneForReader()).thenReturn(sf);
     when(sf.getMaxSequenceId()).thenReturn(maxSequenceId);
     return sf;
   }


[3/3] hbase git commit: HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction

Posted by zh...@apache.org.
HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66b616d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66b616d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66b616d7

Branch: refs/heads/master
Commit: 66b616d7a3d6f4ad6d20962e2dfc0c82a4092ddb
Parents: 719a30b
Author: zhangduo <zh...@apache.org>
Authored: Mon Apr 17 22:53:49 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Apr 19 09:26:33 2017 +0800

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      |  63 +++---
 .../org/apache/hadoop/hbase/io/FileLink.java    |  14 +-
 .../hadoop/hbase/io/HalfStoreFileReader.java    |  13 +-
 .../hadoop/hbase/io/hfile/CacheConfig.java      |   9 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |  85 ++++----
 .../hbase/io/hfile/HFilePrettyPrinter.java      |   2 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  26 +--
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  45 ++--
 .../procedure/MergeTableRegionsProcedure.java   |   9 +-
 .../procedure/SplitTableRegionProcedure.java    |   8 +-
 .../apache/hadoop/hbase/mob/CachedMobFile.java  |   4 +-
 .../org/apache/hadoop/hbase/mob/MobFile.java    |   8 +-
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  13 +-
 .../compactions/PartitionedMobCompactor.java    |  26 +--
 .../regionserver/DefaultStoreFileManager.java   |   2 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   6 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +-
 .../hbase/regionserver/HRegionFileSystem.java   |   6 +-
 .../hadoop/hbase/regionserver/HStore.java       |  19 +-
 .../regionserver/ReversedStoreScanner.java      |   2 +-
 .../hadoop/hbase/regionserver/StoreFile.java    | 216 ++++++++++++-------
 .../hbase/regionserver/StoreFileInfo.java       |  21 +-
 .../hbase/regionserver/StoreFileReader.java     |  86 ++++----
 .../hbase/regionserver/StoreFileScanner.java    |  50 +++--
 .../hadoop/hbase/regionserver/StoreScanner.java |   6 +-
 .../regionserver/compactions/Compactor.java     |  44 +---
 .../hadoop/hbase/util/CompressionTest.java      |   2 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |   6 +-
 .../hbase/util/hbck/HFileCorruptionChecker.java |   4 +-
 .../hbase/HFilePerformanceEvaluation.java       |   2 +-
 .../hadoop/hbase/client/TestFromClientSide.java |   1 +
 .../hbase/io/TestHalfStoreFileReader.java       | 192 ++++++++---------
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |   2 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |   8 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |   6 +-
 .../hbase/io/hfile/TestHFileEncryption.java     |   6 +-
 .../TestHFileInlineToRootChunkConversion.java   |   2 +-
 .../hadoop/hbase/io/hfile/TestPrefetch.java     |   2 +-
 .../hadoop/hbase/io/hfile/TestReseekTo.java     |   4 +-
 .../hfile/TestSeekBeforeWithInlineBlocks.java   |   2 +-
 .../hadoop/hbase/io/hfile/TestSeekTo.java       |   8 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java |  10 +-
 .../TestImportTSVWithVisibilityLabels.java      |   2 +-
 .../hadoop/hbase/mapreduce/TestImportTsv.java   |   2 +-
 .../mapreduce/TestLoadIncrementalHFiles.java    |   4 +-
 .../apache/hadoop/hbase/mob/TestMobFile.java    |   8 +-
 .../hbase/mob/compactions/TestMobCompactor.java |   9 +-
 .../TestPartitionedMobCompactor.java            |  18 +-
 .../regionserver/DataBlockEncodingTool.java     |   7 +-
 .../EncodedSeekPerformanceTest.java             |  12 +-
 .../hbase/regionserver/MockStoreFile.java       |  25 ++-
 .../regionserver/TestCacheOnWriteInSchema.java  |   6 +-
 .../regionserver/TestCompactionPolicy.java      |   3 -
 .../regionserver/TestCompoundBloomFilter.java   |   7 +-
 .../regionserver/TestEncryptionKeyRotation.java |   2 +-
 .../TestEncryptionRandomKeying.java             |   2 +-
 .../hbase/regionserver/TestFSErrorsExposed.java |  12 +-
 .../regionserver/TestMobStoreCompaction.java    |   7 +-
 .../regionserver/TestReversibleScanners.java    |  33 ++-
 .../hadoop/hbase/regionserver/TestStore.java    |   2 +-
 .../hbase/regionserver/TestStoreFile.java       | 120 ++++++-----
 .../TestStoreFileScannerWithTagCompression.java |  10 +-
 .../regionserver/compactions/TestCompactor.java |   3 -
 .../compactions/TestStripeCompactionPolicy.java |   3 -
 .../hbase/util/TestHBaseFsckEncryption.java     |   2 +-
 .../hadoop/hbase/spark/BulkLoadSuite.scala      |   8 +-
 66 files changed, 701 insertions(+), 650 deletions(-)
----------------------------------------------------------------------
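
Every updated call site in this patch follows the same pattern: the StoreFile constructor takes an additional boolean flagging a primary-replica store file, and the removed createReader() is replaced by an explicit initReader()/getReader() pair. A minimal sketch of the resulting read pattern, assembled from the updated call sites (the HFile path, Bloom type and printed value are illustrative only):

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;

public class StoreFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path hfilePath = new Path(args[0]);           // illustrative HFile path
    CacheConfig cacheConf = new CacheConfig(conf);

    // Trailing 'true' marks this as a primary-replica store file (new in this patch).
    StoreFile sf = new StoreFile(fs, hfilePath, conf, cacheConf, BloomType.NONE, true);
    sf.initReader();                              // replaces the old createReader()
    StoreFileReader reader = sf.getReader();
    try {
      Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
      System.out.println("file info entries: " + fileInfo.size());
    } finally {
      sf.closeReader(true);                       // evict cached blocks on close
    }
  }
}

Separating construction from reader initialization is what lets compaction open its own stream reader (with drop-behind and readahead hints) rather than cloning the whole StoreFile, which is the point of HBASE-17914.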


http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..055e46a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,11 +34,14 @@ import com.google.common.annotations.VisibleForTesting;
  * as well as closing streams. Initialization is not thread-safe, but normal operation is;
  * see method comments.
  */
-public class FSDataInputStreamWrapper {
+@InterfaceAudience.Private
+public class FSDataInputStreamWrapper implements Closeable {
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
   private final boolean doCloseStreams;
+  private final boolean dropBehind;
+  private final long readahead;
 
   /** Two stream handles, one with and one without FS-level checksum.
    * HDFS checksum setting is on FS level, not single read level, so you have to keep two
@@ -75,43 +81,52 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path, false);
+    this(fs, path, false, -1L);
   }
 
-  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
-    this(fs, null, path, dropBehind);
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException {
+    this(fs, null, path, dropBehind, readahead);
   }
 
-  public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null, false);
-  }
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
-                                  boolean dropBehind) throws IOException {
-    this(fs, link, null, dropBehind);
+                                  boolean dropBehind, long readahead) throws IOException {
+    this(fs, link, null, dropBehind, readahead);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
-                                   Path path, boolean dropBehind) throws IOException {
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
+      long readahead) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
     this.doCloseStreams = true;
+    this.dropBehind = dropBehind;
+    this.readahead = readahead;
     // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
     // that wraps over the specified fs. In this case, we will not be able to avoid
     // checksumming inside the filesystem.
-    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
+    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
 
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    setStreamOptions(stream);
+  }
+
+  private void setStreamOptions(FSDataInputStream in) {
     try {
       this.stream.setDropBehind(dropBehind);
     } catch (Exception e) {
       // Skipped.
     }
+    if (readahead >= 0) {
+      try {
+        this.stream.setReadahead(readahead);
+      } catch (Exception e) {
+        // Skipped.
+      }
+    }
   }
 
-
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
    * reads finish and before any other reads start (what happens in reality is we read the
@@ -127,6 +142,7 @@ public class FSDataInputStreamWrapper {
     if (useHBaseChecksum) {
       FileSystem fsNc = hfs.getNoChecksumFs();
       this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
+      setStreamOptions(streamNoFsChecksum);
       this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
       // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
       this.stream.close();
@@ -150,6 +166,8 @@ public class FSDataInputStreamWrapper {
     link = null;
     hfs = null;
     useHBaseChecksumConfigured = useHBaseChecksum = false;
+    dropBehind = false;
+    readahead = 0;
   }
 
   /**
@@ -201,19 +219,14 @@ public class FSDataInputStreamWrapper {
   }
 
   /** Close stream(s) if necessary. */
-  public void close() throws IOException {
-    if (!doCloseStreams) return;
-    try {
-      if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
-        streamNoFsChecksum.close();
-        streamNoFsChecksum = null;
-      }
-    } finally {
-      if (stream != null) {
-        stream.close();
-        stream = null;
-      }
+  @Override
+  public void close() {
+    if (!doCloseStreams) {
+      return;
     }
+    // ignore close exceptions here; this is a read path, so there is no data loss concern.
+    IOUtils.closeQuietly(streamNoFsChecksum);
+    IOUtils.closeQuietly(stream);
   }
 
   public HFileSystem getHfs() {

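The wrapper now carries the drop-behind and readahead hints through to both the checksum and no-checksum streams, and close() no longer throws. A minimal usage sketch, assuming an ordinary HDFS path (the path and printed value are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;

public class StreamWrapperSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(args[0]); // illustrative HFile path

    // dropBehind=true asks the OS not to keep read pages cached (the compaction case),
    // readahead=-1 keeps the filesystem default; both hints are applied best-effort.
    // The wrapper now implements Closeable, so try-with-resources works here.
    try (FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, path, true, -1L)) {
      System.out.println("using HBase checksum: " + in.shouldUseHBaseChecksum());
    }
  }
}
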
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index ca0dfbc..8a79efb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -29,6 +29,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
@@ -99,7 +101,7 @@ public class FileLink {
    * and the alternative locations, when the file is moved.
    */
   private static class FileLinkInputStream extends InputStream
-      implements Seekable, PositionedReadable {
+      implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead {
     private FSDataInputStream in = null;
     private Path currentPath = null;
     private long pos = 0;
@@ -306,6 +308,16 @@ public class FileLink {
       }
       throw new FileNotFoundException("Unable to open link: " + fileLink);
     }
+
+    @Override
+    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+      in.setReadahead(readahead);
+    }
+
+    @Override
+    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
+      in.setDropBehind(dropCache);
+    }
   }
 
   private Path[] locations = null;

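With FileLinkInputStream implementing CanSetDropBehind and CanSetReadahead, a stream opened through a FileLink accepts the same hints as a directly opened file. A hedged sketch, assuming the two-path FileLink constructor and illustrative paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FileLink;

public class FileLinkHintsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Both paths are illustrative; a FileLink resolves to whichever location exists.
    FileLink link = new FileLink(new Path(args[0]), new Path(args[1]));
    try (FSDataInputStream in = link.open(fs)) {
      // Before this patch these calls failed with UnsupportedOperationException,
      // because the link-backed stream did not implement the hint interfaces.
      in.setDropBehind(true);
      in.setReadahead(0L);
      System.out.println("read first byte: " + in.read());
    }
  }
}
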
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index a4a281e..c4dbc39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,10 +73,10 @@ public class HalfStoreFileReader extends StoreFileReader {
    * @param conf Configuration
    * @throws IOException
    */
-  public HalfStoreFileReader(final FileSystem fs, final Path p,
-      final CacheConfig cacheConf, final Reference r, final Configuration conf)
+  public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r,
+      boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
       throws IOException {
-    super(fs, p, cacheConf, conf);
+    super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't
@@ -99,9 +100,9 @@ public class HalfStoreFileReader extends StoreFileReader {
    * @throws IOException
    */
   public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
-      long size, final CacheConfig cacheConf,  final Reference r, final Configuration conf)
-      throws IOException {
-    super(fs, p, in, size, cacheConf, conf);
+      long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile,
+      AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException {
+    super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4db60b5..791445b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -283,11 +283,10 @@ public class CacheConfig {
   }
 
   /**
-   * Create a block cache configuration with the specified cache and
-   * configuration parameters.
+   * Create a block cache configuration with the specified cache and configuration parameters.
    * @param blockCache reference to block cache, null if completely disabled
    * @param cacheDataOnRead whether DATA blocks should be cached on read (we always cache INDEX
-   * blocks and BLOOM blocks; this cannot be disabled).
+   *          blocks and BLOOM blocks; this cannot be disabled).
    * @param inMemory whether blocks should be flagged as in-memory
    * @param cacheDataOnWrite whether data blocks should be cached on write
    * @param cacheIndexesOnWrite whether index blocks should be cached on write
@@ -296,7 +295,9 @@ public class CacheConfig {
    * @param cacheDataCompressed whether to store blocks as compressed in the cache
    * @param prefetchOnOpen whether to prefetch blocks upon open
    * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families
-   * data blocks up in the L1 tier.
+   *          data blocks up in the L1 tier.
+   * @param dropBehindCompaction indicates that drop behind should be set to true when opening a
+   *          store file reader for compaction
    */
   CacheConfig(final BlockCache blockCache,
       final boolean cacheDataOnRead, final boolean inMemory,

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index c5b334a..0887ee8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -36,6 +36,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -462,8 +463,6 @@ public class HFile {
 
     boolean isPrimaryReplicaReader();
 
-    void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
-
     boolean shouldIncludeMemstoreTS();
 
     boolean isDecodeMemstoreTS();
@@ -486,33 +485,32 @@ public class HFile {
    * @param size max size of the trailer.
    * @param cacheConf Cache configuation values, cannot be null.
    * @param hfs
+   * @param primaryReplicaReader true if this is a reader for a primary replica
    * @return an appropriate instance of HFileReader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, long size,
+      CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
+      throws IOException {
     FixedFileTrailer trailer = null;
     try {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
       assert !isHBaseChecksum; // Initially we must read with FS checksum.
       trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
       switch (trailer.getMajorVersion()) {
-      case 2:
-        LOG.debug("Opening HFile v2 with v3 reader");
-        // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
-      case 3 :
-        return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
-      default:
-        throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
+        case 2:
+          LOG.debug("Opening HFile v2 with v3 reader");
+          // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
+        case 3:
+          return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs,
+              primaryReplicaReader, conf);
+        default:
+          throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
       }
     } catch (Throwable t) {
-      try {
-        fsdis.close();
-      } catch (Throwable t2) {
-        LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
-      }
+      IOUtils.closeQuietly(fsdis);
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
     }
   }
@@ -523,13 +521,13 @@ public class HFile {
    * @param fsdis a stream of path's file
    * @param size max size of the trailer.
    * @param cacheConf Cache configuration for hfile's contents
+   * @param primaryReplicaReader true if this is a reader for primary replica
    * @param conf Configuration
    * @return A version specific Hfile Reader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  @SuppressWarnings("resource")
-  public static Reader createReader(FileSystem fs, Path path,
-      FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
+  public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis,
+      long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf)
       throws IOException {
     HFileSystem hfs = null;
 
@@ -540,9 +538,9 @@ public class HFile {
     if (!(fs instanceof HFileSystem)) {
       hfs = new HFileSystem(fs);
     } else {
-      hfs = (HFileSystem)fs;
+      hfs = (HFileSystem) fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
   }
 
   /**
@@ -553,35 +551,39 @@ public class HFile {
   * @throws IOException Will throw a CorruptHFileException
   * (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
   */
- public static Reader createReader(
-     FileSystem fs, Path path,  Configuration conf) throws IOException {
-     return createReader(fs, path, CacheConfig.DISABLED, conf);
- }
+  public static Reader createReader(FileSystem fs, Path path, Configuration conf)
+      throws IOException {
+    // The primaryReplicaReader is mainly used for constructing the block cache key, so if we do
+    // not use the block cache then it is OK to set it to any value. We use true here.
+    return createReader(fs, path, CacheConfig.DISABLED, true, conf);
+  }
 
   /**
-   *
    * @param fs filesystem
    * @param path Path to file to read
-   * @param cacheConf This must not be null.  @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+   * @param cacheConf This must not be null. @see
+   *          {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+   * @param primaryReplicaReader true if this is a reader for primary replica
    * @return an active Reader instance
-   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
+   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
+   *           is corrupt/invalid.
    */
-  public static Reader createReader(
-      FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
+  public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf,
+      boolean primaryReplicaReader, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
-      cacheConf, stream.getHfs(), conf);
+    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
+      stream.getHfs(), primaryReplicaReader, conf);
   }
 
   /**
    * This factory method is used only by unit tests
    */
-  static Reader createReaderFromStream(Path path,
-      FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
-      throws IOException {
+  @VisibleForTesting
+  static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
+      CacheConfig cacheConf, Configuration conf) throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+    return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf);
   }
 
   /**
@@ -606,22 +608,13 @@ public class HFile {
       throws IOException {
     final Path path = fileStatus.getPath();
     final long size = fileStatus.getLen();
-    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
-    try {
+    try (FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path)) {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
       assert !isHBaseChecksum; // Initially we must read with FS checksum.
       FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
       return true;
     } catch (IllegalArgumentException e) {
       return false;
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      try {
-        fsdis.close();
-      } catch (Throwable t) {
-        LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
-      }
     }
   }
 

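With this change the primary-replica flag becomes a constructor-time property of the reader rather
than something set afterwards, and a failed open now closes the stream via IOUtils.closeQuietly.
A minimal sketch of opening a reader through the new five-argument overload and through the
cache-disabled convenience overload; the path is made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class OpenHFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path hfilePath = new Path("/hbase/example/hfile"); // illustrative path

        // Full overload: the boolean marks the reader as serving the primary replica,
        // which only affects how block cache keys are built.
        try (HFile.Reader reader =
            HFile.createReader(fs, hfilePath, new CacheConfig(conf), true, conf)) {
          reader.loadFileInfo();
          System.out.println("first row key: " + Bytes.toStringBinary(reader.getFirstRowKey()));
        }

        // Convenience overload: block cache disabled, so the flag is fixed to true internally.
        try (HFile.Reader reader = HFile.createReader(fs, hfilePath, conf)) {
          System.out.println("last row key: " + Bytes.toStringBinary(reader.getLastRowKey()));
        }
      }
    }
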
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 030a25e..43b5c24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -306,7 +306,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
       return -2;
     }
 
-    HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), getConf());
+    HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf());
 
     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4e8cbaa..f0a1fa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -85,7 +85,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   /** Filled when we read in the trailer. */
   private final Compression.Algorithm compressAlgo;
 
-  private boolean isPrimaryReplicaReader;
+  private final boolean primaryReplicaReader;
 
   /**
    * What kind of data block encoding should be used while reading, writing,
@@ -156,6 +156,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   /** Minor versions starting with this number have faked index key */
   static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
 
+  @VisibleForTesting
+  @Deprecated
+  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+      long fileSize, CacheConfig cacheConf, HFileSystem hfs, Configuration conf)
+      throws IOException {
+    this(path, trailer, fsdis, fileSize, cacheConf, hfs, true, conf);
+  }
+
   /**
    * Opens a HFile. You must load the index before you can use it by calling
    * {@link #loadFileInfo()}.
@@ -175,11 +183,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    *          Configuration
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-  public HFileReaderImpl(final Path path, FixedFileTrailer trailer,
-      final FSDataInputStreamWrapper fsdis,
-      final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
-      final Configuration conf)
-  throws IOException {
+  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+      long fileSize, CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader,
+      Configuration conf) throws IOException {
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
     this.cacheConf = cacheConf;
@@ -187,6 +193,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     this.path = path;
     this.name = path.getName();
     this.conf = conf;
+    this.primaryReplicaReader = primaryReplicaReader;
     checkFileVersion();
     this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
     this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
@@ -453,12 +460,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
   @Override
   public boolean isPrimaryReplicaReader() {
-    return isPrimaryReplicaReader;
-  }
-
-  @Override
-  public void setPrimaryReplicaReader(boolean isPrimaryReplicaReader) {
-    this.isPrimaryReplicaReader = isPrimaryReplicaReader;
+    return primaryReplicaReader;
   }
 
   @Override

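Making primaryReplicaReader a final field means the value can no longer change after the reader has
been handed out, which removes the window where a shared reader could be consulted before
setPrimaryReplicaReader was called; the deprecated seven-argument constructor only remains as a test
convenience that defaults the flag to true. A small sketch of reading the flag back, under the same
illustrative setup as above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class ReplicaFlagSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/hbase/example/hfile"); // illustrative path

        // The flag is fixed when the reader is created and is read-only afterwards.
        try (HFile.Reader secondaryReader =
            HFile.createReader(fs, path, new CacheConfig(conf), false, conf)) {
          System.out.println("primary replica reader: " + secondaryReader.isPrimaryReplicaReader());
        }
      }
    }
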
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 19daeed..3af4290 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -27,7 +32,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,9 +67,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClientServiceCallable;
@@ -99,10 +100,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  */
@@ -937,8 +934,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
     HFile.Reader hfr = null;
     try {
-      hfr = HFile.createReader(fs, hfilePath,
-          new CacheConfig(getConf()), getConf());
+      hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf());
     } catch (FileNotFoundException fnfe) {
       LOG.debug("encountered", fnfe);
       return new Pair<>(null, hfilePath.getName());
@@ -1105,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     HalfStoreFileReader halfReader = null;
     StoreFileWriter halfWriter = null;
     try {
-      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
+      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+          new AtomicInteger(0), true, conf);
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -1213,30 +1210,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
           throws IOException {
         Path hfile = hfileStatus.getPath();
-        HFile.Reader reader = HFile.createReader(fs, hfile,
-            new CacheConfig(getConf()), getConf());
-        try {
+        try (HFile.Reader reader =
+            HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
-            LOG.info("Setting compression " + hcd.getCompressionType().name() +
-                     " for family " + hcd.toString());
+            LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " +
+                hcd.toString());
           }
           reader.loadFileInfo();
           byte[] first = reader.getFirstRowKey();
-          byte[] last  = reader.getLastRowKey();
+          byte[] last = reader.getLastRowKey();
 
-          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
-            " first=" + Bytes.toStringBinary(first) +
-            " last="  + Bytes.toStringBinary(last));
+          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
 
           // To eventually infer start key-end key boundaries
-          Integer value = map.containsKey(first)? map.get(first):0;
-          map.put(first, value+1);
+          Integer value = map.containsKey(first) ? map.get(first) : 0;
+          map.put(first, value + 1);
 
-          value = map.containsKey(last)? map.get(last):0;
-          map.put(last, value-1);
-        } finally {
-          reader.close();
+          value = map.containsKey(last) ? map.get(last) : 0;
+          map.put(last, value - 1);
         }
       }
     });

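Two effects of the API change show up in the bulk-load tool: plain HFile readers are opened with the
replica flag and closed through try-with-resources, and HalfStoreFileReader now receives the shared
reference counter plus the shared/pread flag directly. A sketch of the new HalfStoreFileReader
construction, assuming a Reference already resolved by the caller; variable names are illustrative:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.HalfStoreFileReader;
    import org.apache.hadoop.hbase.io.Reference;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class HalfReaderSketch {
      // Opens a half reader over a referenced (split) store file. The AtomicInteger is the
      // per-file scanner reference counter; the final boolean marks the reader as shared.
      static HalfStoreFileReader openHalfReader(FileSystem fs, Path inFile, CacheConfig cacheConf,
          Reference reference, Configuration conf) throws IOException {
        return new HalfStoreFileReader(fs, inFile, cacheConf, reference,
            true /* primary replica */, new AtomicInteger(0), true /* shared */, conf);
      }
    }
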
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
index 366378a..3600fe0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
@@ -264,7 +264,7 @@ public class MergeTableRegionsProcedure
 
   @Override
   protected MergeTableRegionsState getState(final int stateId) {
-    return MergeTableRegionsState.valueOf(stateId);
+    return MergeTableRegionsState.forNumber(stateId);
   }
 
   @Override
@@ -613,11 +613,8 @@ public class MergeTableRegionsProcedure
         final CacheConfig cacheConf = new CacheConfig(conf, hcd);
         for (StoreFileInfo storeFileInfo: storeFiles) {
           // Create reference file(s) of the region in mergedDir
-          regionFs.mergeStoreFile(
-            mergedRegionInfo,
-            family,
-            new StoreFile(
-              mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()),
+          regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
+              storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
             mergedDir);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
index 3cd6c66..bf9afd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
@@ -285,7 +285,7 @@ public class SplitTableRegionProcedure
 
   @Override
   protected SplitTableRegionState getState(final int stateId) {
-    return SplitTableRegionState.valueOf(stateId);
+    return SplitTableRegionState.forNumber(stateId);
   }
 
   @Override
@@ -571,9 +571,9 @@ public class SplitTableRegionProcedure
       if (storeFiles != null && storeFiles.size() > 0) {
         final CacheConfig cacheConf = new CacheConfig(conf, hcd);
         for (StoreFileInfo storeFileInfo: storeFiles) {
-          StoreFileSplitter sfs = new StoreFileSplitter(regionFs, family.getBytes(),
-            new StoreFile(mfs.getFileSystem(), storeFileInfo, conf,
-              cacheConf, hcd.getBloomFilterType()));
+          StoreFileSplitter sfs =
+              new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(),
+                  storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true));
           futures.add(threadPool.submit(sfs));
         }
       }

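Besides moving the protobuf state lookup from the deprecated valueOf(int) to forNumber(int), both
the merge and split procedures now build their source StoreFiles with the explicit primary-replica
flag. A sketch of that construction, assuming a StoreFileInfo and column descriptor are already at
hand; the helper name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

    public class ProcedureStoreFileSketch {
      // Split and merge always operate on the primary replica's files, hence the trailing true.
      static StoreFile referenceSource(FileSystem fs, StoreFileInfo storeFileInfo,
          Configuration conf, HColumnDescriptor hcd) {
        CacheConfig cacheConf = new CacheConfig(conf, hcd);
        return new StoreFile(fs, storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true);
      }
    }
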
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
index 7c4d6fe..90d1f2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -44,7 +44,9 @@ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile>
 
   public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
       CacheConfig cacheConf) throws IOException {
-    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    // XXX: primaryReplica is only used for constructing the block cache key, so it is not a
+    // critical problem if we pass the wrong value; here we always pass true. Need to fix later.
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
     return new CachedMobFile(sf);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index cd4c079..73355e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -118,9 +118,7 @@ public class MobFile {
    * @throws IOException
    */
   public void open() throws IOException {
-    if (sf.getReader() == null) {
-      sf.createReader();
-    }
+    sf.initReader();
   }
 
   /**
@@ -146,7 +144,9 @@ public class MobFile {
    */
   public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
       throws IOException {
-    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    // XXX: primaryReplica is only used for constructing the block cache key, so it is not a
+    // critical problem if we pass the wrong value; here we always pass true. Need to fix later.
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
     return new MobFile(sf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index eb75120..06c5001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -333,7 +333,8 @@ public final class MobUtils {
           if (LOG.isDebugEnabled()) {
             LOG.debug(fileName + " is an expired file");
           }
-          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+          filesToClean
+              .add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
         }
       } catch (Exception e) {
         LOG.error("Cannot parse the fileName " + fileName, e);
@@ -372,7 +373,7 @@ public final class MobUtils {
     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
     Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
     FileSystem fs = mobRootDir.getFileSystem(conf);
-    return mobRootDir.makeQualified(fs);
+    return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
@@ -697,7 +698,7 @@ public final class MobUtils {
       return null;
     }
     Path dstPath = new Path(targetPath, sourceFile.getName());
-    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    validateMobFile(conf, fs, sourceFile, cacheConfig, true);
     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
     LOG.info(msg);
     Path parent = dstPath.getParent();
@@ -718,11 +719,11 @@ public final class MobUtils {
    * @param cacheConfig The current cache config.
    */
   private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
-      CacheConfig cacheConfig) throws IOException {
+      CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
-      storeFile.createReader();
+      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
+      storeFile.initReader();
     } catch (IOException e) {
       LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
       throw e;

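The MOB code follows the same pattern: construct the StoreFile with an explicit primary-replica flag
(currently always true for MOB files, as the XXX comments note) and open it through initReader()
instead of createReader(). A sketch of the validation step as it now reads; releasing the temporary
reader via closeReader(false) is an assumption about the cleanup path:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class MobFileValidationSketch {
      // Opens (and then releases) a pread reader just to verify that a freshly flushed
      // MOB file is a readable HFile before it is moved out of the temp directory.
      static void validate(Configuration conf, FileSystem fs, Path path, CacheConfig cacheConfig,
          boolean primaryReplica) throws IOException {
        StoreFile storeFile =
            new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
        try {
          storeFile.initReader();
        } finally {
          storeFile.closeReader(false); // evictOnClose = false; nothing was cached yet
        }
      }
    }
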
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 987fe51..05c7076 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -223,12 +223,9 @@ public class PartitionedMobCompactor extends MobCompactor {
         // File in the Del Partition List
 
         // Get delId from the file
-        Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
-        try {
+        try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
           delId.setStartKey(reader.getFirstRowKey());
           delId.setEndKey(reader.getLastRowKey());
-        } finally {
-          reader.close();
         }
         CompactionDelPartition delPartition = delFilesToCompact.get(delId);
         if (delPartition == null) {
@@ -267,12 +264,9 @@ public class PartitionedMobCompactor extends MobCompactor {
           if (withDelFiles) {
             // get startKey and endKey from the file and update partition
             // TODO: is it possible to skip read of most hfiles?
-            Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
-            try {
+            try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
               compactionPartition.setStartKey(reader.getFirstRowKey());
               compactionPartition.setEndKey(reader.getLastRowKey());
-            } finally {
-              reader.close();
             }
           }
 
@@ -340,10 +334,11 @@ public class PartitionedMobCompactor extends MobCompactor {
     try {
       for (CompactionDelPartition delPartition : request.getDelPartitions()) {
         for (Path newDelPath : delPartition.listDelFiles()) {
-          StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+          StoreFile sf =
+              new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true);
           // pre-create reader of a del file to avoid race condition when opening the reader in each
           // partition.
-          sf.createReader();
+          sf.initReader();
           delPartition.addStoreFile(sf);
           totalDelFileCount++;
         }
@@ -557,7 +552,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       List<StoreFile> filesToCompact = new ArrayList<>();
       for (int i = offset; i < batch + offset; i++) {
         StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
-          BloomType.NONE);
+            BloomType.NONE, true);
         filesToCompact.add(sf);
       }
       filesToCompact.addAll(delFiles);
@@ -739,7 +734,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       }
       for (int i = offset; i < batch + offset; i++) {
         batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
-          BloomType.NONE));
+          BloomType.NONE, true));
       }
       // compact the del files in a batch.
       paths.add(compactDelFilesInBatch(request, batchedDelFiles));
@@ -809,8 +804,8 @@ public class PartitionedMobCompactor extends MobCompactor {
    */
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
     throws IOException {
-    List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
-      false, HConstants.LATEST_TIMESTAMP);
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+      false, true, false, false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(column.getMaxVersions());
     long ttl = HStore.determineTTLFromFamily(column);
@@ -893,7 +888,8 @@ public class PartitionedMobCompactor extends MobCompactor {
     for (StoreFile sf : storeFiles) {
       // the readers will be closed later after the merge.
       maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
-      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+      sf.initReader();
+      byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
       if (count != null) {
         maxKeyCount += Bytes.toLong(count);
       }

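The compactor no longer relies on createReader() for its return value; it initializes the shared
pread reader once and then pulls file info from getReader(). A sketch of the new idiom for
estimating the MOB cell count over the selected files; the surrounding compactor state is assumed:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobCellCountSketch {
      // initReader() opens the shared reader if it is not open yet, so it is safe to call
      // here even when the file was already opened when it was selected for compaction.
      static long estimateMaxKeyCount(List<StoreFile> storeFiles) throws IOException {
        long maxKeyCount = 0;
        for (StoreFile sf : storeFiles) {
          sf.initReader();
          byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
          if (count != null) {
            maxKeyCount += Bytes.toLong(count);
          }
        }
        return maxKeyCount;
      }
    }
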
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index c37ae99..da25df5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,7 +135,7 @@ class DefaultStoreFileManager implements StoreFileManager {
     this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
   }
 
-  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
   // Let a background thread close the actual reader on these compacted files and also
   // ensure to evict the blocks from block cache so that they are no longer in
   // cache

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index b021430..032e383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -292,9 +292,9 @@ public class HMobStore extends HStore {
   private void validateMobFile(Path path) throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile =
-          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
-      storeFile.createReader();
+      storeFile = new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig,
+          BloomType.NONE, isPrimaryReplicaStore());
+      storeFile.initReader();
     } catch (IOException e) {
       LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
       throw e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 78ce608..b21a84d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4160,8 +4160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
       Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size());
       for (Path file: files) {
-        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
-          null, null));
+        fakeStoreFiles.add(
+          new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
       }
       getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 144f43b..014427d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -294,7 +294,7 @@ public class HRegionFileSystem {
    */
   Path getStoreFilePath(final String familyName, final String fileName) {
     Path familyDir = getStoreDir(familyName);
-    return new Path(familyDir, fileName).makeQualified(this.fs);
+    return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
@@ -675,9 +675,7 @@ public class HRegionFileSystem {
     if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
       // Check whether the split row lies in the range of the store file
       // If it is outside the range, return directly.
-      if (f.getReader() == null) {
-        f.createReader();
-      }
+      f.initReader();
       try {
         if (top) {
           //check if larger than last key.

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a98f89e..cbdaa1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -650,13 +650,11 @@ public class HStore implements Store {
     return createStoreFileAndReader(info);
   }
 
-  private StoreFile createStoreFileAndReader(final StoreFileInfo info)
-      throws IOException {
+  private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
-      this.family.getBloomFilterType());
-    StoreFileReader r = storeFile.createReader();
-    r.setReplicaStoreFile(isPrimaryReplicaStore());
+        this.family.getBloomFilterType(), isPrimaryReplicaStore());
+    storeFile.initReader();
     return storeFile;
   }
 
@@ -726,8 +724,8 @@ public class HStore implements Store {
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
-      reader = HFile.createReader(srcPath.getFileSystem(conf),
-          srcPath, cacheConf, conf);
+      reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf,
+        isPrimaryReplicaStore(), conf);
       reader.loadFileInfo();
 
       byte[] firstKey = reader.getFirstRowKey();
@@ -1180,7 +1178,7 @@ public class HStore implements Store {
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
-      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
     scanners.addAll(sfScanners);
     // Then the memstore scanners
@@ -1203,7 +1201,7 @@ public class HStore implements Store {
       }
     }
     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
-      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
     scanners.addAll(sfScanners);
     // Then the memstore scanners
@@ -2456,8 +2454,9 @@ public class HStore implements Store {
               LOG.debug("The file " + file + " was closed but still not archived.");
             }
             filesToRemove.add(file);
+            continue;
           }
-          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+          if (file.isCompactedAway() && !file.isReferencedInReads()) {
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             if (LOG.isTraceEnabled()) {

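In HStore the replica flag now travels through the StoreFile constructor, so createStoreFileAndReader
no longer needs to reach into the reader afterwards, and getScannersForStoreFiles drops its
isPrimaryReplicaStore() argument because each file already carries that information. A sketch of the
simplified open path; isPrimaryReplica stands in for isPrimaryReplicaStore():

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

    public class StoreOpenSketch {
      // One constructor call carries the replica flag; initReader() opens the shared pread reader.
      static StoreFile openStoreFile(FileSystem fs, StoreFileInfo info, Configuration conf,
          CacheConfig cacheConf, BloomType bloomType, boolean isPrimaryReplica) throws IOException {
        StoreFile storeFile = new StoreFile(fs, info, conf, cacheConf, bloomType, isPrimaryReplica);
        storeFile.initReader();
        return storeFile;
      }
    }
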
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 41c13f5..d71af2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -54,7 +54,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   /** Constructor for testing. */
   ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
+      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners)
       throws IOException {
     super(scan, scanInfo, scanType, columns, scanners,
         HConstants.LATEST_TIMESTAMP);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7aef05e..c53fbf08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * and append data. Be sure to add any metadata before calling close on the
  * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
  * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
- * passing filesystem and path.  To read, call {@link #createReader()}.
+ * passing filesystem and path.  To read, call {@link #initReader()}.
  * <p>StoreFiles may also reference store files in another Store.
  *
  * The reason for this weird pattern where you use a different instance for the
@@ -64,6 +65,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class StoreFile {
   private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
+  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
+
+  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
+
   // Keys for fileinfo values in HFile
 
   /** Max Sequence ID in FileInfo */
@@ -103,6 +108,18 @@ public class StoreFile {
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
+  // Counter that is incremented every time a scanner is created on the
+  // store file. It is decremented when the scan on the store file is
+  // done.
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  private final boolean noReadahead;
+
+  private final boolean primaryReplica;
+
+  // Indicates if the file got compacted
+  private volatile boolean compactedAway = false;
+
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
@@ -116,7 +133,7 @@ public class StoreFile {
 
   private Cell lastKey;
 
-  private Comparator comparator;
+  private Comparator<Cell> comparator;
 
   CacheConfig getCacheConf() {
     return cacheConf;
@@ -130,7 +147,7 @@ public class StoreFile {
     return lastKey;
   }
 
-  public Comparator getComparator() {
+  public Comparator<Cell> getComparator() {
     return comparator;
   }
 
@@ -179,72 +196,96 @@ public class StoreFile {
   public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a
-   * substantial amount of ram depending on the underlying files (10-20MB?).
-   *
-   * @param fs  The current file system to use.
-   * @param p  The path of the file.
-   * @param conf  The current configuration.
-   * @param cacheConf  The cache configuration and block cache reference.
-   * @param cfBloomType The bloom type to use for this store file as specified
-   *          by column family configuration. This may or may not be the same
-   *          as the Bloom filter type actually present in the HFile, because
-   *          column family configuration might change. If this is
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param p The path of the file.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
-   * @throws IOException When opening the reader fails.
+   * @deprecated Now we will specify whether the StoreFile is for primary replica when
+   *             constructing, so please use
+   *             {@link #StoreFile(FileSystem, Path, Configuration, CacheConfig, BloomType, boolean)}
+   *             directly.
    */
+  @Deprecated
   public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
-        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
     this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
   }
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a
-   * substantial amount of ram depending on the underlying files (10-20MB?).
-   *
-   * @param fs  The current file system to use.
-   * @param fileInfo  The store file information.
-   * @param conf  The current configuration.
-   * @param cacheConf  The cache configuration and block cache reference.
-   * @param cfBloomType The bloom type to use for this store file as specified
-   *          by column family configuration. This may or may not be the same
-   *          as the Bloom filter type actually present in the HFile, because
-   *          column family configuration might change. If this is
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param p The path of the file.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
+   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+   * @throws IOException
+   */
+  public StoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
+      BloomType cfBloomType, boolean primaryReplica) throws IOException {
+    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
+  }
+
+  /**
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param fileInfo The store file information.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
-   * @throws IOException When opening the reader fails.
+   * @deprecated Now we will specify whether the StoreFile is for primary replica when
+   *             constructing, so please use
+   *             {@link #StoreFile(FileSystem, StoreFileInfo, Configuration, CacheConfig, BloomType, boolean)}
+   *             directly.
    */
+  @Deprecated
   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
-      final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
+      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+    this(fs, fileInfo, conf, cacheConf, cfBloomType, true);
+  }
+
+  /**
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param fileInfo The store file information.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column
+   *          family configuration. This may or may not be the same as the Bloom filter type
+   *          actually present in the HFile, because column family configuration might change. If
+   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+   */
+  public StoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
+      BloomType cfBloomType, boolean primaryReplica) {
     this.fs = fs;
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
-
+    this.noReadahead =
+        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
       this.cfBloomType = cfBloomType;
     } else {
-      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
-          "cfBloomType=" + cfBloomType + " (disabled in config)");
+      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
+          cfBloomType + " (disabled in config)");
       this.cfBloomType = BloomType.NONE;
     }
-  }
-
-  /**
-   * Clone
-   * @param other The StoreFile to clone from
-   */
-  public StoreFile(final StoreFile other) {
-    this.fs = other.fs;
-    this.fileInfo = other.fileInfo;
-    this.cacheConf = other.cacheConf;
-    this.cfBloomType = other.cfBloomType;
-    this.metadataMap = other.metadataMap;
-  }
-
-  /**
-   * Clone a StoreFile for opening private reader.
-   */
-  public StoreFile cloneForReader() {
-    return new StoreFile(this);
+    this.primaryReplica = primaryReplica;
   }
 
   /**
@@ -266,12 +307,12 @@ public class StoreFile {
    * @return Returns the qualified path of this StoreFile
    */
   public Path getQualifiedPath() {
-    return this.fileInfo.getPath().makeQualified(fs);
+    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
    * @return True if this is a StoreFile Reference; call
-   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
+   * after {@link #open()}; otherwise you may get a wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -376,15 +417,21 @@ public class StoreFile {
 
   @VisibleForTesting
   public boolean isCompactedAway() {
-    if (this.reader != null) {
-      return this.reader.isCompactedAway();
-    }
-    return true;
+    return compactedAway;
   }
 
   @VisibleForTesting
   public int getRefCount() {
-    return this.reader.getRefCount().get();
+    return refCount.get();
+  }
+
+  /**
+   * @return true if the file is still used in reads
+   */
+  public boolean isReferencedInReads() {
+    int rc = refCount.get();
+    assert rc >= 0; // we should not go negative.
+    return rc > 0;
   }
 
   /**
@@ -404,18 +451,18 @@ public class StoreFile {
   }
 
   /**
-   * Opens reader on this store file.  Called by Constructor.
-   * @return Reader for the store file.
+   * Opens reader on this store file. Called by Constructor.
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private StoreFileReader open(boolean canUseDropBehind) throws IOException {
+  private void open() throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
+      primaryReplica, refCount, true);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -513,38 +560,45 @@ public class StoreFile {
     firstKey = reader.getFirstKey();
     lastKey = reader.getLastKey();
     comparator = reader.getComparator();
-    return this.reader;
-  }
-
-  public StoreFileReader createReader() throws IOException {
-    return createReader(false);
   }
 
   /**
-   * @return Reader for StoreFile. creates if necessary
-   * @throws IOException
+   * Initialize the reader used for pread.
    */
-  public StoreFileReader createReader(boolean canUseDropBehind) throws IOException {
-    if (this.reader == null) {
+  public void initReader() throws IOException {
+    if (reader == null) {
       try {
-        this.reader = open(canUseDropBehind);
-      } catch (IOException e) {
+        open();
+      } catch (Exception e) {
         try {
-          boolean evictOnClose =
-              cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
           this.closeReader(evictOnClose);
         } catch (IOException ee) {
+          LOG.warn("failed to close reader", ee);
         }
         throw e;
       }
-
     }
-    return this.reader;
+  }
+
+  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
+    initReader();
+    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
+      primaryReplica, refCount, false);
+    reader.copyFields(this.reader);
+    return reader;
+  }
+
+  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
+      boolean pread, boolean isCompaction, long readPt, long scannerOrder,
+      boolean canOptimizeForNonNullColumn) throws IOException {
+    return createStreamReader(canUseDropBehind).getStoreFileScanner(
+      cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**
-   * @return Current reader.  Must call createReader first else returns null.
-   * @see #createReader()
+   * @return Current reader. Must call initReader() first, else this returns null.
+   * @see #initReader()
    */
   public StoreFileReader getReader() {
     return this.reader;
@@ -566,9 +620,7 @@ public class StoreFile {
    * Marks the status of the file as compactedAway.
    */
   public void markCompactedAway() {
-    if (this.reader != null) {
-      this.reader.markCompactedAway();
-    }
+    this.compactedAway = true;
   }
 
   /**

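The StoreFile change is the heart of the patch: the scanner reference counter and the compacted-away
flag move from the reader into the file itself, initReader() replaces createReader() for the
long-lived shared pread reader (which honors the new hbase.store.reader.no-readahead setting), and
getStreamScanner() hands out a private stream reader for compactions instead of cloning the whole
StoreFile as the removed cloneForReader() did. A sketch of both read paths, with illustrative paths
and scanner arguments:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.StoreFileScanner;

    public class StoreFileReadPathsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(StoreFile.STORE_FILE_READER_NO_READAHEAD, false);
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/hbase/example/storefile"); // illustrative path

        StoreFile sf = new StoreFile(fs, path, conf, new CacheConfig(conf), BloomType.NONE, true);

        // Shared pread reader: opened once, reused by every get/scan on this file.
        sf.initReader();
        System.out.println("max seq id: " + sf.getMaxSequenceId());

        // Private stream reader: created per compaction, dropped when the scan completes.
        StoreFileScanner compactionScanner = sf.getStreamScanner(
            true,   // canUseDropBehind
            false,  // cacheBlocks
            false,  // pread
            true,   // isCompaction
            Long.MAX_VALUE, // readPt (illustrative)
            0,      // scannerOrder
            false); // canOptimizeForNonNullColumn
        System.out.println("file referenced in reads: " + sf.isReferencedInReads());
        compactionScanner.close();
      }
    }
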
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 3c12045..c4754a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -233,25 +234,24 @@ public class StoreFileInfo {
    * @param cacheConf The cache configuration and block cache reference.
    * @return The StoreFile.Reader for the file
    */
-  public StoreFileReader open(final FileSystem fs,
-      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
+  public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind,
+      long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared)
+      throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
     final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath,
-          doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath(),
-          doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead);
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();
@@ -265,9 +265,10 @@ public class StoreFileInfo {
     if (reader == null) {
       if (this.reference != null) {
         reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
-          conf);
+            isPrimaryReplicaStoreFile, refCount, shared, conf);
       } else {
-        reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf);
+        reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf,
+            isPrimaryReplicaStoreFile, refCount, shared, conf);
       }
     }
     if (this.coprocessorHost != null) {

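For illustration, a minimal sketch of how the widened StoreFileInfo.open(...) signature could be called for a shared pread reader versus a one-shot stream reader. This is not code from the commit; the helper class name and the readahead values are assumptions for the example only.

// A minimal, hypothetical sketch (not part of the patch): opening a shared pread reader
// versus a per-compaction stream reader through the widened StoreFileInfo.open(...)
// signature. The readahead values and the class name are assumptions for illustration.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

final class OpenReaderSketch {
  static StoreFileReader openShared(StoreFileInfo info, FileSystem fs, CacheConfig cacheConf,
      AtomicInteger refCount) throws IOException {
    // shared = true: the long-lived pread reader; -1 readahead meaning "use the default"
    // is an assumption for this sketch.
    return info.open(fs, cacheConf, false, -1L, true, refCount, true);
  }

  static StoreFileReader openStream(StoreFileInfo info, FileSystem fs, CacheConfig cacheConf,
      AtomicInteger refCount) throws IOException {
    // shared = false: a stream reader that readCompleted() will close when the scan is done;
    // readahead = 0 (disabled) is likewise an assumption for illustration.
    return info.open(fs, cacheConf, true, 0L, true, refCount, false);
  }
}
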
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 8f01a93..b015ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.Map;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -68,36 +69,47 @@ public class StoreFileReader {
   private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
   private boolean skipResetSeqId = true;
 
-  public AtomicInteger getRefCount() {
-    return refCount;
-  }
-
   // Counter that is incremented every time a scanner is created on the
-  // store file.  It is decremented when the scan on the store file is
-  // done.
-  private AtomicInteger refCount = new AtomicInteger(0);
-  // Indicates if the file got compacted
-  private volatile boolean compactedAway = false;
+  // store file. It is decremented when the scan on the store file is
+  // done. All StoreFileReaders for the same StoreFile share this counter.
+  private final AtomicInteger refCount;
 
-  public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
-      throws IOException {
-    reader = HFile.createReader(fs, path, cacheConf, conf);
+  // Indicates whether this StoreFileReader is shared, i.e., used for pread. If not, we will
+  // close the internal reader when readCompleted is called.
+  private final boolean shared;
+
+  private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
+    this.reader = reader;
     bloomFilterType = BloomType.NONE;
+    this.refCount = refCount;
+    this.shared = shared;
   }
 
-  void markCompactedAway() {
-    this.compactedAway = true;
+  public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf,
+      boolean primaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
+      throws IOException {
+    this(HFile.createReader(fs, path, cacheConf, primaryReplicaStoreFile, conf), refCount, shared);
   }
 
   public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
-      CacheConfig cacheConf, Configuration conf) throws IOException {
-    reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
-    bloomFilterType = BloomType.NONE;
+      CacheConfig cacheConf, boolean primaryReplicaStoreFile, AtomicInteger refCount,
+      boolean shared, Configuration conf) throws IOException {
+    this(HFile.createReader(fs, path, in, size, cacheConf, primaryReplicaStoreFile, conf), refCount,
+        shared);
   }
 
-  public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
-    reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
+  void copyFields(StoreFileReader reader) {
+    this.generalBloomFilter = reader.generalBloomFilter;
+    this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter;
+    this.bloomFilterType = reader.bloomFilterType;
+    this.sequenceID = reader.sequenceID;
+    this.timeRange = reader.timeRange;
+    this.lastBloomKey = reader.lastBloomKey;
+    this.bulkLoadResult = reader.bulkLoadResult;
+    this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV;
+    this.skipResetSeqId = reader.skipResetSeqId;
   }
+
   public boolean isPrimaryReplicaReader() {
     return reader.isPrimaryReplicaReader();
   }
@@ -105,8 +117,11 @@ public class StoreFileReader {
   /**
    * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
    */
+  @VisibleForTesting
   StoreFileReader() {
+    this.refCount = new AtomicInteger(0);
     this.reader = null;
+    this.shared = false;
   }
 
   public CellComparator getComparator() {
@@ -128,30 +143,23 @@ public class StoreFileReader {
       boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
     // Increment the ref count
     refCount.incrementAndGet();
-    return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction,
-        reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
+    return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction),
+        !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**
-   * Decrement the ref count associated with the reader when ever a scanner associated
-   * with the reader is closed
+   * Indicates that the scanner has finished reading with this reader. We need to decrement the
+   * ref count and, if this is not the shared pread reader, close it as well.
    */
-  void decrementRefCount() {
+  void readCompleted() {
     refCount.decrementAndGet();
-  }
-
-  /**
-   * @return true if the file is still used in reads
-   */
-  public boolean isReferencedInReads() {
-    return refCount.get() != 0;
-  }
-
-  /**
-   * @return true if the file is compacted
-   */
-  public boolean isCompactedAway() {
-    return this.compactedAway;
+    if (!shared) {
+      try {
+        reader.close(false);
+      } catch (IOException e) {
+        LOG.warn("failed to close stream reader", e);
+      }
+    }
   }
 
   /**

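To make the new reader lifecycle concrete, here is a minimal sketch (an illustration only, not code from this commit) of how a normal pread scan now drives the shared reader. The class and method names are hypothetical; the sketch sits in the regionserver package so the same calls used in the patch resolve.

// A minimal illustrative sketch, not part of the patch: a pread scan over the shared reader.
// The class name is hypothetical; it lives in the regionserver package so the calls used in
// the patch (initReader, getReader, getStoreFileScanner) resolve as they do there.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

final class SharedReaderUsageSketch {
  static StoreFileScanner openPreadScanner(StoreFile sf, long readPt) throws IOException {
    sf.initReader(); // opens (or reuses) the shared pread reader; its refCount is shared
    // cacheBlocks = true, pread = true, isCompaction = false, scannerOrder = 0,
    // canOptimizeForNonNullColumn = false
    return sf.getReader().getStoreFileScanner(true, true, false, readPt, 0, false);
  }
  // Closing the returned scanner calls StoreFileReader.readCompleted(): the shared refCount
  // is decremented, and a non-shared (stream) reader would also close its underlying HFile
  // reader at that point.
}
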
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ab6b0ef..aa4f897 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -124,26 +122,44 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
       boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
-      ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<>(files.size());
-    List<StoreFile> sorted_files = new ArrayList<>(files);
-    Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
-    for (int i = 0; i < sorted_files.size(); i++) {
-      StoreFileReader r = sorted_files.get(i).createReader(canUseDrop);
-      r.setReplicaStoreFile(isPrimaryReplica);
-      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt,
-        i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+    List<StoreFile> sortedFiles = new ArrayList<>(files);
+    Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+      StoreFile sf = sortedFiles.get(i);
+      sf.initReader();
+      StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
+        isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
       scanners.add(scanner);
     }
     return scanners;
   }
 
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-    Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-    boolean isCompaction, boolean canUseDrop,
-    ScanQueryMatcher matcher, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
-      matcher, readPt, true);
+  /**
+   * Get scanners for compaction. We will create a separate reader for each store file to avoid
+   * contention with normal read requests.
+   */
+  public static List<StoreFileScanner> getScannersForCompaction(Collection<StoreFile> files,
+      boolean canUseDropBehind, long readPt) throws IOException {
+    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
+    List<StoreFile> sortedFiles = new ArrayList<>(files);
+    Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    boolean succ = false;
+    try {
+      for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+        scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
+          readPt, i, false));
+      }
+      succ = true;
+    } finally {
+      if (!succ) {
+        for (StoreFileScanner scanner : scanners) {
+          scanner.close();
+        }
+      }
+    }
+    return scanners;
   }
 
   public String toString() {
@@ -262,7 +278,7 @@ public class StoreFileScanner implements KeyValueScanner {
     cur = null;
     this.hfs.close();
     if (this.reader != null) {
-      this.reader.decrementRefCount();
+      this.reader.readCompleted();
     }
     closed = true;
   }

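A short caller-side sketch of the new compaction entry point (illustrative only; the wrapper class and method names are hypothetical): obtain dedicated stream scanners via getScannersForCompaction and close them when the compaction finishes, which closes the per-compaction readers through readCompleted().

// Illustrative sketch only, not part of the patch: using the new compaction entry point.
// The wrapper class and method names are hypothetical.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

final class CompactionScannerSketch {
  static void compactWithStreamScanners(Collection<StoreFile> files, boolean dropBehind,
      long readPt) throws IOException {
    List<StoreFileScanner> scanners =
        StoreFileScanner.getScannersForCompaction(files, dropBehind, readPt);
    try {
      // feed "scanners" into the compaction StoreScanner and write out the new file ...
    } finally {
      for (StoreFileScanner scanner : scanners) {
        scanner.close(); // readCompleted() closes each per-compaction stream reader
      }
    }
  }
}
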
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 99ec30e..3bc6a0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -312,7 +312,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo,
       ScanType scanType, final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners) throws IOException {
+      final List<? extends KeyValueScanner> scanners) throws IOException {
     this(scan, scanInfo, scanType, columns, scanners,
         HConstants.LATEST_TIMESTAMP,
         // 0 is passed as readpoint because the test bypasses Store
@@ -322,7 +322,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo,
     ScanType scanType, final NavigableSet<byte[]> columns,
-    final List<KeyValueScanner> scanners, long earliestPutTs)
+    final List<? extends KeyValueScanner> scanners, long earliestPutTs)
         throws IOException {
     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
       // 0 is passed as readpoint because the test bypasses Store
@@ -330,7 +330,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs,
+      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
       long readPt) throws IOException {
     this(null, scan, scanInfo, columns, readPt,
         scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);