You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/26 22:52:17 UTC

[10/40] hbase git commit: HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction

HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66b616d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66b616d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66b616d7

Branch: refs/heads/hbase-12439
Commit: 66b616d7a3d6f4ad6d20962e2dfc0c82a4092ddb
Parents: 719a30b
Author: zhangduo <zh...@apache.org>
Authored: Mon Apr 17 22:53:49 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Apr 19 09:26:33 2017 +0800

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      |  63 +++---
 .../org/apache/hadoop/hbase/io/FileLink.java    |  14 +-
 .../hadoop/hbase/io/HalfStoreFileReader.java    |  13 +-
 .../hadoop/hbase/io/hfile/CacheConfig.java      |   9 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |  85 ++++----
 .../hbase/io/hfile/HFilePrettyPrinter.java      |   2 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  26 +--
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  45 ++--
 .../procedure/MergeTableRegionsProcedure.java   |   9 +-
 .../procedure/SplitTableRegionProcedure.java    |   8 +-
 .../apache/hadoop/hbase/mob/CachedMobFile.java  |   4 +-
 .../org/apache/hadoop/hbase/mob/MobFile.java    |   8 +-
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  13 +-
 .../compactions/PartitionedMobCompactor.java    |  26 +--
 .../regionserver/DefaultStoreFileManager.java   |   2 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   6 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +-
 .../hbase/regionserver/HRegionFileSystem.java   |   6 +-
 .../hadoop/hbase/regionserver/HStore.java       |  19 +-
 .../regionserver/ReversedStoreScanner.java      |   2 +-
 .../hadoop/hbase/regionserver/StoreFile.java    | 216 ++++++++++++-------
 .../hbase/regionserver/StoreFileInfo.java       |  21 +-
 .../hbase/regionserver/StoreFileReader.java     |  86 ++++----
 .../hbase/regionserver/StoreFileScanner.java    |  50 +++--
 .../hadoop/hbase/regionserver/StoreScanner.java |   6 +-
 .../regionserver/compactions/Compactor.java     |  44 +---
 .../hadoop/hbase/util/CompressionTest.java      |   2 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |   6 +-
 .../hbase/util/hbck/HFileCorruptionChecker.java |   4 +-
 .../hbase/HFilePerformanceEvaluation.java       |   2 +-
 .../hadoop/hbase/client/TestFromClientSide.java |   1 +
 .../hbase/io/TestHalfStoreFileReader.java       | 192 ++++++++---------
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |   2 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |   8 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |   6 +-
 .../hbase/io/hfile/TestHFileEncryption.java     |   6 +-
 .../TestHFileInlineToRootChunkConversion.java   |   2 +-
 .../hadoop/hbase/io/hfile/TestPrefetch.java     |   2 +-
 .../hadoop/hbase/io/hfile/TestReseekTo.java     |   4 +-
 .../hfile/TestSeekBeforeWithInlineBlocks.java   |   2 +-
 .../hadoop/hbase/io/hfile/TestSeekTo.java       |   8 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java |  10 +-
 .../TestImportTSVWithVisibilityLabels.java      |   2 +-
 .../hadoop/hbase/mapreduce/TestImportTsv.java   |   2 +-
 .../mapreduce/TestLoadIncrementalHFiles.java    |   4 +-
 .../apache/hadoop/hbase/mob/TestMobFile.java    |   8 +-
 .../hbase/mob/compactions/TestMobCompactor.java |   9 +-
 .../TestPartitionedMobCompactor.java            |  18 +-
 .../regionserver/DataBlockEncodingTool.java     |   7 +-
 .../EncodedSeekPerformanceTest.java             |  12 +-
 .../hbase/regionserver/MockStoreFile.java       |  25 ++-
 .../regionserver/TestCacheOnWriteInSchema.java  |   6 +-
 .../regionserver/TestCompactionPolicy.java      |   3 -
 .../regionserver/TestCompoundBloomFilter.java   |   7 +-
 .../regionserver/TestEncryptionKeyRotation.java |   2 +-
 .../TestEncryptionRandomKeying.java             |   2 +-
 .../hbase/regionserver/TestFSErrorsExposed.java |  12 +-
 .../regionserver/TestMobStoreCompaction.java    |   7 +-
 .../regionserver/TestReversibleScanners.java    |  33 ++-
 .../hadoop/hbase/regionserver/TestStore.java    |   2 +-
 .../hbase/regionserver/TestStoreFile.java       | 120 ++++++-----
 .../TestStoreFileScannerWithTagCompression.java |  10 +-
 .../regionserver/compactions/TestCompactor.java |   3 -
 .../compactions/TestStripeCompactionPolicy.java |   3 -
 .../hbase/util/TestHBaseFsckEncryption.java     |   2 +-
 .../hadoop/hbase/spark/BulkLoadSuite.scala      |   8 +-
 66 files changed, 701 insertions(+), 650 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..055e46a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,11 +34,14 @@ import com.google.common.annotations.VisibleForTesting;
  * as well as closing streams. Initialization is not thread-safe, but normal operation is;
  * see method comments.
  */
-public class FSDataInputStreamWrapper {
+@InterfaceAudience.Private
+public class FSDataInputStreamWrapper implements Closeable {
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
   private final boolean doCloseStreams;
+  private final boolean dropBehind;
+  private final long readahead;
 
   /** Two stream handles, one with and one without FS-level checksum.
    * HDFS checksum setting is on FS level, not single read level, so you have to keep two
@@ -75,43 +81,52 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path, false);
+    this(fs, path, false, -1L);
   }
 
-  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
-    this(fs, null, path, dropBehind);
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException {
+    this(fs, null, path, dropBehind, readahead);
   }
 
-  public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null, false);
-  }
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
-                                  boolean dropBehind) throws IOException {
-    this(fs, link, null, dropBehind);
+                                  boolean dropBehind, long readahead) throws IOException {
+    this(fs, link, null, dropBehind, readahead);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
-                                   Path path, boolean dropBehind) throws IOException {
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
+      long readahead) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
     this.doCloseStreams = true;
+    this.dropBehind = dropBehind;
+    this.readahead = readahead;
     // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
     // that wraps over the specified fs. In this case, we will not be able to avoid
     // checksumming inside the filesystem.
-    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
+    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
 
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    setStreamOptions(stream);
+  }
+
+  private void setStreamOptions(FSDataInputStream in) {
     try {
-      this.stream.setDropBehind(dropBehind);
+      in.setDropBehind(dropBehind);
     } catch (Exception e) {
-      // Skipped.
+      // Best effort: the given stream may not support drop-behind, so failures are ignored.
     }
+    if (readahead >= 0) {
+      try {
+        in.setReadahead(readahead);
+      } catch (Exception e) {
+        // Best effort: the given stream may not support readahead, so failures are ignored.
+      }
+    }
   }
 
-
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
    * reads finish and before any other reads start (what happens in reality is we read the
@@ -127,6 +142,7 @@ public class FSDataInputStreamWrapper {
     if (useHBaseChecksum) {
       FileSystem fsNc = hfs.getNoChecksumFs();
       this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
+      setStreamOptions(streamNoFsChecksum);
       this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
       // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
       this.stream.close();
@@ -150,6 +166,8 @@ public class FSDataInputStreamWrapper {
     link = null;
     hfs = null;
     useHBaseChecksumConfigured = useHBaseChecksum = false;
+    dropBehind = false;
+    readahead = 0;
   }
 
   /**
@@ -201,19 +219,14 @@ public class FSDataInputStreamWrapper {
   }
 
   /** Close stream(s) if necessary. */
-  public void close() throws IOException {
-    if (!doCloseStreams) return;
-    try {
-      if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
-        streamNoFsChecksum.close();
-        streamNoFsChecksum = null;
-      }
-    } finally {
-      if (stream != null) {
-        stream.close();
-        stream = null;
-      }
+  @Override
+  public void close() {
+    if (!doCloseStreams) {
+      return;
     }
+    // we do not care about the close exception as it is for reading, no data loss issue.
+    IOUtils.closeQuietly(streamNoFsChecksum);
+    IOUtils.closeQuietly(stream);
   }
 
   public HFileSystem getHfs() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index ca0dfbc..8a79efb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -29,6 +29,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
@@ -99,7 +101,7 @@ public class FileLink {
    * and the alternative locations, when the file is moved.
    */
   private static class FileLinkInputStream extends InputStream
-      implements Seekable, PositionedReadable {
+      implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead {
     private FSDataInputStream in = null;
     private Path currentPath = null;
     private long pos = 0;
@@ -306,6 +308,16 @@ public class FileLink {
       }
       throw new FileNotFoundException("Unable to open link: " + fileLink);
     }
+
+    @Override
+    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+      in.setReadahead(readahead);
+    }
+
+    @Override
+    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
+      in.setDropBehind(dropCache);
+    }
   }
 
   private Path[] locations = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index a4a281e..c4dbc39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,10 +73,10 @@ public class HalfStoreFileReader extends StoreFileReader {
    * @param conf Configuration
    * @throws IOException
    */
-  public HalfStoreFileReader(final FileSystem fs, final Path p,
-      final CacheConfig cacheConf, final Reference r, final Configuration conf)
+  public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r,
+      boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
       throws IOException {
-    super(fs, p, cacheConf, conf);
+    super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't
@@ -99,9 +100,9 @@ public class HalfStoreFileReader extends StoreFileReader {
    * @throws IOException
    */
   public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
-      long size, final CacheConfig cacheConf,  final Reference r, final Configuration conf)
-      throws IOException {
-    super(fs, p, in, size, cacheConf, conf);
+      long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile,
+      AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException {
+    super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4db60b5..791445b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -283,11 +283,10 @@ public class CacheConfig {
   }
 
   /**
-   * Create a block cache configuration with the specified cache and
-   * configuration parameters.
+   * Create a block cache configuration with the specified cache and configuration parameters.
    * @param blockCache reference to block cache, null if completely disabled
    * @param cacheDataOnRead whether DATA blocks should be cached on read (we always cache INDEX
-   * blocks and BLOOM blocks; this cannot be disabled).
+   *          blocks and BLOOM blocks; this cannot be disabled).
    * @param inMemory whether blocks should be flagged as in-memory
    * @param cacheDataOnWrite whether data blocks should be cached on write
    * @param cacheIndexesOnWrite whether index blocks should be cached on write
@@ -296,7 +295,9 @@ public class CacheConfig {
    * @param cacheDataCompressed whether to store blocks as compressed in the cache
    * @param prefetchOnOpen whether to prefetch blocks upon open
    * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families
-   * data blocks up in the L1 tier.
+   *          data blocks up in the L1 tier.
+   * @param dropBehindCompaction indicates that drop-behind should be set to true when opening a
+   *          store file reader for compaction
    */
   CacheConfig(final BlockCache blockCache,
       final boolean cacheDataOnRead, final boolean inMemory,

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index c5b334a..0887ee8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -36,6 +36,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -462,8 +463,6 @@ public class HFile {
 
     boolean isPrimaryReplicaReader();
 
-    void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
-
     boolean shouldIncludeMemstoreTS();
 
     boolean isDecodeMemstoreTS();
@@ -486,33 +485,32 @@ public class HFile {
    * @param size max size of the trailer.
    * @param cacheConf Cache configuation values, cannot be null.
    * @param hfs
+   * @param primaryReplicaReader true if this is a reader for primary replica
    * @return an appropriate instance of HFileReader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, long size,
+      CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
+      throws IOException {
     FixedFileTrailer trailer = null;
     try {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
       assert !isHBaseChecksum; // Initially we must read with FS checksum.
       trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
       switch (trailer.getMajorVersion()) {
-      case 2:
-        LOG.debug("Opening HFile v2 with v3 reader");
-        // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
-      case 3 :
-        return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
-      default:
-        throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
+        case 2:
+          LOG.debug("Opening HFile v2 with v3 reader");
+          // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
+        case 3:
+          return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs,
+              primaryReplicaReader, conf);
+        default:
+          throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
       }
     } catch (Throwable t) {
-      try {
-        fsdis.close();
-      } catch (Throwable t2) {
-        LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
-      }
+      IOUtils.closeQuietly(fsdis);
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
     }
   }
@@ -523,13 +521,13 @@ public class HFile {
    * @param fsdis a stream of path's file
    * @param size max size of the trailer.
    * @param cacheConf Cache configuration for hfile's contents
+   * @param primaryReplicaReader true if this is a reader for primary replica
    * @param conf Configuration
    * @return A version specific Hfile Reader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  @SuppressWarnings("resource")
-  public static Reader createReader(FileSystem fs, Path path,
-      FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
+  public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis,
+      long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf)
       throws IOException {
     HFileSystem hfs = null;
 
@@ -540,9 +538,9 @@ public class HFile {
     if (!(fs instanceof HFileSystem)) {
       hfs = new HFileSystem(fs);
     } else {
-      hfs = (HFileSystem)fs;
+      hfs = (HFileSystem) fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
   }
 
   /**
@@ -553,35 +551,39 @@ public class HFile {
   * @throws IOException Will throw a CorruptHFileException
   * (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
   */
- public static Reader createReader(
-     FileSystem fs, Path path,  Configuration conf) throws IOException {
-     return createReader(fs, path, CacheConfig.DISABLED, conf);
- }
+  public static Reader createReader(FileSystem fs, Path path, Configuration conf)
+      throws IOException {
+    // The primaryReplicaReader is mainly used for constructing block cache key, so if we do not use
+    // block cache then it is OK to set it as any value. We use true here.
+    return createReader(fs, path, CacheConfig.DISABLED, true, conf);
+  }
 
   /**
-   *
    * @param fs filesystem
    * @param path Path to file to read
-   * @param cacheConf This must not be null.  @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+   * @param cacheConf This must not be null. @see
+   *          {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
+   * @param primaryReplicaReader true if this is a reader for primary replica
    * @return an active Reader instance
-   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
+   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
+   *           is corrupt/invalid.
    */
-  public static Reader createReader(
-      FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
+  public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf,
+      boolean primaryReplicaReader, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
-      cacheConf, stream.getHfs(), conf);
+    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
+      stream.getHfs(), primaryReplicaReader, conf);
   }
 
   /**
    * This factory method is used only by unit tests
    */
-  static Reader createReaderFromStream(Path path,
-      FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
-      throws IOException {
+  @VisibleForTesting
+  static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
+      CacheConfig cacheConf, Configuration conf) throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+    return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf);
   }
 
   /**
@@ -606,22 +608,13 @@ public class HFile {
       throws IOException {
     final Path path = fileStatus.getPath();
     final long size = fileStatus.getLen();
-    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
-    try {
+    try (FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path)) {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
       assert !isHBaseChecksum; // Initially we must read with FS checksum.
       FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
       return true;
     } catch (IllegalArgumentException e) {
       return false;
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      try {
-        fsdis.close();
-      } catch (Throwable t) {
-        LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 030a25e..43b5c24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -306,7 +306,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
       return -2;
     }
 
-    HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), getConf());
+    HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf());
 
     Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4e8cbaa..f0a1fa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -85,7 +85,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   /** Filled when we read in the trailer. */
   private final Compression.Algorithm compressAlgo;
 
-  private boolean isPrimaryReplicaReader;
+  private final boolean primaryReplicaReader;
 
   /**
    * What kind of data block encoding should be used while reading, writing,
@@ -156,6 +156,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   /** Minor versions starting with this number have faked index key */
   static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
 
+  @VisibleForTesting
+  @Deprecated
+  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+      long fileSize, CacheConfig cacheConf, HFileSystem hfs, Configuration conf)
+      throws IOException {
+    this(path, trailer, fsdis, fileSize, cacheConf, hfs, true, conf);
+  }
+
   /**
    * Opens a HFile. You must load the index before you can use it by calling
    * {@link #loadFileInfo()}.
@@ -175,11 +183,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    *          Configuration
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-  public HFileReaderImpl(final Path path, FixedFileTrailer trailer,
-      final FSDataInputStreamWrapper fsdis,
-      final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
-      final Configuration conf)
-  throws IOException {
+  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
+      long fileSize, CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader,
+      Configuration conf) throws IOException {
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
     this.cacheConf = cacheConf;
@@ -187,6 +193,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     this.path = path;
     this.name = path.getName();
     this.conf = conf;
+    this.primaryReplicaReader = primaryReplicaReader;
     checkFileVersion();
     this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
     this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
@@ -453,12 +460,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
   @Override
   public boolean isPrimaryReplicaReader() {
-    return isPrimaryReplicaReader;
-  }
-
-  @Override
-  public void setPrimaryReplicaReader(boolean isPrimaryReplicaReader) {
-    this.isPrimaryReplicaReader = isPrimaryReplicaReader;
+    return primaryReplicaReader;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 19daeed..3af4290 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -27,7 +32,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,9 +67,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClientServiceCallable;
@@ -99,10 +100,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  */
@@ -937,8 +934,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
     HFile.Reader hfr = null;
     try {
-      hfr = HFile.createReader(fs, hfilePath,
-          new CacheConfig(getConf()), getConf());
+      hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf());
     } catch (FileNotFoundException fnfe) {
       LOG.debug("encountered", fnfe);
       return new Pair<>(null, hfilePath.getName());
@@ -1105,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     HalfStoreFileReader halfReader = null;
     StoreFileWriter halfWriter = null;
     try {
-      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
+      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+          new AtomicInteger(0), true, conf);
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -1213,30 +1210,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
           throws IOException {
         Path hfile = hfileStatus.getPath();
-        HFile.Reader reader = HFile.createReader(fs, hfile,
-            new CacheConfig(getConf()), getConf());
-        try {
+        try (HFile.Reader reader =
+            HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
-            LOG.info("Setting compression " + hcd.getCompressionType().name() +
-                     " for family " + hcd.toString());
+            LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " +
+                hcd.toString());
           }
           reader.loadFileInfo();
           byte[] first = reader.getFirstRowKey();
-          byte[] last  = reader.getLastRowKey();
+          byte[] last = reader.getLastRowKey();
 
-          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
-            " first=" + Bytes.toStringBinary(first) +
-            " last="  + Bytes.toStringBinary(last));
+          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
 
           // To eventually infer start key-end key boundaries
-          Integer value = map.containsKey(first)? map.get(first):0;
-          map.put(first, value+1);
+          Integer value = map.containsKey(first) ? map.get(first) : 0;
+          map.put(first, value + 1);
 
-          value = map.containsKey(last)? map.get(last):0;
-          map.put(last, value-1);
-        } finally {
-          reader.close();
+          value = map.containsKey(last) ? map.get(last) : 0;
+          map.put(last, value - 1);
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
index 366378a..3600fe0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java
@@ -264,7 +264,7 @@ public class MergeTableRegionsProcedure
 
   @Override
   protected MergeTableRegionsState getState(final int stateId) {
-    return MergeTableRegionsState.valueOf(stateId);
+    return MergeTableRegionsState.forNumber(stateId);
   }
 
   @Override
@@ -613,11 +613,8 @@ public class MergeTableRegionsProcedure
         final CacheConfig cacheConf = new CacheConfig(conf, hcd);
         for (StoreFileInfo storeFileInfo: storeFiles) {
           // Create reference file(s) of the region in mergedDir
-          regionFs.mergeStoreFile(
-            mergedRegionInfo,
-            family,
-            new StoreFile(
-              mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()),
+          regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(),
+              storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true),
             mergedDir);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
index 3cd6c66..bf9afd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
@@ -285,7 +285,7 @@ public class SplitTableRegionProcedure
 
   @Override
   protected SplitTableRegionState getState(final int stateId) {
-    return SplitTableRegionState.valueOf(stateId);
+    return SplitTableRegionState.forNumber(stateId);
   }
 
   @Override
@@ -571,9 +571,9 @@ public class SplitTableRegionProcedure
       if (storeFiles != null && storeFiles.size() > 0) {
         final CacheConfig cacheConf = new CacheConfig(conf, hcd);
         for (StoreFileInfo storeFileInfo: storeFiles) {
-          StoreFileSplitter sfs = new StoreFileSplitter(regionFs, family.getBytes(),
-            new StoreFile(mfs.getFileSystem(), storeFileInfo, conf,
-              cacheConf, hcd.getBloomFilterType()));
+          StoreFileSplitter sfs =
+              new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(),
+                  storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true));
           futures.add(threadPool.submit(sfs));
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
index 7c4d6fe..90d1f2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -44,7 +44,9 @@ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile>
 
   public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
       CacheConfig cacheConf) throws IOException {
-    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    // XXX: primaryReplica is only used for constructing the key of block cache so it is not a
+    // critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
     return new CachedMobFile(sf);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index cd4c079..73355e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -118,9 +118,7 @@ public class MobFile {
    * @throws IOException
    */
   public void open() throws IOException {
-    if (sf.getReader() == null) {
-      sf.createReader();
-    }
+    sf.initReader();
   }
 
   /**
@@ -146,7 +144,9 @@ public class MobFile {
    */
   public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
       throws IOException {
-    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+    // XXX: primaryReplica is only used for constructing the key of block cache so it is not a
+    // critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
+    StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
     return new MobFile(sf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index eb75120..06c5001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -333,7 +333,8 @@ public final class MobUtils {
           if (LOG.isDebugEnabled()) {
             LOG.debug(fileName + " is an expired file");
           }
-          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+          filesToClean
+              .add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
         }
       } catch (Exception e) {
         LOG.error("Cannot parse the fileName " + fileName, e);
@@ -372,7 +373,7 @@ public final class MobUtils {
     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
     Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
     FileSystem fs = mobRootDir.getFileSystem(conf);
-    return mobRootDir.makeQualified(fs);
+    return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
@@ -697,7 +698,7 @@ public final class MobUtils {
       return null;
     }
     Path dstPath = new Path(targetPath, sourceFile.getName());
-    validateMobFile(conf, fs, sourceFile, cacheConfig);
+    validateMobFile(conf, fs, sourceFile, cacheConfig, true);
     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
     LOG.info(msg);
     Path parent = dstPath.getParent();
@@ -718,11 +719,11 @@ public final class MobUtils {
    * @param cacheConfig The current cache config.
    */
   private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
-      CacheConfig cacheConfig) throws IOException {
+      CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
-      storeFile.createReader();
+      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
+      storeFile.initReader();
     } catch (IOException e) {
       LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
       throw e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 987fe51..05c7076 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -223,12 +223,9 @@ public class PartitionedMobCompactor extends MobCompactor {
         // File in the Del Partition List
 
         // Get delId from the file
-        Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
-        try {
+        try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
           delId.setStartKey(reader.getFirstRowKey());
           delId.setEndKey(reader.getLastRowKey());
-        } finally {
-          reader.close();
         }
         CompactionDelPartition delPartition = delFilesToCompact.get(delId);
         if (delPartition == null) {
@@ -267,12 +264,9 @@ public class PartitionedMobCompactor extends MobCompactor {
           if (withDelFiles) {
             // get startKey and endKey from the file and update partition
             // TODO: is it possible to skip read of most hfiles?
-            Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf);
-            try {
+            try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) {
               compactionPartition.setStartKey(reader.getFirstRowKey());
               compactionPartition.setEndKey(reader.getLastRowKey());
-            } finally {
-              reader.close();
             }
           }
 
@@ -340,10 +334,11 @@ public class PartitionedMobCompactor extends MobCompactor {
     try {
       for (CompactionDelPartition delPartition : request.getDelPartitions()) {
         for (Path newDelPath : delPartition.listDelFiles()) {
-          StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+          StoreFile sf =
+              new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true);
           // pre-create reader of a del file to avoid race condition when opening the reader in each
           // partition.
-          sf.createReader();
+          sf.initReader();
           delPartition.addStoreFile(sf);
           totalDelFileCount++;
         }
@@ -557,7 +552,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       List<StoreFile> filesToCompact = new ArrayList<>();
       for (int i = offset; i < batch + offset; i++) {
         StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
-          BloomType.NONE);
+            BloomType.NONE, true);
         filesToCompact.add(sf);
       }
       filesToCompact.addAll(delFiles);
@@ -739,7 +734,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       }
       for (int i = offset; i < batch + offset; i++) {
         batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
-          BloomType.NONE));
+          BloomType.NONE, true));
       }
       // compact the del files in a batch.
       paths.add(compactDelFilesInBatch(request, batchedDelFiles));
@@ -809,8 +804,8 @@ public class PartitionedMobCompactor extends MobCompactor {
    */
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
     throws IOException {
-    List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
-      false, HConstants.LATEST_TIMESTAMP);
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+      false, true, false, false, HConstants.LATEST_TIMESTAMP);
     Scan scan = new Scan();
     scan.setMaxVersions(column.getMaxVersions());
     long ttl = HStore.determineTTLFromFamily(column);
@@ -893,7 +888,8 @@ public class PartitionedMobCompactor extends MobCompactor {
     for (StoreFile sf : storeFiles) {
       // the readers will be closed later after the merge.
       maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
-      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+      sf.initReader();
+      byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
       if (count != null) {
         maxKeyCount += Bytes.toLong(count);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index c37ae99..da25df5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -135,7 +135,7 @@ class DefaultStoreFileManager implements StoreFileManager {
     this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
   }
 
-  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
   // Let a background thread close the actual reader on these compacted files and also
   // ensure to evict the blocks from block cache so that they are no longer in
   // cache

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index b021430..032e383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -292,9 +292,9 @@ public class HMobStore extends HStore {
   private void validateMobFile(Path path) throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile =
-          new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
-      storeFile.createReader();
+      storeFile = new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig,
+          BloomType.NONE, isPrimaryReplicaStore());
+      storeFile.initReader();
     } catch (IOException e) {
       LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
       throw e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 78ce608..b21a84d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4160,8 +4160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
       Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size());
       for (Path file: files) {
-        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
-          null, null));
+        fakeStoreFiles.add(
+          new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
       }
       getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 144f43b..014427d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -294,7 +294,7 @@ public class HRegionFileSystem {
    */
   Path getStoreFilePath(final String familyName, final String fileName) {
     Path familyDir = getStoreDir(familyName);
-    return new Path(familyDir, fileName).makeQualified(this.fs);
+    return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
@@ -675,9 +675,7 @@ public class HRegionFileSystem {
     if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
       // Check whether the split row lies in the range of the store file
       // If it is outside the range, return directly.
-      if (f.getReader() == null) {
-        f.createReader();
-      }
+      f.initReader();
       try {
         if (top) {
           //check if larger than last key.

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a98f89e..cbdaa1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -650,13 +650,11 @@ public class HStore implements Store {
     return createStoreFileAndReader(info);
   }
 
-  private StoreFile createStoreFileAndReader(final StoreFileInfo info)
-      throws IOException {
+  private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
-      this.family.getBloomFilterType());
-    StoreFileReader r = storeFile.createReader();
-    r.setReplicaStoreFile(isPrimaryReplicaStore());
+        this.family.getBloomFilterType(), isPrimaryReplicaStore());
+    storeFile.initReader();
     return storeFile;
   }
 
@@ -726,8 +724,8 @@ public class HStore implements Store {
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
           + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
-      reader = HFile.createReader(srcPath.getFileSystem(conf),
-          srcPath, cacheConf, conf);
+      reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf,
+        isPrimaryReplicaStore(), conf);
       reader.loadFileInfo();
 
       byte[] firstKey = reader.getFirstRowKey();
@@ -1180,7 +1178,7 @@ public class HStore implements Store {
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
-      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
     scanners.addAll(sfScanners);
     // Then the memstore scanners
@@ -1203,7 +1201,7 @@ public class HStore implements Store {
       }
     }
     List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
-      cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+      cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
     scanners.addAll(sfScanners);
     // Then the memstore scanners
@@ -2456,8 +2454,9 @@ public class HStore implements Store {
               LOG.debug("The file " + file + " was closed but still not archived.");
             }
             filesToRemove.add(file);
+            continue;
           }
-          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+          if (file.isCompactedAway() && !file.isReferencedInReads()) {
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 41c13f5..d71af2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -54,7 +54,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   /** Constructor for testing. */
   ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
+      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners)
       throws IOException {
     super(scan, scanInfo, scanType, columns, scanners,
         HConstants.LATEST_TIMESTAMP);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7aef05e..c53fbf08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * and append data. Be sure to add any metadata before calling close on the
  * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
  * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
- * passing filesystem and path.  To read, call {@link #createReader()}.
+ * passing filesystem and path.  To read, call {@link #initReader()}.
  * <p>StoreFiles may also reference store files in another Store.
  *
  * The reason for this weird pattern where you use a different instance for the
@@ -64,6 +65,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class StoreFile {
   private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
+  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
+
+  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
+
   // Keys for fileinfo values in HFile
 
   /** Max Sequence ID in FileInfo */
@@ -103,6 +108,18 @@ public class StoreFile {
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
+  // Counter that is incremented every time a scanner is created on the
+  // store file. It is decremented when the scan on the store file is
+  // done.
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  private final boolean noReadahead;
+
+  private final boolean primaryReplica;
+
+  // Indicates if the file got compacted
+  private volatile boolean compactedAway = false;
+
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
@@ -116,7 +133,7 @@ public class StoreFile {
 
   private Cell lastKey;
 
-  private Comparator comparator;
+  private Comparator<Cell> comparator;
 
   CacheConfig getCacheConf() {
     return cacheConf;
@@ -130,7 +147,7 @@ public class StoreFile {
     return lastKey;
   }
 
-  public Comparator getComparator() {
+  public Comparator<Cell> getComparator() {
     return comparator;
   }
 
@@ -179,72 +196,96 @@ public class StoreFile {
   public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a
-   * substantial amount of ram depending on the underlying files (10-20MB?).
-   *
-   * @param fs  The current file system to use.
-   * @param p  The path of the file.
-   * @param conf  The current configuration.
-   * @param cacheConf  The cache configuration and block cache reference.
-   * @param cfBloomType The bloom type to use for this store file as specified
-   *          by column family configuration. This may or may not be the same
-   *          as the Bloom filter type actually present in the HFile, because
-   *          column family configuration might change. If this is
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param p The path of the file.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
-   * @throws IOException When opening the reader fails.
+   * @deprecated Now we will specify whether the StoreFile is for primary replica when
+   *             constructing, so please use
+   *             {@link #StoreFile(FileSystem, Path, Configuration, CacheConfig, BloomType, boolean)}
+   *             directly.
    */
+  @Deprecated
   public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
-        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
     this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
   }
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a
-   * substantial amount of ram depending on the underlying files (10-20MB?).
-   *
-   * @param fs  The current file system to use.
-   * @param fileInfo  The store file information.
-   * @param conf  The current configuration.
-   * @param cacheConf  The cache configuration and block cache reference.
-   * @param cfBloomType The bloom type to use for this store file as specified
-   *          by column family configuration. This may or may not be the same
-   *          as the Bloom filter type actually present in the HFile, because
-   *          column family configuration might change. If this is
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param p The path of the file.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
+   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+   * @throws IOException
+   */
+  public StoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
+      BloomType cfBloomType, boolean primaryReplica) throws IOException {
+    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
+  }
+
+  /**
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param fileInfo The store file information.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column family
+   *          configuration. This may or may not be the same as the Bloom filter type actually
+   *          present in the HFile, because column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
-   * @throws IOException When opening the reader fails.
+   * @deprecated Now we will specify whether the StoreFile is for primary replica when
+   *             constructing, so please use
+   *             {@link #StoreFile(FileSystem, StoreFileInfo, Configuration, CacheConfig, BloomType, boolean)}
+   *             directly.
    */
+  @Deprecated
   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
-      final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
+      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
+    this(fs, fileInfo, conf, cacheConf, cfBloomType, true);
+  }
+
+  /**
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of ram
+   * depending on the underlying files (10-20MB?).
+   * @param fs The current file system to use.
+   * @param fileInfo The store file information.
+   * @param conf The current configuration.
+   * @param cacheConf The cache configuration and block cache reference.
+   * @param cfBloomType The bloom type to use for this store file as specified by column
+   *          family configuration. This may or may not be the same as the Bloom filter type
+   *          actually present in the HFile, because column family configuration might change. If
+   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+   */
+  public StoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
+      BloomType cfBloomType, boolean primaryReplica) {
     this.fs = fs;
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
-
+    this.noReadahead =
+        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
       this.cfBloomType = cfBloomType;
     } else {
-      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
-          "cfBloomType=" + cfBloomType + " (disabled in config)");
+      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
+          cfBloomType + " (disabled in config)");
       this.cfBloomType = BloomType.NONE;
     }
-  }
-
-  /**
-   * Clone
-   * @param other The StoreFile to clone from
-   */
-  public StoreFile(final StoreFile other) {
-    this.fs = other.fs;
-    this.fileInfo = other.fileInfo;
-    this.cacheConf = other.cacheConf;
-    this.cfBloomType = other.cfBloomType;
-    this.metadataMap = other.metadataMap;
-  }
-
-  /**
-   * Clone a StoreFile for opening private reader.
-   */
-  public StoreFile cloneForReader() {
-    return new StoreFile(this);
+    this.primaryReplica = primaryReplica;
   }
 
   /**
@@ -266,12 +307,12 @@ public class StoreFile {
    * @return Returns the qualified path of this StoreFile
    */
   public Path getQualifiedPath() {
-    return this.fileInfo.getPath().makeQualified(fs);
+    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
   /**
    * @return True if this is a StoreFile Reference; call
-   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
+   * after {@link #open()} else may get wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -376,15 +417,21 @@ public class StoreFile {
 
   @VisibleForTesting
   public boolean isCompactedAway() {
-    if (this.reader != null) {
-      return this.reader.isCompactedAway();
-    }
-    return true;
+    return compactedAway;
   }
 
   @VisibleForTesting
   public int getRefCount() {
-    return this.reader.getRefCount().get();
+    return refCount.get();
+  }
+
+  /**
+   * @return true if the file is still used in reads
+   */
+  public boolean isReferencedInReads() {
+    int rc = refCount.get();
+    assert rc >= 0; // we should not go negative.
+    return rc > 0;
   }
 
   /**
@@ -404,18 +451,18 @@ public class StoreFile {
   }
 
   /**
-   * Opens reader on this store file.  Called by Constructor.
-   * @return Reader for the store file.
+   * Opens reader on this store file. Called by {@link #initReader()}.
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private StoreFileReader open(boolean canUseDropBehind) throws IOException {
+  private void open() throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
+      primaryReplica, refCount, true);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -513,38 +560,45 @@ public class StoreFile {
     firstKey = reader.getFirstKey();
     lastKey = reader.getLastKey();
     comparator = reader.getComparator();
-    return this.reader;
-  }
-
-  public StoreFileReader createReader() throws IOException {
-    return createReader(false);
   }
 
   /**
-   * @return Reader for StoreFile. creates if necessary
-   * @throws IOException
+   * Initialize the reader used for pread.
    */
-  public StoreFileReader createReader(boolean canUseDropBehind) throws IOException {
-    if (this.reader == null) {
+  public void initReader() throws IOException {
+    if (reader == null) {
       try {
-        this.reader = open(canUseDropBehind);
-      } catch (IOException e) {
+        open();
+      } catch (Exception e) {
         try {
-          boolean evictOnClose =
-              cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
           this.closeReader(evictOnClose);
         } catch (IOException ee) {
+          LOG.warn("failed to close reader", ee);
         }
         throw e;
       }
-
     }
-    return this.reader;
+  }
+
+  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
+    initReader();
+    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
+      primaryReplica, refCount, false);
+    reader.copyFields(this.reader);
+    return reader;
+  }
+
+  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
+      boolean pread, boolean isCompaction, long readPt, long scannerOrder,
+      boolean canOptimizeForNonNullColumn) throws IOException {
+    return createStreamReader(canUseDropBehind).getStoreFileScanner(
+      cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**
-   * @return Current reader.  Must call createReader first else returns null.
-   * @see #createReader()
+   * @return Current reader.  Must call initReader first else returns null.
+   * @see #initReader()
    */
   public StoreFileReader getReader() {
     return this.reader;
@@ -566,9 +620,7 @@ public class StoreFile {
    * Marks the status of the file as compactedAway.
    */
   public void markCompactedAway() {
-    if (this.reader != null) {
-      this.reader.markCompactedAway();
-    }
+    this.compactedAway = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 3c12045..c4754a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@@ -233,25 +234,24 @@ public class StoreFileInfo {
    * @param cacheConf The cache configuration and block cache reference.
    * @return The StoreFile.Reader for the file
    */
-  public StoreFileReader open(final FileSystem fs,
-      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
+  public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind,
+      long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared)
+      throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
     final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath,
-          doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath(),
-          doDropBehind);
+      in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead);
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();
@@ -265,9 +265,10 @@ public class StoreFileInfo {
     if (reader == null) {
       if (this.reference != null) {
         reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
-          conf);
+            isPrimaryReplicaStoreFile, refCount, shared, conf);
       } else {
-        reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf);
+        reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf,
+            isPrimaryReplicaStoreFile, refCount, shared, conf);
       }
     }
     if (this.coprocessorHost != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 8f01a93..b015ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.Map;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -68,36 +69,47 @@ public class StoreFileReader {
   private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
   private boolean skipResetSeqId = true;
 
-  public AtomicInteger getRefCount() {
-    return refCount;
-  }
-
   // Counter that is incremented every time a scanner is created on the
-  // store file.  It is decremented when the scan on the store file is
-  // done.
-  private AtomicInteger refCount = new AtomicInteger(0);
-  // Indicates if the file got compacted
-  private volatile boolean compactedAway = false;
+  // store file. It is decremented when the scan on the store file is
+  // done. All StoreFileReader for the same StoreFile will share this counter.
+  private final AtomicInteger refCount;
 
-  public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
-      throws IOException {
-    reader = HFile.createReader(fs, path, cacheConf, conf);
+  // Indicates whether this StoreFileReader is shared, i.e., used for pread. If not, we will
+  // close the internal reader when readCompleted is called.
+  private final boolean shared;
+
+  private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
+    this.reader = reader;
     bloomFilterType = BloomType.NONE;
+    this.refCount = refCount;
+    this.shared = shared;
   }
 
-  void markCompactedAway() {
-    this.compactedAway = true;
+  public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf,
+      boolean primaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
+      throws IOException {
+    this(HFile.createReader(fs, path, cacheConf, primaryReplicaStoreFile, conf), refCount, shared);
   }
 
   public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
-      CacheConfig cacheConf, Configuration conf) throws IOException {
-    reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
-    bloomFilterType = BloomType.NONE;
+      CacheConfig cacheConf, boolean primaryReplicaStoreFile, AtomicInteger refCount,
+      boolean shared, Configuration conf) throws IOException {
+    this(HFile.createReader(fs, path, in, size, cacheConf, primaryReplicaStoreFile, conf), refCount,
+        shared);
   }
 
-  public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
-    reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
+  void copyFields(StoreFileReader reader) {
+    this.generalBloomFilter = reader.generalBloomFilter;
+    this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter;
+    this.bloomFilterType = reader.bloomFilterType;
+    this.sequenceID = reader.sequenceID;
+    this.timeRange = reader.timeRange;
+    this.lastBloomKey = reader.lastBloomKey;
+    this.bulkLoadResult = reader.bulkLoadResult;
+    this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV;
+    this.skipResetSeqId = reader.skipResetSeqId;
   }
+
   public boolean isPrimaryReplicaReader() {
     return reader.isPrimaryReplicaReader();
   }
@@ -105,8 +117,11 @@ public class StoreFileReader {
   /**
    * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
    */
+  @VisibleForTesting
   StoreFileReader() {
+    this.refCount = new AtomicInteger(0);
     this.reader = null;
+    this.shared = false;
   }
 
   public CellComparator getComparator() {
@@ -128,30 +143,23 @@ public class StoreFileReader {
       boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
     // Increment the ref count
     refCount.incrementAndGet();
-    return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction,
-        reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
+    return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction),
+        !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**
-   * Decrement the ref count associated with the reader when ever a scanner associated
-   * with the reader is closed
+   * Indicate that the scanner has finished reading with this reader. We need to decrement the ref
+   * count, and also, if this is not the common pread reader, we should close it.
    */
-  void decrementRefCount() {
+  void readCompleted() {
     refCount.decrementAndGet();
-  }
-
-  /**
-   * @return true if the file is still used in reads
-   */
-  public boolean isReferencedInReads() {
-    return refCount.get() != 0;
-  }
-
-  /**
-   * @return true if the file is compacted
-   */
-  public boolean isCompactedAway() {
-    return this.compactedAway;
+    if (!shared) {
+      try {
+        reader.close(false);
+      } catch (IOException e) {
+        LOG.warn("failed to close stream reader", e);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ab6b0ef..aa4f897 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -124,26 +122,44 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
       boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
-      ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<>(files.size());
-    List<StoreFile> sorted_files = new ArrayList<>(files);
-    Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
-    for (int i = 0; i < sorted_files.size(); i++) {
-      StoreFileReader r = sorted_files.get(i).createReader(canUseDrop);
-      r.setReplicaStoreFile(isPrimaryReplica);
-      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt,
-        i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+    List<StoreFile> sortedFiles = new ArrayList<>(files);
+    Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+      StoreFile sf = sortedFiles.get(i);
+      sf.initReader();
+      StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
+        isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
       scanners.add(scanner);
     }
     return scanners;
   }
 
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-    Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-    boolean isCompaction, boolean canUseDrop,
-    ScanQueryMatcher matcher, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
-      matcher, readPt, true);
+  /**
+   * Get scanners for compaction. We will create a separate reader for each store file to avoid
+   * contention with normal read requests.
+   */
+  public static List<StoreFileScanner> getScannersForCompaction(Collection<StoreFile> files,
+      boolean canUseDropBehind, long readPt) throws IOException {
+    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
+    List<StoreFile> sortedFiles = new ArrayList<>(files);
+    Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    boolean succ = false;
+    try {
+      for (int i = 0, n = sortedFiles.size(); i < n; i++) {
+        scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
+          readPt, i, false));
+      }
+      succ = true;
+    } finally {
+      if (!succ) {
+        for (StoreFileScanner scanner : scanners) {
+          scanner.close();
+        }
+      }
+    }
+    return scanners;
   }
 
   public String toString() {
@@ -262,7 +278,7 @@ public class StoreFileScanner implements KeyValueScanner {
     cur = null;
     this.hfs.close();
     if (this.reader != null) {
-      this.reader.decrementRefCount();
+      this.reader.readCompleted();
     }
     closed = true;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 99ec30e..3bc6a0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -312,7 +312,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo,
       ScanType scanType, final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners) throws IOException {
+      final List<? extends KeyValueScanner> scanners) throws IOException {
     this(scan, scanInfo, scanType, columns, scanners,
         HConstants.LATEST_TIMESTAMP,
         // 0 is passed as readpoint because the test bypasses Store
@@ -322,7 +322,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo,
     ScanType scanType, final NavigableSet<byte[]> columns,
-    final List<KeyValueScanner> scanners, long earliestPutTs)
+    final List<? extends KeyValueScanner> scanners, long earliestPutTs)
         throws IOException {
     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
       // 0 is passed as readpoint because the test bypasses Store
@@ -330,7 +330,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs,
+      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
       long readPt) throws IOException {
     this(null, scan, scanInfo, columns, readPt,
         scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);