You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/11/08 06:35:00 UTC

[hbase] branch branch-2 updated: HBASE-22888 Share some stuffs with the initial reader when new stream reader created (#581)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 235d03b  HBASE-22888 Share some stuffs with the initial reader when new stream reader created (#581)
235d03b is described below

commit 235d03bc075a0b96c8ce89e7eb815595f92ec3a9
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Fri Nov 8 13:54:53 2019 +0800

    HBASE-22888 Share some stuffs with the initial reader when new stream reader created (#581)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/mapreduce/HFileInputFormat.java   |   1 -
 .../mapreduce/TestCellBasedHFileOutputFormat2.java |   4 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java    |   9 +-
 .../TestImportTSVWithVisibilityLabels.java         |   1 -
 .../hadoop/hbase/mapreduce/TestImportTsv.java      |   1 -
 .../hadoop/hbase/io/HalfStoreFileReader.java       |  54 +--
 .../org/apache/hadoop/hbase/io/hfile/HFile.java    | 376 ++-------------
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  24 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java     |  44 +-
 .../hbase/io/hfile/HFileDataBlockEncoderImpl.java  |   5 +-
 .../apache/hadoop/hbase/io/hfile/HFileInfo.java    | 503 +++++++++++++++++++++
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    | 109 +++++
 .../hadoop/hbase/io/hfile/HFilePrettyPrinter.java  |  38 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     | 411 ++++-------------
 .../hadoop/hbase/io/hfile/HFileStreamReader.java   |  39 ++
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  28 +-
 .../hadoop/hbase/io/hfile/ReaderContext.java       |  76 ++++
 .../hbase/io/hfile/ReaderContextBuilder.java       | 106 +++++
 .../assignment/MergeTableRegionsProcedure.java     |  10 +-
 .../assignment/SplitTableRegionProcedure.java      |   6 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   2 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   8 +-
 .../hadoop/hbase/regionserver/HStoreFile.java      | 141 +++---
 .../hadoop/hbase/regionserver/StoreFileInfo.java   | 177 +++++---
 .../hadoop/hbase/regionserver/StoreFileReader.java |  62 ++-
 .../hbase/regionserver/compactions/Compactor.java  |   4 +-
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |  14 +-
 .../apache/hadoop/hbase/util/CompressionTest.java  |   1 -
 .../org/apache/hadoop/hbase/util/HBaseFsck.java    |   1 -
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java |   2 +-
 .../hadoop/hbase/HFilePerformanceEvaluation.java   |   1 -
 .../hadoop/hbase/io/TestHalfStoreFileReader.java   |  19 +-
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |  46 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    |  35 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |  50 +-
 .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java |   9 +-
 .../hadoop/hbase/io/hfile/TestHFileEncryption.java |  10 +-
 .../hadoop/hbase/io/hfile/TestHFileReaderImpl.java |   1 -
 .../hfile/TestHFileScannerImplReferenceCount.java  |  32 +-
 .../hadoop/hbase/io/hfile/TestHFileSeek.java       |   7 +-
 .../hadoop/hbase/io/hfile/TestHFileWriterV3.java   |  28 +-
 .../io/hfile/TestLazyDataBlockDecompression.java   |  13 +-
 .../apache/hadoop/hbase/io/hfile/TestReseekTo.java |   1 -
 .../apache/hadoop/hbase/io/hfile/TestSeekTo.java   |   4 -
 .../regionserver/TestEncryptionKeyRotation.java    |   1 -
 .../regionserver/TestEncryptionRandomKeying.java   |   1 -
 .../hadoop/hbase/regionserver/TestHStore.java      |   2 +-
 .../hadoop/hbase/regionserver/TestHStoreFile.java  |  34 +-
 .../regionserver/TestRowPrefixBloomFilter.java     |  18 +-
 .../hbase/regionserver/TestStoreFileInfo.java      |  21 +-
 .../TestStoreFileScannerWithTagCompression.java    |   8 +-
 .../hbase/regionserver/TestSwitchToStreamRead.java |  13 +-
 .../hbase/tool/TestLoadIncrementalHFiles.java      |   3 +-
 .../hadoop/hbase/util/TestHBaseFsckEncryption.java |   1 -
 54 files changed, 1527 insertions(+), 1088 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
index 11e6c08..1a9b655 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -91,7 +91,6 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
 
       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
-      this.in.loadFileInfo();
       this.scanner = in.getScanner(false, false);
 
     }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedHFileOutputFormat2.java
index 9367b81..dd243f9 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedHFileOutputFormat2.java
@@ -394,7 +394,7 @@ public class TestCellBasedHFileOutputFormat2  {
       // open as HFile Reader and pull out TIMERANGE FileInfo.
       HFile.Reader rd =
           HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
-      Map<byte[],byte[]> finfo = rd.loadFileInfo();
+      Map<byte[],byte[]> finfo = rd.getHFileInfo();
       byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
       assertNotNull(range);
 
@@ -1159,7 +1159,7 @@ public class TestCellBasedHFileOutputFormat2  {
         // compression
         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
-        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
 
         byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index d4c3802..30b0022 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -395,8 +395,8 @@ public class TestHFileOutputFormat2  {
       // open as HFile Reader and pull out TIMERANGE FileInfo.
       HFile.Reader rd =
           HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
-      Map<byte[],byte[]> finfo = rd.loadFileInfo();
-      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
+      Map<byte[],byte[]> finfo = rd.getHFileInfo();
+      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
       assertNotNull(range);
 
       // unmarshall and check values.
@@ -1178,7 +1178,7 @@ public class TestHFileOutputFormat2  {
         // compression
         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
-        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
 
         byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
@@ -1542,7 +1542,8 @@ public class TestHFileOutputFormat2  {
         LocatedFileStatus keyFileStatus = iterator.next();
         HFile.Reader reader =
             HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
-        assertEquals(reader.getCompressionAlgorithm().getName(), hfileoutputformatCompression);
+        assertEquals(reader.getTrailer().getCompressionCodec().getName(),
+            hfileoutputformatCompression);
       }
     } finally {
       if (writer != null && context != null) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
index 7f4ad9e..9ee649b 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
@@ -484,7 +484,6 @@ public class TestImportTSVWithVisibilityLabels implements Configurable {
   private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
     Configuration conf = util.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();
     int count = 0;
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 74fdc99..fb213a3 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -561,7 +561,6 @@ public class TestImportTsv implements Configurable {
   private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
     Configuration conf = util.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();
     int count = 0;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index 11ab068..ab293e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -23,21 +23,21 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A facade for a {@link org.apache.hadoop.hbase.io.hfile.HFile.Reader} that serves up
@@ -67,44 +67,18 @@ public class HalfStoreFileReader extends StoreFileReader {
   private boolean firstKeySeeked = false;
 
   /**
-   * Creates a half file reader for a normal hfile.
-   * @param fs fileystem to read from
-   * @param p path to hfile
-   * @param cacheConf
-   * @param r original reference file (contains top or bottom)
-   * @param conf Configuration
-   * @throws IOException
-   */
-  public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r,
-      boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
-      throws IOException {
-    super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
-    // This is not actual midkey for this half-file; its just border
-    // around which we split top and bottom.  Have to look in files to find
-    // actual last and first keys for bottom and top halves.  Half-files don't
-    // have an actual midkey themselves. No midkey is how we indicate file is
-    // not splittable.
-    this.splitkey = r.getSplitKey();
-    this.splitCell = new KeyValue.KeyOnlyKeyValue(this.splitkey, 0, this.splitkey.length);
-    // Is it top or bottom half?
-    this.top = Reference.isTopFileRegion(r.getFileRegion());
-  }
-
-  /**
    * Creates a half file reader for a hfile referred to by an hfilelink.
-   * @param fs fileystem to read from
-   * @param p path to hfile
-   * @param in {@link FSDataInputStreamWrapper}
-   * @param size Full size of the hfile file
-   * @param cacheConf
+   * @param context Reader context info
+   * @param fileInfo HFile info
+   * @param cacheConf CacheConfig
    * @param r original reference file (contains top or bottom)
+   * @param refCount reference count
    * @param conf Configuration
-   * @throws IOException
    */
-  public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
-      long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile,
-      AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException {
-    super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf);
+  public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo,
+      final CacheConfig cacheConf, final Reference r,
+      AtomicInteger refCount, final Configuration conf) throws IOException {
+    super(context, fileInfo, cacheConf, refCount, conf);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 33e815e..3719611 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -18,28 +18,17 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.SequenceInputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,27 +37,21 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.MetricsIO;
 import org.apache.hadoop.hbase.io.MetricsIOWrapperImpl;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.BytesBytesPair;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -424,8 +407,6 @@ public class HFile {
 
     HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
 
-    Map<byte[], byte[]> loadFileInfo() throws IOException;
-
     Optional<Cell> getLastKey();
 
     Optional<Cell> midKey() throws IOException;
@@ -444,11 +425,13 @@ public class HFile {
 
     FixedFileTrailer getTrailer();
 
-    HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();
+    void setDataBlockIndexReader(HFileBlockIndex.CellBasedKeyBlockIndexReader reader);
+    HFileBlockIndex.CellBasedKeyBlockIndexReader getDataBlockIndexReader();
 
-    HFileScanner getScanner(boolean cacheBlocks, boolean pread);
+    void setMetaBlockIndexReader(HFileBlockIndex.ByteArrayKeyBlockIndexReader reader);
+    HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader();
 
-    Compression.Algorithm getCompressionAlgorithm();
+    HFileScanner getScanner(boolean cacheBlocks, boolean pread);
 
     /**
      * Retrieves general Bloom filter metadata as appropriate for each
@@ -480,10 +463,6 @@ public class HFile {
 
     boolean isPrimaryReplicaReader();
 
-    boolean shouldIncludeMemStoreTS();
-
-    boolean isDecodeMemStoreTS();
-
     DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
 
     @VisibleForTesting
@@ -497,88 +476,60 @@ public class HFile {
      * implementation should take care of thread safety.
      */
     void unbufferStream();
+
+    ReaderContext getContext();
+    HFileInfo getHFileInfo();
+    void setDataBlockEncoder(HFileDataBlockEncoder dataBlockEncoder);
   }
 
   /**
    * Method returns the reader given the specified arguments.
    * TODO This is a bad abstraction.  See HBASE-6635.
    *
-   * @param path hfile's path
-   * @param fsdis stream of path's file
-   * @param size max size of the trailer.
+   * @param context Reader context info
+   * @param fileInfo HFile info
    * @param cacheConf Cache configuation values, cannot be null.
-   * @param hfs
-   * @param primaryReplicaReader true if this is a reader for primary replica
+   * @param conf Configuration
    * @return an appropriate instance of HFileReader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
-      CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
-      throws IOException {
-    FixedFileTrailer trailer = null;
+  public static Reader createReader(ReaderContext context, HFileInfo fileInfo,
+      CacheConfig cacheConf, Configuration conf) throws IOException {
     try {
-      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
-      assert !isHBaseChecksum; // Initially we must read with FS checksum.
-      trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
+      if (context.getReaderType() == ReaderType.STREAM) {
+        // stream reader will share trailer with pread reader, see HFileStreamReader#copyFields
+        return new HFileStreamReader(context, fileInfo, cacheConf, conf);
+      }
+      FixedFileTrailer trailer = fileInfo.getTrailer();
       switch (trailer.getMajorVersion()) {
         case 2:
           LOG.debug("Opening HFile v2 with v3 reader");
           // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
         case 3:
-          return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs,
-              primaryReplicaReader, conf);
+          return new HFilePreadReader(context, fileInfo, cacheConf, conf);
         default:
           throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
       }
     } catch (Throwable t) {
-      IOUtils.closeQuietly(fsdis);
-      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
+      IOUtils.closeQuietly(context.getInputStreamWrapper());
+      throw new CorruptHFileException("Problem reading HFile Trailer from file "
+          + context.getFilePath(), t);
     } finally {
-      fsdis.unbuffer();
+      context.getInputStreamWrapper().unbuffer();
     }
   }
 
   /**
-   * The sockets and the file descriptors held by the method parameter
-   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
-   * that no other threads have access to the same passed reference.
-   * @param fs A file system
-   * @param path Path to HFile
-   * @param fsdis a stream of path's file
-   * @param size max size of the trailer.
-   * @param cacheConf Cache configuration for hfile's contents
-   * @param primaryReplicaReader true if this is a reader for primary replica
+   * Creates reader with cache configuration disabled
+   * @param fs filesystem
+   * @param path Path to file to read
    * @param conf Configuration
-   * @return A version specific Hfile Reader
-   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
+   * @return an active Reader instance
+   * @throws IOException Will throw a CorruptHFileException
+   *   (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
    */
-  public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf)
-      throws IOException {
-    HFileSystem hfs = null;
-
-    // If the fs is not an instance of HFileSystem, then create an
-    // instance of HFileSystem that wraps over the specified fs.
-    // In this case, we will not be able to avoid checksumming inside
-    // the filesystem.
-    if (!(fs instanceof HFileSystem)) {
-      hfs = new HFileSystem(fs);
-    } else {
-      hfs = (HFileSystem) fs;
-    }
-    return openReader(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
-  }
-
-  /**
-  * Creates reader with cache configuration disabled
-  * @param fs filesystem
-  * @param path Path to file to read
-  * @return an active Reader instance
-  * @throws IOException Will throw a CorruptHFileException
-  * (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
-  */
   public static Reader createReader(FileSystem fs, Path path, Configuration conf)
       throws IOException {
     // The primaryReplicaReader is mainly used for constructing block cache key, so if we do not use
@@ -592,6 +543,7 @@ public class HFile {
    * @param cacheConf This must not be null. @see
    *          {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
    * @param primaryReplicaReader true if this is a reader for primary replica
+   * @param conf Configuration
    * @return an active Reader instance
    * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
    *           is corrupt/invalid.
@@ -600,21 +552,18 @@ public class HFile {
       boolean primaryReplicaReader, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
-      stream.getHfs(), primaryReplicaReader, conf);
-  }
-
-  /**
-   * This factory method is used only by unit tests. <br/>
-   * The sockets and the file descriptors held by the method parameter
-   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
-   * that no other threads have access to the same passed reference.
-   */
-  @VisibleForTesting
-  static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
-      CacheConfig cacheConf, Configuration conf) throws IOException {
-    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return openReader(path, wrapper, size, cacheConf, null, true, conf);
+    ReaderContext context = new ReaderContextBuilder()
+        .withFilePath(path)
+        .withInputStreamWrapper(stream)
+        .withFileSize(fs.getFileStatus(path).getLen())
+        .withFileSystem(stream.getHfs())
+        .withPrimaryReplicaReader(primaryReplicaReader)
+        .withReaderType(ReaderType.PREAD)
+        .build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
+    Reader reader = createReader(context, fileInfo, cacheConf, conf);
+    fileInfo.initMetaAndIndex(reader);
+    return reader;
   }
 
   /**
@@ -650,237 +599,6 @@ public class HFile {
   }
 
   /**
-   * Metadata for this file. Conjured by the writer. Read in by the reader.
-   */
-  public static class FileInfo implements SortedMap<byte[], byte[]> {
-    static final String RESERVED_PREFIX = "hfile.";
-    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
-    static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
-    static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
-    static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
-    static final byte [] CREATE_TIME_TS = Bytes.toBytes(RESERVED_PREFIX + "CREATE_TIME_TS");
-    static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
-    static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
-    public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
-    private final SortedMap<byte [], byte []> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    public FileInfo() {
-      super();
-    }
-
-    /**
-     * Append the given key/value pair to the file info, optionally checking the
-     * key prefix.
-     *
-     * @param k key to add
-     * @param v value to add
-     * @param checkPrefix whether to check that the provided key does not start
-     *          with the reserved prefix
-     * @return this file info object
-     * @throws IOException if the key or value is invalid
-     */
-    public FileInfo append(final byte[] k, final byte[] v,
-        final boolean checkPrefix) throws IOException {
-      if (k == null || v == null) {
-        throw new NullPointerException("Key nor value may be null");
-      }
-      if (checkPrefix && isReservedFileInfoKey(k)) {
-        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
-            + " are reserved");
-      }
-      put(k, v);
-      return this;
-    }
-
-    @Override
-    public void clear() {
-      this.map.clear();
-    }
-
-    @Override
-    public Comparator<? super byte[]> comparator() {
-      return map.comparator();
-    }
-
-    @Override
-    public boolean containsKey(Object key) {
-      return map.containsKey(key);
-    }
-
-    @Override
-    public boolean containsValue(Object value) {
-      return map.containsValue(value);
-    }
-
-    @Override
-    public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
-      return map.entrySet();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      return map.equals(o);
-    }
-
-    @Override
-    public byte[] firstKey() {
-      return map.firstKey();
-    }
-
-    @Override
-    public byte[] get(Object key) {
-      return map.get(key);
-    }
-
-    @Override
-    public int hashCode() {
-      return map.hashCode();
-    }
-
-    @Override
-    public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
-      return this.map.headMap(toKey);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return map.isEmpty();
-    }
-
-    @Override
-    public Set<byte[]> keySet() {
-      return map.keySet();
-    }
-
-    @Override
-    public byte[] lastKey() {
-      return map.lastKey();
-    }
-
-    @Override
-    public byte[] put(byte[] key, byte[] value) {
-      return this.map.put(key, value);
-    }
-
-    @Override
-    public void putAll(Map<? extends byte[], ? extends byte[]> m) {
-      this.map.putAll(m);
-    }
-
-    @Override
-    public byte[] remove(Object key) {
-      return this.map.remove(key);
-    }
-
-    @Override
-    public int size() {
-      return map.size();
-    }
-
-    @Override
-    public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
-      return this.map.subMap(fromKey, toKey);
-    }
-
-    @Override
-    public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
-      return this.map.tailMap(fromKey);
-    }
-
-    @Override
-    public Collection<byte[]> values() {
-      return map.values();
-    }
-
-    /**
-     * Write out this instance on the passed in <code>out</code> stream.
-     * We write it as a protobuf.
-     * @param out
-     * @throws IOException
-     * @see #read(DataInputStream)
-     */
-    void write(final DataOutputStream out) throws IOException {
-      HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
-      for (Map.Entry<byte [], byte[]> e: this.map.entrySet()) {
-        HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
-        bbpBuilder.setFirst(UnsafeByteOperations.unsafeWrap(e.getKey()));
-        bbpBuilder.setSecond(UnsafeByteOperations.unsafeWrap(e.getValue()));
-        builder.addMapEntry(bbpBuilder.build());
-      }
-      out.write(ProtobufMagic.PB_MAGIC);
-      builder.build().writeDelimitedTo(out);
-    }
-
-    /**
-     * Populate this instance with what we find on the passed in <code>in</code> stream.
-     * Can deserialize protobuf of old Writables format.
-     * @param in
-     * @throws IOException
-     * @see #write(DataOutputStream)
-     */
-    void read(final DataInputStream in) throws IOException {
-      // This code is tested over in TestHFileReaderV1 where we read an old hfile w/ this new code.
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      byte [] pbuf = new byte[pblen];
-      if (in.markSupported()) in.mark(pblen);
-      int read = in.read(pbuf);
-      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
-      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
-        parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
-      } else {
-        if (in.markSupported()) {
-          in.reset();
-          parseWritable(in);
-        } else {
-          // We cannot use BufferedInputStream, it consumes more than we read from the underlying IS
-          ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
-          SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
-          // TODO: Am I leaking anything here wrapping the passed in stream?  We are not calling close on the wrapped
-          // streams but they should be let go after we leave this context?  I see that we keep a reference to the
-          // passed in inputstream but since we no longer have a reference to this after we leave, we should be ok.
-          parseWritable(new DataInputStream(sis));
-        }
-      }
-    }
-
-    /** Now parse the old Writable format.  It was a list of Map entries.  Each map entry was a key and a value of
-     * a byte [].  The old map format had a byte before each entry that held a code which was short for the key or
-     * value type.  We know it was a byte [] so in below we just read and dump it.
-     * @throws IOException
-     */
-    void parseWritable(final DataInputStream in) throws IOException {
-      // First clear the map.  Otherwise we will just accumulate entries every time this method is called.
-      this.map.clear();
-      // Read the number of entries in the map
-      int entries = in.readInt();
-      // Then read each key/value pair
-      for (int i = 0; i < entries; i++) {
-        byte [] key = Bytes.readByteArray(in);
-        // We used to read a byte that encoded the class type.  Read and ignore it because it is always byte [] in hfile
-        in.readByte();
-        byte [] value = Bytes.readByteArray(in);
-        this.map.put(key, value);
-      }
-    }
-
-    /**
-     * Fill our map with content of the pb we read off disk
-     * @param fip protobuf message to read
-     */
-    void parsePB(final HFileProtos.FileInfoProto fip) {
-      this.map.clear();
-      for (BytesBytesPair pair: fip.getMapEntryList()) {
-        this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
-      }
-    }
-  }
-
-  /** Return true if the given file info key is reserved for internal use. */
-  public static boolean isReservedFileInfoKey(byte[] key) {
-    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
-  }
-
-  /**
    * Get names of supported compression algorithms. The names are acceptable by
    * HFile.Writer.
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index e543e54..0920068 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -32,7 +32,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -1451,33 +1450,24 @@ public class HFileBlock implements Cacheable {
 
     private final Lock streamLock = new ReentrantLock();
 
-    FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
-        HFileContext fileContext, ByteBuffAllocator allocator) throws IOException {
-      this.fileSize = fileSize;
-      this.hfs = hfs;
-      if (path != null) {
-        this.pathName = path.toString();
+    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
+        ByteBuffAllocator allocator) throws IOException {
+      this.fileSize = readerContext.getFileSize();
+      this.hfs = readerContext.getFileSystem();
+      if (readerContext.getFilePath() != null) {
+        this.pathName = readerContext.getFilePath().toString();
       }
       this.fileContext = fileContext;
       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
       this.allocator = allocator;
 
-      this.streamWrapper = stream;
+      this.streamWrapper = readerContext.getInputStreamWrapper();
       // Older versions of HBase didn't support checksum.
       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
       encodedBlockDecodingCtx = defaultDecodingCtx;
     }
 
-    /**
-     * A constructor that reads files with the latest minor version. This is used by unit tests
-     * only.
-     */
-    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext,
-        ByteBuffAllocator allocator) throws IOException {
-      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext, allocator);
-    }
-
     @Override
     public BlockIterator blockRange(final long startOffset, final long endOffset) {
       final FSReader owner = this; // handle for inner class
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 8396192..b38964e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -121,12 +121,6 @@ public class HFileBlockIndex {
 
     private byte[][] blockKeys;
 
-    public ByteArrayKeyBlockIndexReader(final int treeLevel,
-        final CachingBlockReader cachingBlockReader) {
-      this(treeLevel);
-      this.cachingBlockReader = cachingBlockReader;
-    }
-
     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
       // Can be null for METAINDEX block
       searchTreeLevel = treeLevel;
@@ -164,13 +158,14 @@ public class HFileBlockIndex {
     @Override
     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
         boolean cacheBlocks, boolean pread, boolean isCompaction,
-        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+        DataBlockEncoding expectedDataBlockEncoding,
+        CachingBlockReader cachingBlockReader) throws IOException {
       // this would not be needed
       return null;
     }
 
     @Override
-    public Cell midkey() throws IOException {
+    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
       // Not needed here
       return null;
     }
@@ -229,7 +224,6 @@ public class HFileBlockIndex {
       }
       return sb.toString();
     }
-
   }
 
   /**
@@ -237,7 +231,7 @@ public class HFileBlockIndex {
    * part of a cell like the Data block index or the ROW_COL bloom blocks
    * This needs a comparator to work with the Cells
    */
-   static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
+  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
 
     private Cell[] blockKeys;
     /** Pre-computed mid-key */
@@ -245,12 +239,6 @@ public class HFileBlockIndex {
     /** Needed doing lookup on blocks. */
     private CellComparator comparator;
 
-    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
-        final CachingBlockReader cachingBlockReader) {
-      this(c, treeLevel);
-      this.cachingBlockReader = cachingBlockReader;
-    }
-
     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
       // Can be null for METAINDEX block
       comparator = c;
@@ -290,7 +278,8 @@ public class HFileBlockIndex {
     @Override
     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
         boolean cacheBlocks, boolean pread, boolean isCompaction,
-        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+        DataBlockEncoding expectedDataBlockEncoding,
+        CachingBlockReader cachingBlockReader) throws IOException {
       int rootLevelIndex = rootBlockContainingKey(key);
       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
         return null;
@@ -406,7 +395,7 @@ public class HFileBlockIndex {
     }
 
     @Override
-    public Cell midkey() throws IOException {
+    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
       if (rootCount == 0)
         throw new IOException("HFile empty");
 
@@ -512,7 +501,8 @@ public class HFileBlockIndex {
       return sb.toString();
     }
   }
-   /**
+
+  /**
    * The reader will always hold the root level index in the memory. Index
    * blocks at all other levels will be cached in the LRU cache in practice,
    * although this API does not enforce that.
@@ -522,7 +512,7 @@ public class HFileBlockIndex {
    * This allows us to do binary search for the entry corresponding to the
    * given key without having to deserialize the block.
    */
-   static abstract class BlockIndexReader implements HeapSize {
+  static abstract class BlockIndexReader implements HeapSize {
 
     protected long[] blockOffsets;
     protected int[] blockDataSizes;
@@ -539,9 +529,6 @@ public class HFileBlockIndex {
      */
     protected int searchTreeLevel;
 
-    /** A way to read {@link HFile} blocks at a given offset */
-    protected CachingBlockReader cachingBlockReader;
-
     /**
      * @return true if the block index is empty.
      */
@@ -573,10 +560,10 @@ public class HFileBlockIndex {
      * @throws IOException
      */
     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
-        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
-        throws IOException {
+        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding,
+        CachingBlockReader cachingBlockReader) throws IOException {
       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
-        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
+        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding, cachingBlockReader);
       if (blockWithScanInfo == null) {
         return null;
       } else {
@@ -600,7 +587,8 @@ public class HFileBlockIndex {
      */
     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
         boolean cacheBlocks, boolean pread, boolean isCompaction,
-        DataBlockEncoding expectedDataBlockEncoding) throws IOException;
+        DataBlockEncoding expectedDataBlockEncoding,
+        CachingBlockReader cachingBlockReader) throws IOException;
 
     /**
      * An approximation to the {@link HFile}'s mid-key. Operates on block
@@ -609,7 +597,7 @@ public class HFileBlockIndex {
      *
      * @return the first key of the middle block
      */
-    public abstract Cell midkey() throws IOException;
+    public abstract Cell midkey(CachingBlockReader cachingBlockReader) throws IOException;
 
     /**
      * @param i from 0 to {@link #getRootBlockCount() - 1}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
index d27da61..347b1f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -27,8 +26,8 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Do different kinds of data block encoding according to column family
@@ -47,7 +46,7 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
   }
 
   public static HFileDataBlockEncoder createFromFileInfo(
-      FileInfo fileInfo) throws IOException {
+      HFileInfo fileInfo) throws IOException {
     DataBlockEncoding encoding = DataBlockEncoding.NONE;
     byte[] dataBlockEncodingType = fileInfo.get(DATA_BLOCK_ENCODING);
     if (dataBlockEncodingType != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
new file mode 100644
index 0000000..a75aea3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
@@ -0,0 +1,503 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.SequenceInputStream;
+import java.security.Key;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HFileProtos;
+
+/**
+ * Metadata for HFile. Conjured by the writer. Read in by the reader.
+ */
+@InterfaceAudience.Private
+public class HFileInfo implements SortedMap<byte[], byte[]> {
+  static final String RESERVED_PREFIX = "hfile.";
+  static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
+  static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
+  static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
+  static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
+  static final byte [] CREATE_TIME_TS = Bytes.toBytes(RESERVED_PREFIX + "CREATE_TIME_TS");
+  static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
+  static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
+  public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
+  private final SortedMap<byte [], byte []> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * We can read files whose major version is v2 IFF their minor version is at least 3.
+   */
+  private static final int MIN_V2_MINOR_VERSION_WITH_PB = 3;
+
+  /** Maximum minor version supported by this HFile format */
+  // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
+  // the file. This version can read Writables version 1.
+  static final int MAX_MINOR_VERSION = 3;
+
+  /** Last key in the file. Filled in when we read in the file info */
+  private Cell lastKeyCell = null;
+  /** Average key length read from file info */
+  private int avgKeyLen = -1;
+  /** Average value length read from file info */
+  private int avgValueLen = -1;
+  private boolean includesMemstoreTS = false;
+  private boolean decodeMemstoreTS = false;
+
+  /**
+   * Blocks read from the load-on-open section, excluding data root index, meta
+   * index, and file info.
+   */
+  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<>();
+
+  /**
+   * The iterator will track all blocks in load-on-open section, since we use the
+   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} to manage the ByteBuffers in block now,
+   * so we must make sure to deallocate all ByteBuffers in the end.
+   */
+  private HFileBlock.BlockIterator blockIter;
+
+  private HFileBlockIndex.CellBasedKeyBlockIndexReader dataIndexReader;
+  private HFileBlockIndex.ByteArrayKeyBlockIndexReader metaIndexReader;
+
+  private FixedFileTrailer trailer;
+  private HFileContext hfileContext;
+
+  public HFileInfo() {
+    super();
+  }
+
+  public HFileInfo(ReaderContext context, Configuration conf) throws IOException {
+    this.initTrailerAndContext(context, conf);
+  }
+
+  /**
+   * Append the given key/value pair to the file info, optionally checking the
+   * key prefix.
+   *
+   * @param k key to add
+   * @param v value to add
+   * @param checkPrefix whether to check that the provided key does not start
+   *          with the reserved prefix
+   * @return this file info object
+   * @throws IOException if the key or value is invalid
+   */
+  public HFileInfo append(final byte[] k, final byte[] v,
+      final boolean checkPrefix) throws IOException {
+    if (k == null || v == null) {
+      throw new NullPointerException("Key nor value may be null");
+    }
+    if (checkPrefix && isReservedFileInfoKey(k)) {
+      throw new IOException("Keys with a " + HFileInfo.RESERVED_PREFIX
+          + " are reserved");
+    }
+    put(k, v);
+    return this;
+  }
+
+  /** Return true if the given file info key is reserved for internal use. */
+  public static boolean isReservedFileInfoKey(byte[] key) {
+    return Bytes.startsWith(key, HFileInfo.RESERVED_PREFIX_BYTES);
+  }
+
+  @Override
+  public void clear() {
+    this.map.clear();
+  }
+
+  @Override
+  public Comparator<? super byte[]> comparator() {
+    return map.comparator();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return map.containsKey(key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return map.containsValue(value);
+  }
+
+  @Override
+  public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
+    return map.entrySet();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return map.equals(o);
+  }
+
+  @Override
+  public byte[] firstKey() {
+    return map.firstKey();
+  }
+
+  @Override
+  public byte[] get(Object key) {
+    return map.get(key);
+  }
+
+  @Override
+  public int hashCode() {
+    return map.hashCode();
+  }
+
+  @Override
+  public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
+    return this.map.headMap(toKey);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  @Override
+  public Set<byte[]> keySet() {
+    return map.keySet();
+  }
+
+  @Override
+  public byte[] lastKey() {
+    return map.lastKey();
+  }
+
+  @Override
+  public byte[] put(byte[] key, byte[] value) {
+    return this.map.put(key, value);
+  }
+
+  @Override
+  public void putAll(Map<? extends byte[], ? extends byte[]> m) {
+    this.map.putAll(m);
+  }
+
+  @Override
+  public byte[] remove(Object key) {
+    return this.map.remove(key);
+  }
+
+  @Override
+  public int size() {
+    return map.size();
+  }
+
+  @Override
+  public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
+    return this.map.subMap(fromKey, toKey);
+  }
+
+  @Override
+  public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
+    return this.map.tailMap(fromKey);
+  }
+
+  @Override
+  public Collection<byte[]> values() {
+    return map.values();
+  }
+
+  /**
+   * Write out this instance on the passed in <code>out</code> stream.
+   * We write it as a protobuf.
+   * @see #read(DataInputStream)
+   */
+  void write(final DataOutputStream out) throws IOException {
+    HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
+    for (Map.Entry<byte [], byte[]> e: this.map.entrySet()) {
+      HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
+      bbpBuilder.setFirst(UnsafeByteOperations.unsafeWrap(e.getKey()));
+      bbpBuilder.setSecond(UnsafeByteOperations.unsafeWrap(e.getValue()));
+      builder.addMapEntry(bbpBuilder.build());
+    }
+    out.write(ProtobufMagic.PB_MAGIC);
+    builder.build().writeDelimitedTo(out);
+  }
+
+  /**
+   * Populate this instance with what we find on the passed in <code>in</code> stream.
+   * Can deserialize protobuf or old Writables format; fails if the magic cannot be fully read.
+   * @see #write(DataOutputStream)
+   */
+  void read(final DataInputStream in) throws IOException {
+    // This code is tested over in TestHFileReaderV1 where we read an old hfile w/ this new code.
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    byte [] pbuf = new byte[pblen];
+    if (in.markSupported()) {
+      in.mark(pblen);
+    }
+    int read = IOUtils.read(in, pbuf); // loops until pbuf is full or EOF, unlike a bare read()
+    if (read != pblen) {
+      throw new IOException("read=" + read + ", wanted=" + pblen);
+    }
+    if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
+      parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
+    } else {
+      if (in.markSupported()) {
+        in.reset();
+        parseWritable(in);
+      } else {
+        // We cannot use BufferedInputStream, it consumes more than we read from the underlying IS
+        ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
+        SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
+        // TODO: Am I leaking anything here wrapping the passed in stream?  We are not calling
+        // close on the wrapped streams but they should be let go after we leave this context?
+        // I see that we keep a reference to the passed in inputstream but since we no longer
+        // have a reference to this after we leave, we should be ok.
+        parseWritable(new DataInputStream(sis));
+      }
+    }
+  }
+
+  /**
+   * Now parse the old Writable format.  It was a list of Map entries.  Each map entry was a
+   * key and a value of a byte [].  The old map format had a byte before each entry that held
+   * a code which was short for the key or value type.  We know it was a byte [] so in below
+   * we just read and dump it.
+   */
+  void parseWritable(final DataInputStream in) throws IOException {
+    // First clear the map.
+    // Otherwise we will just accumulate entries every time this method is called.
+    this.map.clear();
+    // Read the number of entries in the map
+    int entries = in.readInt();
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte [] key = Bytes.readByteArray(in);
+      // We used to read a byte that encoded the class type.
+      // Read and ignore it because it is always byte [] in hfile
+      in.readByte();
+      byte [] value = Bytes.readByteArray(in);
+      this.map.put(key, value);
+    }
+  }
+
+  /**
+   * Fill our map with content of the pb we read off disk
+   * @param fip protobuf message to read
+   */
+  void parsePB(final HFileProtos.FileInfoProto fip) {
+    this.map.clear();
+    for (BytesBytesPair pair: fip.getMapEntryList()) {
+      this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+    }
+  }
+
+  public void initTrailerAndContext(ReaderContext context, Configuration conf) throws IOException {
+    try {
+      boolean isHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+      trailer = FixedFileTrailer.readFromStream(context.getInputStreamWrapper()
+          .getStream(isHBaseChecksum), context.getFileSize());
+      Path path = context.getFilePath();
+      checkFileVersion(path);
+      this.hfileContext = createHFileContext(path, trailer, conf);
+    } catch (Throwable t) {
+      context.getInputStreamWrapper().unbuffer();
+      IOUtils.closeQuietly(context.getInputStreamWrapper());
+      throw new CorruptHFileException("Problem reading HFile Trailer from file "
+          + context.getFilePath(), t);
+    }
+  }
+
+  /**
+   * Should be called after initTrailerAndContext, which sets the trailer and context used here.
+   */
+  public void initMetaAndIndex(HFile.Reader reader) throws IOException {
+    ReaderContext context = reader.getContext();
+    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
+    // Initialize a block iterator, and parse load-on-open blocks in the following.
+    blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+        context.getFileSize() - trailer.getTrailerSize());
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    this.dataIndexReader = new HFileBlockIndex
+        .CellBasedKeyBlockIndexReader(trailer.createComparator(), trailer.getNumDataIndexLevels());
+    dataIndexReader.readMultiLevelIndexRoot(blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+    reader.setDataBlockIndexReader(dataIndexReader);
+    // Meta index.
+    this.metaIndexReader = new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
+    metaIndexReader.readRootIndex(blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+    reader.setMetaBlockIndexReader(metaIndexReader);
+    loadMetaInfo(blockIter, hfileContext);
+    reader.setDataBlockEncoder(HFileDataBlockEncoderImpl.createFromFileInfo(this));
+    // Load-On-Open info
+    HFileBlock b;
+    while ((b = blockIter.nextBlock()) != null) {
+      loadOnOpenBlocks.add(b);
+    }
+  }
+
+  private HFileContext createHFileContext(Path path,
+      FixedFileTrailer trailer, Configuration conf) throws IOException {
+    HFileContextBuilder builder = new HFileContextBuilder()
+      .withHBaseCheckSum(true)
+      .withHFileName(path.getName())
+      .withCompression(trailer.getCompressionCodec());
+    // Check for any key material available
+    byte[] keyBytes = trailer.getEncryptionKey();
+    if (keyBytes != null) {
+      Encryption.Context cryptoContext = Encryption.newContext(conf);
+      Key key = EncryptionUtil.unwrapKey(conf, keyBytes);
+      // Use the algorithm the key wants
+      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+      if (cipher == null) {
+        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available"
+            + ", path=" + path);
+      }
+      cryptoContext.setCipher(cipher);
+      cryptoContext.setKey(key);
+      builder.withEncryptionContext(cryptoContext);
+    }
+    HFileContext context = builder.build();
+    return context;
+  }
+
+  private void loadMetaInfo(HFileBlock.BlockIterator blockIter, HFileContext hfileContext)
+      throws IOException {
+    read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
+    byte[] creationTimeBytes = get(HFileInfo.CREATE_TIME_TS);
+    hfileContext.setFileCreateTime(creationTimeBytes == null ?
+        0 : Bytes.toLong(creationTimeBytes));
+    byte[] tmp = get(HFileInfo.MAX_TAGS_LEN);
+    // max tag length is not present in the HFile means tags were not at all written to file.
+    if (tmp != null) {
+      hfileContext.setIncludesTags(true);
+      tmp = get(HFileInfo.TAGS_COMPRESSED);
+      if (tmp != null && Bytes.toBoolean(tmp)) {
+        hfileContext.setCompressTags(true);
+      }
+    }
+    // parse meta info
+    if (get(HFileInfo.LASTKEY) != null) {
+      lastKeyCell = new KeyValue.KeyOnlyKeyValue(get(HFileInfo.LASTKEY));
+    }
+    avgKeyLen = Bytes.toInt(get(HFileInfo.AVG_KEY_LEN));
+    avgValueLen = Bytes.toInt(get(HFileInfo.AVG_VALUE_LEN));
+    byte [] keyValueFormatVersion = get(HFileWriterImpl.KEY_VALUE_VERSION);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) == HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE;
+    hfileContext.setIncludesMvcc(includesMemstoreTS);
+    if (includesMemstoreTS) {
+      decodeMemstoreTS = Bytes.toLong(get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0;
+    }
+  }
+
+  /**
+   * File version check is a little sloppy. We read v3 files but can also read v2 files if their
+   * content has been pb'd; files written with 0.98.
+   */
+  private void checkFileVersion(Path path) {
+    int majorVersion = trailer.getMajorVersion();
+    if (majorVersion == getMajorVersion()) {
+      return;
+    }
+    int minorVersion = trailer.getMinorVersion();
+    if (majorVersion == 2 && minorVersion >= MIN_V2_MINOR_VERSION_WITH_PB) {
+      return;
+    }
+    // We can read v3 or v2 versions of hfile; report the same minimum minor the check above uses.
+    throw new IllegalArgumentException("Invalid HFile version: major=" +
+      majorVersion + ", minor=" + minorVersion + ": expected at least " +
+      "major=2 and minor=" + MIN_V2_MINOR_VERSION_WITH_PB + ", path=" + path);
+  }
+
+  public void close() {
+    if (blockIter != null) {
+      blockIter.freeBlocks();
+    }
+  }
+
+  public int getMajorVersion() {
+    return 3;
+  }
+
+  public void setTrailer(FixedFileTrailer trailer) {
+    this.trailer = trailer;
+  }
+
+  public FixedFileTrailer getTrailer() {
+    return this.trailer;
+  }
+
+  public HFileBlockIndex.CellBasedKeyBlockIndexReader getDataBlockIndexReader() {
+    return this.dataIndexReader;
+  }
+
+  public HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader() {
+    return this.metaIndexReader;
+  }
+
+  public HFileContext getHFileContext() {
+    return this.hfileContext;
+  }
+
+  public List<HFileBlock> getLoadOnOpenBlocks() {
+    return loadOnOpenBlocks;
+  }
+
+  public Cell getLastKeyCell() {
+    return lastKeyCell;
+  }
+
+  public int getAvgKeyLen() {
+    return avgKeyLen;
+  }
+
+  public int getAvgValueLen() {
+    return avgValueLen;
+  }
+
+  public boolean shouldIncludeMemStoreTS() {
+    return includesMemstoreTS;
+  }
+
+  public boolean isDecodeMemstoreTS() {
+    return decodeMemstoreTS;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
new file mode 100644
index 0000000..98fe885
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HFile.Reader} to deal with pread.
+ */
+@InterfaceAudience.Private
+public class HFilePreadReader extends HFileReaderImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(HFilePreadReader.class);
+
+  public HFilePreadReader(ReaderContext context, HFileInfo fileInfo,
+      CacheConfig cacheConf, Configuration conf) throws IOException {
+    super(context, fileInfo, cacheConf, conf);
+    // Prefetch file blocks upon open if requested
+    if (cacheConf.shouldPrefetchOnOpen()) {
+      PrefetchExecutor.request(path, new Runnable() {
+        @Override
+        public void run() {
+          long offset = 0;
+          long end = 0;
+          try {
+            end = getTrailer().getLoadOnOpenDataOffset();
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
+            }
+            // Don't use BlockIterator here, because it's designed to read load-on-open section.
+            long onDiskSizeOfNextBlock = -1;
+            while (offset < end) {
+              if (Thread.interrupted()) {
+                break;
+              }
+              // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
+              // the internal-to-hfileblock thread local which holds the overread that gets the
+              // next header, will not have happened...so, pass in the onDiskSize gotten from the
+              // cached block. This 'optimization' triggers extremely rarely I'd say.
+              HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
+                /* pread= */true, false, false, null, null);
+              try {
+                onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+                offset += block.getOnDiskSizeWithHeader();
+              } finally {
+                // Ideally here the readBlock won't find the block in cache. We call this
+                // readBlock so that block data is read from FS and cached in BC. We must
+                // release the block here to decrease its reference count.
+                block.release();
+              }
+            }
+          } catch (IOException e) {
+            // IOExceptions are probably due to region closes (relocation, etc.)
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
+            }
+          } catch (NullPointerException e) {
+            LOG.warn("Stream moved/closed or prefetch cancelled? " +
+                getPathOffsetEndStr(path, offset, end), e);
+          } catch (Exception e) {
+            // Other exceptions are interesting
+            LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
+          } finally {
+            PrefetchExecutor.complete(path);
+          }
+        }
+      });
+    }
+  }
+
+  private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
+    return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
+  }
+
+  public void close(boolean evictOnClose) throws IOException {
+    PrefetchExecutor.cancel(path);
+    // Deallocate blocks in load-on-open section
+    this.fileInfo.close();
+    // Deallocate data blocks
+    cacheConf.getBlockCache().ifPresent(cache -> {
+      if (evictOnClose) {
+        int numEvicted = cache.evictBlocksByHfileName(name);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
+        }
+      }
+    });
+    fsBlockReader.closeStreams();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
index 98154f9..88f73f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
@@ -20,6 +20,17 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import static com.codahale.metrics.MetricRegistry.name;
 
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.IOException;
@@ -55,7 +66,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
@@ -67,6 +77,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -81,17 +92,6 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser;
 
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-
 /**
  * Implements pretty-printing functionality for {@link HFile}s.
  */
@@ -315,7 +315,7 @@ public class HFilePrettyPrinter extends Configured implements Tool {
 
     HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, getConf());
 
-    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+    Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
 
     KeyValueStatsCollector fileStats = null;
 
@@ -539,22 +539,22 @@ public class HFilePrettyPrinter extends Configured implements Tool {
           || Bytes.equals(e.getKey(), HStoreFile.DELETE_FAMILY_COUNT)
           || Bytes.equals(e.getKey(), HStoreFile.EARLIEST_PUT_TS)
           || Bytes.equals(e.getKey(), HFileWriterImpl.MAX_MEMSTORE_TS_KEY)
-          || Bytes.equals(e.getKey(), FileInfo.CREATE_TIME_TS)
+          || Bytes.equals(e.getKey(), HFileInfo.CREATE_TIME_TS)
           || Bytes.equals(e.getKey(), HStoreFile.BULKLOAD_TIME_KEY)) {
         out.println(Bytes.toLong(e.getValue()));
       } else if (Bytes.equals(e.getKey(), HStoreFile.TIMERANGE_KEY)) {
         TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(e.getValue());
         out.println(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
-      } else if (Bytes.equals(e.getKey(), FileInfo.AVG_KEY_LEN)
-          || Bytes.equals(e.getKey(), FileInfo.AVG_VALUE_LEN)
+      } else if (Bytes.equals(e.getKey(), HFileInfo.AVG_KEY_LEN)
+          || Bytes.equals(e.getKey(), HFileInfo.AVG_VALUE_LEN)
           || Bytes.equals(e.getKey(), HFileWriterImpl.KEY_VALUE_VERSION)
-          || Bytes.equals(e.getKey(), FileInfo.MAX_TAGS_LEN)) {
+          || Bytes.equals(e.getKey(), HFileInfo.MAX_TAGS_LEN)) {
         out.println(Bytes.toInt(e.getValue()));
       } else if (Bytes.equals(e.getKey(), HStoreFile.MAJOR_COMPACTION_KEY)
-          || Bytes.equals(e.getKey(), FileInfo.TAGS_COMPRESSED)
+          || Bytes.equals(e.getKey(), HFileInfo.TAGS_COMPRESSED)
           || Bytes.equals(e.getKey(), HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY)) {
         out.println(Bytes.toBoolean(e.getValue()));
-      } else if (Bytes.equals(e.getKey(), FileInfo.LASTKEY)) {
+      } else if (Bytes.equals(e.getKey(), HFileInfo.LASTKEY)) {
         out.println(new KeyValue.KeyOnlyKeyValue(e.getValue()).toString());
       } else {
         out.println(Bytes.toStringBinary(e.getValue()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index cc1f658..1b2a1d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -20,47 +20,40 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.Key;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.SizeCachedKeyValue;
 import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
-import org.apache.hadoop.hbase.trace.TraceUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.io.WritableUtils;
+
 import org.apache.htrace.core.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -69,7 +62,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  */
 @InterfaceAudience.Private
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-public class HFileReaderImpl implements HFile.Reader, Configurable {
+public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
   // This class is HFileReaderV3 + HFileReaderV2 + AbstractHFileReader all squashed together into
   // one file.  Ditto for all the HFileReader.ScannerV? implementations. I was running up against
   // the MaxInlineLevel limit because too many tiers involved reading from an hfile. Was also hard
@@ -77,15 +70,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HFileReaderImpl.class);
 
   /** Data block index reader keeping the root data index in memory */
-  private HFileBlockIndex.CellBasedKeyBlockIndexReader dataBlockIndexReader;
+  protected HFileBlockIndex.CellBasedKeyBlockIndexReader dataBlockIndexReader;
 
   /** Meta block index reader -- always single level */
-  private HFileBlockIndex.ByteArrayKeyBlockIndexReader metaBlockIndexReader;
+  protected HFileBlockIndex.ByteArrayKeyBlockIndexReader metaBlockIndexReader;
 
-  private final FixedFileTrailer trailer;
-
-  /** Filled when we read in the trailer. */
-  private final Compression.Algorithm compressAlgo;
+  protected FixedFileTrailer trailer;
 
   private final boolean primaryReplicaReader;
 
@@ -93,40 +83,30 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    * What kind of data block encoding should be used while reading, writing,
    * and handling cache.
    */
-  private HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE;
-
-  /** Last key in the file. Filled in when we read in the file info */
-  private Cell lastKeyCell = null;
-
-  /** Average key length read from file info */
-  private int avgKeyLen = -1;
-
-  /** Average value length read from file info */
-  private int avgValueLen = -1;
+  protected HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE;
 
   /** Key comparator */
-  private CellComparator comparator = CellComparator.getInstance();
-
-  /** Size of this file. */
-  private final long fileSize;
+  protected CellComparator comparator = CellComparator.getInstance();
 
   /** Block cache configuration. */
-  private final CacheConfig cacheConf;
+  protected final CacheConfig cacheConf;
+
+  protected ReaderContext context;
+
+  protected final HFileInfo fileInfo;
 
   /** Path of file */
-  private final Path path;
+  protected final Path path;
 
   /** File name to be used for block names */
-  private final String name;
-
-  private FileInfo fileInfo;
+  protected final String name;
 
   private Configuration conf;
 
-  private HFileContext hfileContext;
+  protected HFileContext hfileContext;
 
   /** Filesystem-level block reader. */
-  private HFileBlock.FSReader fsBlockReader;
+  protected HFileBlock.FSReader fsBlockReader;
 
   /**
    * A "sparse lock" implementation allowing to lock on a particular block
@@ -136,19 +116,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    */
   private IdLock offsetLock = new IdLock();
 
-  /**
-   * The iterator will track all blocks in load-on-open section, since we use the
-   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} to manage the ByteBuffers in block now, so
-   * we must ensure that deallocate all ByteBuffers in the end.
-   */
-  private final HFileBlock.BlockIterator blockIter;
-
-  /**
-   * Blocks read from the load-on-open section, excluding data root index, meta
-   * index, and file info.
-   */
-  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<>();
-
   /** Minimum minor version supported by this HFile format */
   static final int MIN_MINOR_VERSION = 0;
 
@@ -157,187 +124,36 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   // the file. This version can read Writables version 1.
   static final int MAX_MINOR_VERSION = 3;
 
-  /**
-   * We can read files whose major version is v2 IFF their minor version is at least 3.
-   */
-  private static final int MIN_V2_MINOR_VERSION_WITH_PB = 3;
-
   /** Minor versions starting with this number have faked index key */
   static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
 
-  @VisibleForTesting
-  @Deprecated
-  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
-      long fileSize, CacheConfig cacheConf, HFileSystem hfs, Configuration conf)
-      throws IOException {
-    this(path, trailer, fsdis, fileSize, cacheConf, hfs, true, conf);
-  }
-
   /**
-   * Opens a HFile. You must load the index before you can use it by calling
-   * {@link #loadFileInfo()}.
-   * @param path
-   *          Path to HFile.
-   * @param trailer
-   *          File trailer.
-   * @param fsdis
-   *          input stream.
-   * @param fileSize
-   *          Length of the stream.
-   * @param cacheConf
-   *          Cache configuration.
-   * @param hfs
-   *          The file system.
-   * @param conf
-   *          Configuration
+   * Opens a HFile.
+   * @param context Reader context info
+   * @param fileInfo HFile info
+   * @param cacheConf Cache configuration.
+   * @param conf Configuration
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-  public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis,
-      long fileSize, CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader,
+  public HFileReaderImpl(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
       Configuration conf) throws IOException {
-    this.trailer = trailer;
-    this.compressAlgo = trailer.getCompressionCodec();
     this.cacheConf = cacheConf;
-    this.fileSize = fileSize;
-    this.path = path;
+    this.context = context;
+    this.path = context.getFilePath();
     this.name = path.getName();
     this.conf = conf;
-    this.primaryReplicaReader = primaryReplicaReader;
-    checkFileVersion();
-    this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
-    this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext,
-        cacheConf.getByteBuffAllocator());
-
+    this.primaryReplicaReader = context.isPrimaryReplicaReader();
+    this.fileInfo = fileInfo;
+    this.trailer = fileInfo.getTrailer();
     // Comparator class name is stored in the trailer in version 2.
-    comparator = trailer.createComparator();
-    dataBlockIndexReader = new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
-        trailer.getNumDataIndexLevels(), this);
-    metaBlockIndexReader = new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
-
-    // Initialize an block iterator, and parse load-on-open blocks in the following.
-    blockIter = fsBlockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
-      fileSize - trailer.getTrailerSize());
-
-    // Data index. We also read statistics about the block index written after
-    // the root level.
-    dataBlockIndexReader.readMultiLevelIndexRoot(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getDataIndexCount());
-
-    // Meta index.
-    metaBlockIndexReader.readRootIndex(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getMetaIndexCount());
-
-    // File info
-    fileInfo = new FileInfo();
-    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
-    byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
-    this.hfileContext.setFileCreateTime(creationTimeBytes == null?  0:
-        Bytes.toLong(creationTimeBytes));
-    if (fileInfo.get(FileInfo.LASTKEY) != null) {
-      lastKeyCell = new KeyValue.KeyOnlyKeyValue(fileInfo.get(FileInfo.LASTKEY));
-    }
-    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
-    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
-    includesMemstoreTS = keyValueFormatVersion != null &&
-        Bytes.toInt(keyValueFormatVersion) == HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE;
-    fsBlockReader.setIncludesMemStoreTS(includesMemstoreTS);
-    if (includesMemstoreTS) {
-      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0;
-    }
-
-    // Read data block encoding algorithm name from file info.
-    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
+    this.comparator = trailer.createComparator();
+    this.hfileContext = fileInfo.getHFileContext();
+    this.fsBlockReader = new HFileBlock.FSReaderImpl(context, hfileContext,
+        cacheConf.getByteBuffAllocator());
+    this.dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
     fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
-
-    // Store all other load-on-open blocks for further consumption.
-    HFileBlock b;
-    while ((b = blockIter.nextBlock()) != null) {
-      loadOnOpenBlocks.add(b);
-    }
-
-    // Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen()) {
-      PrefetchExecutor.request(path, new Runnable() {
-        @Override
-        public void run() {
-          long offset = 0;
-          long end = 0;
-          try {
-            end = getTrailer().getLoadOnOpenDataOffset();
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
-            }
-            // Don't use BlockIterator here, because it's designed to read load-on-open section.
-            long onDiskSizeOfNextBlock = -1;
-            while (offset < end) {
-              if (Thread.interrupted()) {
-                break;
-              }
-              // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
-              // the internal-to-hfileblock thread local which holds the overread that gets the
-              // next header, will not have happened...so, pass in the onDiskSize gotten from the
-              // cached block. This 'optimization' triggers extremely rarely I'd say.
-              HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-                /* pread= */true, false, false, null, null);
-              try {
-                onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
-                offset += block.getOnDiskSizeWithHeader();
-              } finally {
-                // Ideally here the readBlock won't find the block in cache. We call this
-                // readBlock so that block data is read from FS and cached in BC. we must call
-                // returnBlock here to decrease the reference count of block.
-                block.release();
-              }
-            }
-          } catch (IOException e) {
-            // IOExceptions are probably due to region closes (relocation, etc.)
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
-            }
-          } catch (NullPointerException e) {
-            LOG.warn("Stream moved/closed or prefetch cancelled?" +
-                getPathOffsetEndStr(path, offset, end), e);
-          } catch (Exception e) {
-            // Other exceptions are interesting
-            LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
-          } finally {
-            PrefetchExecutor.complete(path);
-          }
-        }
-      });
-    }
-
-    byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
-    // max tag length is not present in the HFile means tags were not at all written to file.
-    if (tmp != null) {
-      hfileContext.setIncludesTags(true);
-      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
-      if (tmp != null && Bytes.toBoolean(tmp)) {
-        hfileContext.setCompressTags(true);
-      }
-    }
-  }
-
-  private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
-    return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
-  }
-
-  /**
-   * File version check is a little sloppy. We read v3 files but can also read v2 files if their
-   * content has been pb'd; files written with 0.98.
-   */
-  private void checkFileVersion() {
-    int majorVersion = trailer.getMajorVersion();
-    if (majorVersion == getMajorVersion()) return;
-    int minorVersion = trailer.getMinorVersion();
-    if (majorVersion == 2 && minorVersion >= MIN_V2_MINOR_VERSION_WITH_PB) return;
-    // We can read v3 or v2 versions of hfile.
-    throw new IllegalArgumentException("Invalid HFile version: major=" +
-      trailer.getMajorVersion() + ", minor=" + trailer.getMinorVersion() + ": expected at least " +
-      "major=2 and minor=" + MAX_MINOR_VERSION + ", path=" + path);
+    dataBlockIndexReader = fileInfo.getDataBlockIndexReader();
+    metaBlockIndexReader = fileInfo.getMetaBlockIndexReader();
   }
 
   @SuppressWarnings("serial")
@@ -360,19 +176,19 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   public String toString() {
     return "reader=" + path.toString() +
         (!isFileInfoLoaded()? "":
-          ", compression=" + compressAlgo.getName() +
+          ", compression=" + trailer.getCompressionCodec().getName() +
           ", cacheConf=" + cacheConf +
           ", firstKey=" + toStringFirstKey() +
           ", lastKey=" + toStringLastKey()) +
-          ", avgKeyLen=" + avgKeyLen +
-          ", avgValueLen=" + avgValueLen +
+          ", avgKeyLen=" + fileInfo.getAvgKeyLen() +
+          ", avgValueLen=" + fileInfo.getAvgValueLen() +
           ", entries=" + trailer.getEntryCount() +
-          ", length=" + fileSize;
+          ", length=" + context.getFileSize();
   }
 
   @Override
   public long length() {
-    return fileSize;
+    return context.getFileSize();
   }
 
   /**
@@ -425,10 +241,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     return comparator;
   }
 
-  /** @return compression algorithm */
-  @Override
+  @VisibleForTesting
   public Compression.Algorithm getCompressionAlgorithm() {
-    return compressAlgo;
+    return trailer.getCompressionCodec();
   }
 
   /**
@@ -448,23 +263,49 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   }
 
   @Override
-  public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
+  public void setDataBlockEncoder(HFileDataBlockEncoder dataBlockEncoder) {
+    this.dataBlockEncoder = dataBlockEncoder;
+    this.fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
+  }
+
+  @Override
+  public void setDataBlockIndexReader(HFileBlockIndex.CellBasedKeyBlockIndexReader reader) {
+    this.dataBlockIndexReader = reader;
+  }
+
+  @Override
+  public HFileBlockIndex.CellBasedKeyBlockIndexReader getDataBlockIndexReader() {
     return dataBlockIndexReader;
   }
 
   @Override
+  public void setMetaBlockIndexReader(HFileBlockIndex.ByteArrayKeyBlockIndexReader reader) {
+    this.metaBlockIndexReader = reader;
+  }
+
+  @Override
+  public HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader() {
+    return metaBlockIndexReader;
+  }
+
+  @Override
   public FixedFileTrailer getTrailer() {
     return trailer;
   }
 
   @Override
-  public boolean isPrimaryReplicaReader() {
-    return primaryReplicaReader;
+  public ReaderContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public HFileInfo getHFileInfo() {
+    return this.fileInfo;
   }
 
   @Override
-  public FileInfo loadFileInfo() throws IOException {
-    return fileInfo;
+  public boolean isPrimaryReplicaReader() {
+    return primaryReplicaReader;
   }
 
   /**
@@ -639,8 +480,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     protected void readMvccVersion(final int offsetFromPos) {
       // See if we even need to decode mvcc.
-      if (!this.reader.shouldIncludeMemStoreTS()) return;
-      if (!this.reader.isDecodeMemStoreTS()) {
+      if (!this.reader.getHFileInfo().shouldIncludeMemStoreTS()) {
+        return;
+      }
+      if (!this.reader.getHFileInfo().isDecodeMemstoreTS()) {
         currMemstoreTS = 0;
         currMemstoreTSLen = 1;
         return;
@@ -738,7 +581,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           // add the two bytes read for the tags.
           offsetFromPos += tlen + (Bytes.SIZEOF_SHORT);
         }
-        if (this.reader.shouldIncludeMemStoreTS()) {
+        if (this.reader.getHFileInfo().shouldIncludeMemStoreTS()) {
           // Directly read the mvcc based on current position
           readMvccVersion(offsetFromPos);
         }
@@ -843,7 +686,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     public int seekTo(Cell key, boolean rewind) throws IOException {
       HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
       BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock,
-        cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
+          cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding(), reader);
       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
         // This happens if the key e.g. falls before the beginning of the file.
         return -1;
@@ -855,7 +698,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     @Override
     public boolean seekBefore(Cell key) throws IOException {
       HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock,
-        cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
+          cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction),
+          reader);
       if (seekToBlock == null) {
         return false;
       }
@@ -949,7 +793,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       Cell ret;
       int cellBufSize = getKVBufSize();
       long seqId = 0L;
-      if (this.reader.shouldIncludeMemStoreTS()) {
+      if (this.reader.getHFileInfo().shouldIncludeMemStoreTS()) {
         seqId = currMemstoreTS;
       }
       if (blockBuffer.hasArray()) {
@@ -1275,20 +1119,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    */
   public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
 
-  private boolean includesMemstoreTS = false;
-  protected boolean decodeMemstoreTS = false;
-
-
-  @Override
-  public boolean isDecodeMemStoreTS() {
-    return this.decodeMemstoreTS;
-  }
-
-  @Override
-  public boolean shouldIncludeMemStoreTS() {
-    return includesMemstoreTS;
-  }
-
   /**
    * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
    * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
@@ -1548,7 +1378,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
   @Override
   public boolean hasMVCCInfo() {
-    return includesMemstoreTS && decodeMemstoreTS;
+    return fileInfo.shouldIncludeMemStoreTS() && fileInfo.isDecodeMemstoreTS();
   }
 
   /**
@@ -1584,7 +1414,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    */
   @Override
   public Optional<Cell> getLastKey() {
-    return dataBlockIndexReader.isEmpty() ? Optional.empty() : Optional.of(lastKeyCell);
+    return dataBlockIndexReader.isEmpty() ? Optional.empty() :
+        Optional.of(fileInfo.getLastKeyCell());
   }
 
   /**
@@ -1594,7 +1425,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    */
   @Override
   public Optional<Cell> midKey() throws IOException {
-    return Optional.ofNullable(dataBlockIndexReader.midkey());
+    return Optional.ofNullable(dataBlockIndexReader.midkey(this));
   }
 
   @Override
@@ -1603,23 +1434,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   }
 
   @Override
-  public void close(boolean evictOnClose) throws IOException {
-    PrefetchExecutor.cancel(path);
-    // Deallocate blocks in load-on-open section
-    blockIter.freeBlocks();
-    // Deallocate data blocks
-    cacheConf.getBlockCache().ifPresent(cache -> {
-      if (evictOnClose) {
-        int numEvicted = cache.evictBlocksByHfileName(name);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
-        }
-      }
-    });
-    fsBlockReader.closeStreams();
-  }
-
-  @Override
   public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
     return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
   }
@@ -1802,9 +1616,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           " is not supported, path=" + path) ;
     }
 
-    for (HFileBlock b : loadOnOpenBlocks)
-      if (b.getBlockType() == blockType)
+    for (HFileBlock b : fileInfo.getLoadOnOpenBlocks()) {
+      if (b.getBlockType() == blockType) {
         return b.getByteStream();
+      }
+    }
     return null;
   }
 
@@ -1827,43 +1643,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     return PrefetchExecutor.isCompleted(path);
   }
 
-  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
-      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
-    HFileContextBuilder builder = new HFileContextBuilder()
-      .withIncludesMvcc(shouldIncludeMemStoreTS())
-      .withHBaseCheckSum(true)
-      .withHFileName(this.getName())
-      .withCompression(this.compressAlgo);
-
-    // Check for any key material available
-    byte[] keyBytes = trailer.getEncryptionKey();
-    if (keyBytes != null) {
-      Encryption.Context cryptoContext = Encryption.newContext(conf);
-      Key key;
-      key = EncryptionUtil.unwrapKey(conf, keyBytes);
-      // Use the algorithm the key wants
-      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
-      if (cipher == null) {
-        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available"
-            + ", path=" + path);
-      }
-      cryptoContext.setCipher(cipher);
-      cryptoContext.setKey(key);
-      builder.withEncryptionContext(cryptoContext);
-    }
-
-    HFileContext context = builder.build();
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reader" + (path != null? " for " + path: "") +
-        " initialized with cacheConf: " + cacheConf +
-        " comparator: " + comparator.getClass().getSimpleName() +
-        " fileContext: " + context);
-    }
-
-    return context;
-  }
-
   /**
    * Create a Scanner on this file. No seeks or reads are done on creation. Call
    * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileStreamReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileStreamReader.java
new file mode 100644
index 0000000..3f72b4a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileStreamReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation of {@link HFile.Reader} to deal with stream read. It does
+ * not perform any prefetch operations (HFilePreadReader will do this).
+ */
+@InterfaceAudience.Private
+public class HFileStreamReader extends HFileReaderImpl {
+  public HFileStreamReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
+      Configuration conf) throws IOException {
+    super(context, fileInfo, cacheConf, conf);
+  }
+
+  @Override
+  public void close(boolean evictOnClose) throws IOException {
+    fsBlockReader.closeStreams();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 93cca8b..26f10ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -34,18 +34,14 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.CellComparatorImpl.MetaCellComparator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -54,7 +50,9 @@ import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
-
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -82,7 +80,7 @@ public class HFileWriterImpl implements HFile.Writer {
   protected final boolean closeOutputStream;
 
   /** A "file info" block: a key-value map of file-wide metadata. */
-  protected FileInfo fileInfo = new HFile.FileInfo();
+  protected HFileInfo fileInfo = new HFileInfo();
 
   /** Total # of key/value entries, i.e. how many times add() was called. */
   protected long entryCount = 0;
@@ -196,7 +194,7 @@ public class HFileWriterImpl implements HFile.Writer {
 
   /**
    * Add to the file info. All added key/value pairs can be obtained using
-   * {@link HFile.Reader#loadFileInfo()}.
+   * {@link HFile.Reader#getHFileInfo()}.
    *
    * @param k Key
    * @param v Value
@@ -791,27 +789,27 @@ public class HFileWriterImpl implements HFile.Writer {
       // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
       // byte buffer. Won't take a tuple.
       byte [] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
-      fileInfo.append(FileInfo.LASTKEY, lastKey, false);
+      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
     }
 
     // Average key length.
     int avgKeyLen =
         entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
-    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
-    fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
+    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
+    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
       false);
 
     // Average value length.
     int avgValueLen =
         entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
-    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
+    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
     if (hFileContext.isIncludesTags()) {
       // When tags are not being written in this file, MAX_TAGS_LEN is excluded
       // from the FileInfo
-      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
       boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
         && hFileContext.isCompressTags();
-      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
+      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
new file mode 100644
index 0000000..bd3d63d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Carries some of the metadata about the HFile Reader.
+ */
+@InterfaceAudience.Private
+public class ReaderContext {
+  @InterfaceAudience.Private
+  public enum ReaderType {
+    PREAD,
+    STREAM
+  }
+  private final Path filePath;
+  private final FSDataInputStreamWrapper fsdis;
+  private final long fileSize;
+  private final HFileSystem hfs;
+  private final boolean primaryReplicaReader;
+  private final ReaderType type;
+
+  public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
+      HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
+    this.filePath = filePath;
+    this.fsdis = fsdis;
+    this.fileSize = fileSize;
+    this.hfs = hfs;
+    this.primaryReplicaReader = primaryReplicaReader;
+    this.type = type;
+  }
+
+  public Path getFilePath() {
+    return this.filePath;
+  }
+
+  public FSDataInputStreamWrapper getInputStreamWrapper() {
+    return this.fsdis;
+  }
+
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  public HFileSystem getFileSystem() {
+    return this.hfs;
+  }
+
+  public boolean isPrimaryReplicaReader() {
+    return this.primaryReplicaReader;
+  }
+
+  public ReaderType getReaderType() {
+    return this.type;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
new file mode 100644
index 0000000..c58d5b8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A builder that helps in building up the ReaderContext
+ */
+@InterfaceAudience.Private
+public class ReaderContextBuilder {
+  private Path filePath;
+  private FSDataInputStreamWrapper fsdis;
+  private long fileSize;
+  private HFileSystem hfs;
+  private boolean primaryReplicaReader = true;
+  private ReaderType type = ReaderType.PREAD;
+
+  public ReaderContextBuilder() {}
+
+  public ReaderContextBuilder withFilePath(Path filePath) {
+    this.filePath = filePath;
+    return this;
+  }
+
+  public ReaderContextBuilder withFileSize(long fileSize) {
+    this.fileSize = fileSize;
+    return this;
+  }
+
+  public ReaderContextBuilder withInputStreamWrapper(FSDataInputStreamWrapper fsdis) {
+    this.fsdis = fsdis;
+    return this;
+  }
+
+  public ReaderContextBuilder withFileSystem(HFileSystem hfs) {
+    this.hfs = hfs;
+    return this;
+  }
+
+  public ReaderContextBuilder withFileSystem(FileSystem fs) {
+    if (!(fs instanceof HFileSystem)) {
+      this.hfs = new HFileSystem(fs);
+    } else {
+      this.hfs = (HFileSystem) fs;
+    }
+    return this;
+  }
+
+  public ReaderContextBuilder withPrimaryReplicaReader(boolean primaryReplicaReader) {
+    this.primaryReplicaReader = primaryReplicaReader;
+    return this;
+  }
+
+  public ReaderContextBuilder withReaderType(ReaderType type) {
+    this.type = type;
+    return this;
+  }
+
+  @VisibleForTesting
+  public ReaderContextBuilder withFileSystemAndPath(FileSystem fs, Path filePath)
+      throws IOException {
+    this.withFileSystem(fs)
+        .withFilePath(filePath)
+        .withFileSize(fs.getFileStatus(filePath).getLen())
+        .withInputStreamWrapper(new FSDataInputStreamWrapper(fs, filePath));
+    return this;
+  }
+
+  public ReaderContext build() {
+    validateFields();
+    return new ReaderContext(filePath, fsdis, fileSize, hfs, primaryReplicaReader, type);
+  }
+
+  private void validateFields() throws IllegalArgumentException {
+    checkNotNull(filePath, "Illegal ReaderContext, no filePath specified.");
+    checkNotNull(fsdis, "Illegal ReaderContext, no StreamWrapper specified.");
+    checkNotNull(hfs, "Illegal ReaderContext, no HFileSystem specified.");
+    checkArgument(fileSize > 0L, "Illegal ReaderContext, fileSize <= 0");
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 41e77f8..0c4fa46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -63,7 +63,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -599,8 +598,6 @@ public class MergeTableRegionsProcedure
    */
   private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
       final Path mergeDir) throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    final Configuration conf = env.getMasterConfiguration();
     final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
     for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
       String family = hcd.getNameAsString();
@@ -610,9 +607,8 @@ public class MergeTableRegionsProcedure
           // Create reference file(s) to parent region file here in mergedDir.
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
-          regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(mfs.getFileSystem(),
-              storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true),
-            mergeDir);
+          regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(
+              storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), mergeDir);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index fc20176..1752c46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -621,7 +622,6 @@ public class SplitTableRegionProcedure
    */
   private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
       final HRegionFileSystem regionFs) throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Configuration conf = env.getMasterConfiguration();
     TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
     // The following code sets up a thread pool executor with as many slots as
@@ -685,8 +685,8 @@ public class SplitTableRegionProcedure
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
           StoreFileSplitter sfs =
-              new StoreFileSplitter(regionFs, familyName, new HStoreFile(mfs.getFileSystem(),
-                  storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true));
+              new StoreFileSplitter(regionFs, familyName, new HStoreFile(
+                  storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
           futures.add(threadPool.submit(sfs));
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9521cb0..27a5702 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1994,7 +1994,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           continue;
         }
         if (majorCompactionOnly) {
-          byte[] val = reader.loadFileInfo().get(MAJOR_COMPACTION_KEY);
+          byte[] val = reader.getHFileInfo().get(MAJOR_COMPACTION_KEY);
           if (val == null || !Bytes.toBoolean(val)) {
             continue;
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f2ffc4b..acd3173 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -718,14 +718,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
 
   @VisibleForTesting
   protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
-    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
+        p, isPrimaryReplicaStore());
     return createStoreFileAndReader(info);
   }
 
   private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
-    HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
-        this.family.getBloomFilterType(), isPrimaryReplicaStore());
+    HStoreFile storeFile = new HStoreFile(info, this.family.getBloomFilterType(), this.cacheConf);
     storeFile.initReader();
     return storeFile;
   }
@@ -810,7 +810,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
       FileSystem srcFs = srcPath.getFileSystem(conf);
       srcFs.access(srcPath, FsAction.READ_WRITE);
       reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);
-      reader.loadFileInfo();
 
       Optional<byte[]> firstKey = reader.getFirstRowKey();
       Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
@@ -1409,7 +1408,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   public List<HStoreFile> compact(CompactionContext compaction,
     ThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
-    List<HStoreFile> sfs = null;
     CompactionRequestImpl cr = compaction.getRequest();
     try {
       // Do all sanity checking in here if we have a valid CompactionRequestImpl
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index e306d24..dc7b68c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -28,8 +28,6 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,14 +39,14 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -69,10 +67,6 @@ public class HStoreFile implements StoreFile {
 
   private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
 
-  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
-
-  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
-
   // Keys for fileinfo values in HFile
 
   /** Max Sequence ID in FileInfo */
@@ -122,20 +116,13 @@ public class HStoreFile implements StoreFile {
   public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
 
   private final StoreFileInfo fileInfo;
-  private final FileSystem fs;
+
+  // StoreFile.Reader
+  private volatile StoreFileReader initialReader;
 
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
-  // Counter that is incremented every time a scanner is created on the
-  // store file. It is decremented when the scan on the store file is
-  // done.
-  private final AtomicInteger refCount = new AtomicInteger(0);
-
-  private final boolean noReadahead;
-
-  private final boolean primaryReplica;
-
   // Indicates if the file got compacted
   private volatile boolean compactedAway = false;
 
@@ -155,7 +142,7 @@ public class HStoreFile implements StoreFile {
   private CellComparator comparator;
 
   public CacheConfig getCacheConf() {
-    return cacheConf;
+    return this.cacheConf;
   }
 
   @Override
@@ -195,9 +182,6 @@ public class HStoreFile implements StoreFile {
    */
   private Map<byte[], byte[]> metadataMap;
 
-  // StoreFile.Reader
-  private volatile StoreFileReader reader;
-
   /**
    * Bloom filter type specified in column family configuration. Does not
    * necessarily correspond to the Bloom filter type present in the HFile.
@@ -220,37 +204,29 @@ public class HStoreFile implements StoreFile {
    */
   public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
       BloomType cfBloomType, boolean primaryReplica) throws IOException {
-    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
+    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
   }
 
   /**
    * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
    * depending on the underlying files (10-20MB?).
-   * @param fs fs The current file system to use.
    * @param fileInfo The store file information.
-   * @param conf The current configuration.
-   * @param cacheConf The cache configuration and block cache reference.
    * @param cfBloomType The bloom type to use for this store file as specified by column
    *          family configuration. This may or may not be the same as the Bloom filter type
    *          actually present in the HFile, because column family configuration might change. If
    *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
-   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+   * @param cacheConf The cache configuration and block cache reference.
    */
-  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
-      BloomType cfBloomType, boolean primaryReplica) {
-    this.fs = fs;
+  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
-    this.noReadahead =
-        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
-    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
+    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
       this.cfBloomType = cfBloomType;
     } else {
       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
           cfBloomType + " (disabled in config)");
       this.cfBloomType = BloomType.NONE;
     }
-    this.primaryReplica = primaryReplica;
   }
 
   /**
@@ -277,6 +253,7 @@ public class HStoreFile implements StoreFile {
 
   @Override
   public Path getQualifiedPath() {
+    FileSystem fs = fileInfo.getFileSystem();
     return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
   }
 
@@ -344,14 +321,14 @@ public class HStoreFile implements StoreFile {
 
   @VisibleForTesting
   public int getRefCount() {
-    return refCount.get();
+    return fileInfo.refCount.get();
   }
 
   /**
    * @return true if the file is still used in reads
    */
   public boolean isReferencedInReads() {
-    int rc = refCount.get();
+    int rc = fileInfo.refCount.get();
     assert rc >= 0; // we should not go negative.
     return rc > 0;
   }
@@ -376,16 +353,18 @@ public class HStoreFile implements StoreFile {
    * @see #closeStoreFile(boolean)
    */
   private void open() throws IOException {
-    if (this.reader != null) {
-      throw new IllegalAccessError("Already open");
+    fileInfo.initHDFSBlocksDistribution();
+    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
+    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
+    fileInfo.initHFileInfo(context);
+    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
+    if (reader == null) {
+      reader = fileInfo.createReader(context, cacheConf);
+      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
     }
-
-    // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
-      primaryReplica, refCount, true);
-
+    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
     // Load up indices and fileinfo. This also loads Bloom filter type.
-    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
+    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
 
     // Read in our metadata.
     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
@@ -425,10 +404,10 @@ public class HStoreFile implements StoreFile {
         // increase the seqId when it is a bulk loaded file from mob compaction.
         this.sequenceid += 1;
       }
-      this.reader.setSkipResetSeqId(skipResetSeqId);
-      this.reader.setBulkLoaded(true);
+      initialReader.setSkipResetSeqId(skipResetSeqId);
+      initialReader.setBulkLoaded(true);
     }
-    this.reader.setSequenceID(this.sequenceid);
+    initialReader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
     if (b != null) {
@@ -452,30 +431,31 @@ public class HStoreFile implements StoreFile {
     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
 
-    BloomType hfileBloomType = reader.getBloomFilterType();
+    BloomType hfileBloomType = initialReader.getBloomFilterType();
     if (cfBloomType != BloomType.NONE) {
-      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
+      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
       if (hfileBloomType != cfBloomType) {
         LOG.info("HFile Bloom filter type for "
-            + reader.getHFileReader().getName() + ": " + hfileBloomType
+            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
             + ", but " + cfBloomType + " specified in column family "
             + "configuration");
       }
     } else if (hfileBloomType != BloomType.NONE) {
       LOG.info("Bloom filter turned off by CF config for "
-          + reader.getHFileReader().getName());
+          + initialReader.getHFileReader().getName());
     }
 
     // load delete family bloom filter
-    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
+    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
 
     try {
       byte[] data = metadataMap.get(TIMERANGE_KEY);
-      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
+      initialReader.timeRange = data == null ? null :
+          TimeRangeTracker.parseFrom(data).toTimeRange();
     } catch (IllegalArgumentException e) {
       LOG.error("Error reading timestamp range data from meta -- " +
           "proceeding without", e);
-      this.reader.timeRange = null;
+      this.initialReader.timeRange = null;
     }
 
     try {
@@ -486,36 +466,45 @@ public class HStoreFile implements StoreFile {
     }
 
     // initialize so we can reuse them after reader closed.
-    firstKey = reader.getFirstKey();
-    lastKey = reader.getLastKey();
-    comparator = reader.getComparator();
+    firstKey = initialReader.getFirstKey();
+    lastKey = initialReader.getLastKey();
+    comparator = initialReader.getComparator();
   }
 
   /**
    * Initialize the reader used for pread.
    */
   public void initReader() throws IOException {
-    if (reader == null) {
-      try {
-        open();
-      } catch (Exception e) {
-        try {
-          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
-          this.closeStoreFile(evictOnClose);
-        } catch (IOException ee) {
-          LOG.warn("failed to close reader", ee);
+    if (initialReader == null) {
+      synchronized (this) {
+        if (initialReader == null) {
+          try {
+            open();
+          } catch (Exception e) {
+            try {
+              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
+              this.closeStoreFile(evictOnClose);
+            } catch (IOException ee) {
+              LOG.warn("failed to close reader", ee);
+            }
+            throw e;
+          }
         }
-        throw e;
       }
     }
   }
 
   private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
     initReader();
-    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
-      primaryReplica, refCount, false);
-    reader.copyFields(this.reader);
-    return reader;
+    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
+    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
+    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
+    if (reader == null) {
+      reader = fileInfo.createReader(context, cacheConf);
+      // stream reader needs to copy fields from the pread reader
+      reader.copyFields(initialReader);
+    }
+    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
   }
 
   /**
@@ -547,7 +536,7 @@ public class HStoreFile implements StoreFile {
    * @see #initReader()
    */
   public StoreFileReader getReader() {
-    return this.reader;
+    return this.initialReader;
   }
 
   /**
@@ -555,9 +544,9 @@ public class HStoreFile implements StoreFile {
    * @throws IOException
    */
   public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
-    if (this.reader != null) {
-      this.reader.close(evictOnClose);
-      this.reader = null;
+    if (this.initialReader != null) {
+      this.initialReader.close(evictOnClose);
+      this.initialReader = null;
     }
   }
 
@@ -568,7 +557,7 @@ public class HStoreFile implements StoreFile {
   public void deleteStoreFile() throws IOException {
     boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
     closeStoreFile(evictOnClose);
-    this.fs.delete(getPath(), true);
+    this.fileInfo.getFileSystem().delete(getPath(), true);
   }
 
   public void markCompactedAway() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index a76e0fe..abfb44f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -30,15 +30,19 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Describe a StoreFile (hfile, reference, link)
@@ -79,8 +83,11 @@ public class StoreFileInfo {
     Pattern.compile(String.format("^(%s|%s)\\.(.+)$",
       HFILE_NAME_REGEX, HFileLink.LINK_NAME_REGEX));
 
+  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
+  public static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
+
   // Configuration
-  private Configuration conf;
+  private final Configuration conf;
 
   // FileSystem handle
   private final FileSystem fs;
@@ -88,6 +95,8 @@ public class StoreFileInfo {
   // HDFS blocks distribution information
   private HDFSBlocksDistribution hdfsBlocksDistribution = null;
 
+  private HFileInfo hfileInfo;
+
   // If this storefile references another, this is the reference instance.
   private final Reference reference;
 
@@ -103,19 +112,29 @@ public class StoreFileInfo {
 
   private long size;
 
+  private final boolean primaryReplica;
+
+  private final boolean noReadahead;
+
+  // Counter that is incremented every time a scanner is created on the
+  // store file. It is decremented when the scan on the store file is
+  // done.
+  final AtomicInteger refCount = new AtomicInteger(0);
+
   /**
    * Create a Store File Info
    * @param conf the {@link Configuration} to use
    * @param fs The current file system to use.
    * @param initialPath The {@link Path} of the file
+   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
    */
-  public StoreFileInfo(final Configuration conf, final FileSystem fs, final Path initialPath)
-      throws IOException {
-    this(conf, fs, null, initialPath);
+  public StoreFileInfo(final Configuration conf, final FileSystem fs, final Path initialPath,
+      final boolean primaryReplica) throws IOException {
+    this(conf, fs, null, initialPath, primaryReplica);
   }
 
-  private StoreFileInfo(final Configuration conf, final FileSystem fs,
-      final FileStatus fileStatus, final Path initialPath) throws IOException {
+  private StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+      final Path initialPath, final boolean primaryReplica) throws IOException {
     assert fs != null;
     assert initialPath != null;
     assert conf != null;
@@ -123,12 +142,15 @@ public class StoreFileInfo {
     this.fs = fs;
     this.conf = conf;
     this.initialPath = initialPath;
+    this.primaryReplica = primaryReplica;
+    this.noReadahead = this.conf.getBoolean(STORE_FILE_READER_NO_READAHEAD,
+        DEFAULT_STORE_FILE_READER_NO_READAHEAD);
     Path p = initialPath;
     if (HFileLink.isHFileLink(p)) {
       // HFileLink
       this.reference = null;
       this.link = HFileLink.buildFromHFileLinkPattern(conf, p);
-      if (LOG.isTraceEnabled()) LOG.trace(p + " is a link");
+      LOG.trace("{} is a link", p);
     } else if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       Path referencePath = getReferredToFile(p);
@@ -139,8 +161,7 @@ public class StoreFileInfo {
         // Reference
         this.link = null;
       }
-      if (LOG.isTraceEnabled()) LOG.trace(p + " is a " + reference.getFileRegion() +
-              " reference to " + referencePath);
+      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
     } else if (isHFile(p)) {
       // HFile
       if (fileStatus != null) {
@@ -166,7 +187,7 @@ public class StoreFileInfo {
    */
   public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus)
       throws IOException {
-    this(conf, fs, fileStatus, fileStatus.getPath());
+    this(conf, fs, fileStatus, fileStatus.getPath(), true);
   }
 
   /**
@@ -177,13 +198,7 @@ public class StoreFileInfo {
    */
   public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
       final HFileLink link) {
-    this.fs = fs;
-    this.conf = conf;
-    // initialPath can be null only if we get a link.
-    this.initialPath = (fileStatus == null) ? null : fileStatus.getPath();
-      // HFileLink
-    this.reference = null;
-    this.link = link;
+    this(conf, fs, fileStatus, null, link);
   }
 
   /**
@@ -195,12 +210,7 @@ public class StoreFileInfo {
    */
   public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
       final Reference reference) {
-    this.fs = fs;
-    this.conf = conf;
-    this.initialPath = fileStatus.getPath();
-    this.createdTimestamp = fileStatus.getModificationTime();
-    this.reference = reference;
-    this.link = null;
+    this(conf, fs, fileStatus, reference, null);
   }
 
   /**
@@ -215,10 +225,13 @@ public class StoreFileInfo {
       final Reference reference, final HFileLink link) {
     this.fs = fs;
     this.conf = conf;
-    this.initialPath = fileStatus.getPath();
-    this.createdTimestamp = fileStatus.getModificationTime();
+    this.primaryReplica = false;
+    this.initialPath = (fileStatus == null) ? null : fileStatus.getPath();
+    this.createdTimestamp = (fileStatus == null) ? 0 :fileStatus.getModificationTime();
     this.reference = reference;
     this.link = link;
+    this.noReadahead = this.conf.getBoolean(STORE_FILE_READER_NO_READAHEAD,
+        DEFAULT_STORE_FILE_READER_NO_READAHEAD);
   }
 
   /**
@@ -265,19 +278,21 @@ public class StoreFileInfo {
     return this.hdfsBlocksDistribution;
   }
 
-  /**
-   * Open a Reader for the StoreFile
-   * @param fs The current file system to use.
-   * @param cacheConf The cache configuration and block cache reference.
-   * @return The StoreFile.Reader for the file
-   */
-  public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind,
-      long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared)
+  StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf)
+      throws IOException {
+    StoreFileReader reader = null;
+    if (this.reference != null) {
+      reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, refCount, conf);
+    } else {
+      reader = new StoreFileReader(context, hfileInfo, cacheConf, refCount, conf);
+    }
+    return reader;
+  }
+
+  ReaderContext createReaderContext(boolean doDropBehind, long readahead, ReaderType type)
       throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
-
-    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
       in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead);
@@ -301,27 +316,18 @@ public class StoreFileInfo {
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();
-    hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
-
-    StoreFileReader reader = null;
-    if (this.coprocessorHost != null) {
-      reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
-        cacheConf, reference);
-    }
-    if (reader == null) {
-      if (this.reference != null) {
-        reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
-            isPrimaryReplicaStoreFile, refCount, shared, conf);
-      } else {
-        reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf,
-            isPrimaryReplicaStoreFile, refCount, shared, conf);
-      }
-    }
-    if (this.coprocessorHost != null) {
-      reader = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(), in, length,
-        cacheConf, reference, reader);
+    ReaderContextBuilder contextBuilder = new ReaderContextBuilder()
+        .withInputStreamWrapper(in)
+        .withFileSize(length)
+        .withPrimaryReplicaReader(this.primaryReplica)
+        .withReaderType(type)
+        .withFileSystem(fs);
+    if (this.reference != null) {
+      contextBuilder.withFilePath(this.getPath());
+    } else {
+      contextBuilder.withFilePath(status.getPath());
     }
-    return reader;
+    return contextBuilder.build();
   }
 
   /**
@@ -487,7 +493,7 @@ public class StoreFileInfo {
   public static Path getReferredToFile(final Path p) {
     Matcher m = REF_NAME_PATTERN.matcher(p.getName());
     if (m == null || !m.matches()) {
-      LOG.warn("Failed match of store file name " + p.toString());
+      LOG.warn("Failed match of store file name {}", p.toString());
       throw new IllegalArgumentException("Failed match of store file name " +
           p.toString());
     }
@@ -497,10 +503,7 @@ public class StoreFileInfo {
     // Tabledir is up two directories from where Reference was written.
     Path tableDir = p.getParent().getParent().getParent();
     String nameStrippedOfSuffix = m.group(1);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("reference '" + p + "' to region=" + otherRegion
-        + " hfile=" + nameStrippedOfSuffix);
-    }
+    LOG.trace("reference {} to region={} hfile={}", p, otherRegion, nameStrippedOfSuffix);
 
     // Build up new path with the referenced region in place of our current
     // region in the reference path.  Also strip regionname suffix from name.
@@ -535,7 +538,7 @@ public class StoreFileInfo {
     // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
     // NOTE: that the HFileLink is just a name, so it's an empty file.
     if (!HFileLink.isHFileLink(p) && fileStatus.getLen() <= 0) {
-      LOG.warn("Skipping " + p + " because it is empty. HBASE-646 DATA LOSS?");
+      LOG.warn("Skipping {} because it is empty. HBASE-646 DATA LOSS?", p);
       return false;
     }
 
@@ -624,4 +627,50 @@ public class StoreFileInfo {
       return HFileLink.getReferencedHFileName(initialPath.getName());
     }
   }
+
+  FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  Configuration getConf() {
+    return this.conf;
+  }
+
+  boolean isNoReadahead() {
+    return this.noReadahead;
+  }
+
+  HFileInfo getHFileInfo() {
+    return hfileInfo;
+  }
+
+  void initHDFSBlocksDistribution() throws IOException {
+    hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
+  }
+
+  StoreFileReader preStoreFileReaderOpen(ReaderContext context, CacheConfig cacheConf)
+      throws IOException {
+    StoreFileReader reader = null;
+    if (this.coprocessorHost != null) {
+      reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(),
+          context.getInputStreamWrapper(), context.getFileSize(),
+          cacheConf, reference);
+    }
+    return reader;
+  }
+
+  StoreFileReader postStoreFileReaderOpen(ReaderContext context, CacheConfig cacheConf,
+      StoreFileReader reader) throws IOException {
+    StoreFileReader res = reader;
+    if (this.coprocessorHost != null) {
+      res = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(),
+          context.getInputStreamWrapper(), context.getFileSize(),
+          cacheConf, reference, reader);
+    }
+    return res;
+  }
+
+  public void initHFileInfo(ReaderContext context) throws IOException {
+    this.hfileInfo = new HFileInfo(context, conf);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index f5e7e76..f92a4d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -30,8 +30,6 @@ import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -39,13 +37,15 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -81,43 +81,31 @@ public class StoreFileReader {
   // store file. It is decremented when the scan on the store file is
   // done. All StoreFileReader for the same StoreFile will share this counter.
   private final AtomicInteger refCount;
+  private final ReaderContext context;
 
-  // indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will
-  // close the internal reader when readCompleted is called.
-  @VisibleForTesting
-  final boolean shared;
-
-  private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
+  private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context) {
     this.reader = reader;
     bloomFilterType = BloomType.NONE;
     this.refCount = refCount;
-    this.shared = shared;
-  }
-
-  public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf,
-      boolean primaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf)
-      throws IOException {
-    this(HFile.createReader(fs, path, cacheConf, primaryReplicaStoreFile, conf), refCount, shared);
+    this.context = context;
   }
 
-  public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
-      CacheConfig cacheConf, boolean primaryReplicaStoreFile, AtomicInteger refCount,
-      boolean shared, Configuration conf) throws IOException {
-    this(HFile.createReader(fs, path, in, size, cacheConf, primaryReplicaStoreFile, conf), refCount,
-        shared);
+  public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
+      AtomicInteger refCount, Configuration conf) throws IOException {
+    this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context);
   }
 
-  void copyFields(StoreFileReader reader) {
-    this.generalBloomFilter = reader.generalBloomFilter;
-    this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter;
-    this.bloomFilterType = reader.bloomFilterType;
-    this.sequenceID = reader.sequenceID;
-    this.timeRange = reader.timeRange;
-    this.lastBloomKey = reader.lastBloomKey;
-    this.bulkLoadResult = reader.bulkLoadResult;
-    this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV;
-    this.skipResetSeqId = reader.skipResetSeqId;
-    this.prefixLength = reader.prefixLength;
+  void copyFields(StoreFileReader storeFileReader) throws IOException {
+    this.generalBloomFilter = storeFileReader.generalBloomFilter;
+    this.deleteFamilyBloomFilter = storeFileReader.deleteFamilyBloomFilter;
+    this.bloomFilterType = storeFileReader.bloomFilterType;
+    this.sequenceID = storeFileReader.sequenceID;
+    this.timeRange = storeFileReader.timeRange;
+    this.lastBloomKey = storeFileReader.lastBloomKey;
+    this.bulkLoadResult = storeFileReader.bulkLoadResult;
+    this.lastBloomKeyOnlyKV = storeFileReader.lastBloomKeyOnlyKV;
+    this.skipResetSeqId = storeFileReader.skipResetSeqId;
+    this.prefixLength = storeFileReader.prefixLength;
   }
 
   public boolean isPrimaryReplicaReader() {
@@ -131,7 +119,7 @@ public class StoreFileReader {
   StoreFileReader() {
     this.refCount = new AtomicInteger(0);
     this.reader = null;
-    this.shared = false;
+    this.context = null;
   }
 
   public CellComparator getComparator() {
@@ -177,7 +165,7 @@ public class StoreFileReader {
    */
   void readCompleted() {
     refCount.decrementAndGet();
-    if (!shared) {
+    if (context.getReaderType() == ReaderType.STREAM) {
       try {
         reader.close(false);
       } catch (IOException e) {
@@ -492,7 +480,7 @@ public class StoreFileReader {
   }
 
   public Map<byte[], byte[]> loadFileInfo() throws IOException {
-    Map<byte [], byte []> fi = reader.loadFileInfo();
+    Map<byte [], byte []> fi = reader.getHFileInfo();
 
     byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
     if (b != null) {
@@ -700,4 +688,8 @@ public class StoreFileReader {
   public int getPrefixLength() {
     return prefixLength;
   }
+
+  public ReaderContext getReaderContext() {
+    return this.context;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 503325a..63bf130 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -178,7 +178,7 @@ public abstract class Compactor<T extends CellSink> {
           fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
         }
       }
-      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
+      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
       if (tmp != null) {
         fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 16ed114..a4bb00d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -80,7 +80,10 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -727,7 +730,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     Optional<byte[]> first, last;
     try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
       CacheConfig.DISABLED, true, getConf())) {
-      hfr.loadFileInfo();
       first = hfr.getFirstRowKey();
       last = hfr.getLastRowKey();
     } catch (FileNotFoundException fnfe) {
@@ -868,7 +870,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
             LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
                 " for family " + builder.getNameAsString());
           }
-          reader.loadFileInfo();
           byte[] first = reader.getFirstRowKey().get();
           byte[] last = reader.getLastRowKey().get();
 
@@ -1102,8 +1103,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     HalfStoreFileReader halfReader = null;
     StoreFileWriter halfWriter = null;
     try {
-      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
-          new AtomicInteger(0), true, conf);
+      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
+      HFileInfo hfile = new HFileInfo(context, conf);
+      halfReader =
+        new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
+      hfile.initMetaAndIndex(halfReader.getHFileReader());
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -1148,7 +1152,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       return false;
     }
 
-    return !HFile.isReservedFileInfoKey(key);
+    return !HFileInfo.isReservedFileInfoKey(key);
   }
 
   private boolean isCreateTable() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index fbaab3d..b72e1cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -135,7 +135,6 @@ public class CompressionTest {
     Cell cc = null;
     HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf);
     try {
-      reader.loadFileInfo();
       HFileScanner scanner = reader.getScanner(false, true);
       scanner.seekTo(); // position to the start of file
       // Scanner does not do Cells yet. Do below for now till fixed.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 8a29a5a..910f523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -1017,7 +1017,6 @@ public class HBaseFsck extends Configured implements Closeable {
         HFile.Reader hf = null;
         try {
           hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED, true, getConf());
-          hf.loadFileInfo();
           Optional<Cell> startKv = hf.getFirstKey();
           start = CellUtil.cloneRow(startKv.get());
           Optional<Cell> endKv = hf.getLastKey();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 0609733..6a6a1c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -124,7 +124,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
 
     // if this is a primary region, just return the StoreFileInfo constructed from path
     if (RegionInfo.COMPARATOR.compare(regionInfo, regionInfoForFs) == 0) {
-      return new StoreFileInfo(conf, fs, path);
+      return new StoreFileInfo(conf, fs, path, true);
     }
 
     // else create a store file link. The link file does not exists on filesystem though.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
index 3ee6f7d..4c5be46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
@@ -404,7 +404,6 @@ public class HFilePerformanceEvaluation {
     @Override
     void setUp() throws Exception {
       reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
-      this.reader.loadFileInfo();
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
index 9c11070..cc781b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
@@ -39,7 +39,10 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -99,7 +102,6 @@ public class TestHalfStoreFileReader {
     w.close();
 
     HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
-    r.loadFileInfo();
     Cell midKV = r.midKey().get();
     byte[] midkey = CellUtil.cloneRow(midKV);
 
@@ -116,8 +118,11 @@ public class TestHalfStoreFileReader {
 
   private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
       throws IOException {
-    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConf, bottom, true,
-        new AtomicInteger(0), true, TEST_UTIL.getConfiguration());
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
+    HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConf,
+        bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
+    fileInfo.initMetaAndIndex(halfreader.getHFileReader());
     halfreader.loadFileInfo();
     final HFileScanner scanner = halfreader.getScanner(false, false);
 
@@ -158,7 +163,6 @@ public class TestHalfStoreFileReader {
     w.close();
 
     HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
-    r.loadFileInfo();
     Cell midKV = r.midKey().get();
     byte[] midkey = CellUtil.cloneRow(midKV);
 
@@ -210,8 +214,11 @@ public class TestHalfStoreFileReader {
 
   private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
       CacheConfig cacheConfig) throws IOException {
-    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConfig, bottom, true,
-        new AtomicInteger(0), true, TEST_UTIL.getConfiguration());
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
+    HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConfig,
+        bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
+    fileInfo.initMetaAndIndex(halfreader.getHFileReader());
     halfreader.loadFileInfo();
     final HFileScanner scanner = halfreader.getScanner(false, false);
     scanner.seekBefore(seekBefore);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index 6d02854..85f74c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -98,7 +98,13 @@ public class TestChecksum {
 
     FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
     meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
-    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
+    ReaderContext context = new ReaderContextBuilder()
+        .withInputStreamWrapper(is)
+        .withFileSize(totalSize)
+        .withFileSystem((HFileSystem) fs)
+        .withFilePath(path)
+        .build();
+    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
         meta, ByteBuffAllocator.HEAP);
     HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
     assertTrue(!b.isSharedMem());
@@ -145,7 +151,13 @@ public class TestChecksum {
 
       FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
       meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
-      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
+      ReaderContext context = new ReaderContextBuilder()
+          .withInputStreamWrapper(is)
+          .withFileSize(totalSize)
+          .withFileSystem((HFileSystem) fs)
+          .withFilePath(path)
+          .build();
+      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
           meta, ByteBuffAllocator.HEAP);
       HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
       assertTrue(!b.isSharedMem());
@@ -216,7 +228,13 @@ public class TestChecksum {
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .build();
-        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
+        ReaderContext context = new ReaderContextBuilder()
+           .withInputStreamWrapper(is)
+           .withFileSize(totalSize)
+           .withFileSystem(fs)
+           .withFilePath(path)
+           .build();
+        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta);
         HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         b.sanityCheck();
         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
@@ -261,7 +279,13 @@ public class TestChecksum {
         HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
         assertEquals(false, newfs.useHBaseChecksum());
         is = new FSDataInputStreamWrapper(newfs, path);
-        hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
+        context = new ReaderContextBuilder()
+            .withInputStreamWrapper(is)
+            .withFileSize(totalSize)
+            .withFileSystem(newfs)
+            .withFilePath(path)
+            .build();
+        hbr = new CorruptedFSReaderImpl(context, meta);
         b = hbr.readBlockData(0, -1, pread, false, true);
         is.close();
         b.sanityCheck();
@@ -342,9 +366,14 @@ public class TestChecksum {
                .withHBaseCheckSum(true)
                .withBytesPerCheckSum(bytesPerChecksum)
                .build();
+        ReaderContext context = new ReaderContextBuilder()
+            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
+            .withFileSize(totalSize)
+            .withFileSystem(hfs)
+            .withFilePath(path)
+            .build();
         HFileBlock.FSReader hbr =
-            new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize,
-                hfs, path, meta, ByteBuffAllocator.HEAP);
+            new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
         HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         is.close();
@@ -384,9 +413,8 @@ public class TestChecksum {
      */
     boolean corruptDataStream = false;
 
-    public CorruptedFSReaderImpl(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
-        Path path, HFileContext meta) throws IOException {
-      super(istream, fileSize, (HFileSystem) fs, path, meta, ByteBuffAllocator.HEAP);
+    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta) throws IOException {
+      super(context, meta, ByteBuffAllocator.HEAP);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 43b403d..144d0b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -112,6 +113,20 @@ public class TestHFile  {
     fs = TEST_UTIL.getTestFileSystem();
   }
 
+  public static Reader createReaderFromStream(ReaderContext context, CacheConfig cacheConf,
+      Configuration conf) throws IOException {
+    HFileInfo fileInfo = new HFileInfo(context, conf);
+    Reader preadReader = HFile.createReader(context, fileInfo, cacheConf, conf);
+    fileInfo.initMetaAndIndex(preadReader);
+    preadReader.close();
+    context = new ReaderContextBuilder()
+        .withFileSystemAndPath(context.getFileSystem(), context.getFilePath())
+        .withReaderType(ReaderType.STREAM)
+        .build();
+    Reader streamReader = HFile.createReader(context, fileInfo, cacheConf,  conf);
+    return streamReader;
+  }
+
   private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
       int minAllocSize) {
     Configuration that = HBaseConfiguration.create(conf);
@@ -301,7 +316,6 @@ public class TestHFile  {
         HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).withFileContext(context).create();
     w.close();
     Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
-    r.loadFileInfo();
     assertFalse(r.getFirstKey().isPresent());
     assertFalse(r.getLastKey().isPresent());
   }
@@ -317,7 +331,7 @@ public class TestHFile  {
 
     try {
       Reader r = HFile.createReader(fs, f, cacheConf, true, conf);
-    } catch (CorruptHFileException che) {
+    } catch (CorruptHFileException | IllegalArgumentException che) {
       // Expected failure
       return;
     }
@@ -355,8 +369,8 @@ public class TestHFile  {
     truncateFile(fs, w.getPath(), trunc);
 
     try {
-      Reader r = HFile.createReader(fs, trunc, cacheConf, true, conf);
-    } catch (CorruptHFileException che) {
+      HFile.createReader(fs, trunc, cacheConf, true, conf);
+    } catch (CorruptHFileException | IllegalArgumentException che) {
       // Expected failure
       return;
     }
@@ -460,11 +474,10 @@ public class TestHFile  {
     writeRecords(writer, useTags);
     fout.close();
     FSDataInputStream fin = fs.open(ncHFile);
-    Reader reader = HFile.createReaderFromStream(ncHFile, fs.open(ncHFile),
-      fs.getFileStatus(ncHFile).getLen(), cacheConf, conf);
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, ncHFile).build();
+    Reader reader = createReaderFromStream(context, cacheConf, conf);
     System.out.println(cacheConf.toString());
     // Load up the index.
-    reader.loadFileInfo();
     // Get a scanner that caches and that does not use pread.
     HFileScanner scanner = reader.getScanner(true, false);
     // Align scanner at start of the file.
@@ -552,16 +565,13 @@ public class TestHFile  {
     someTestingWithMetaBlock(writer);
     writer.close();
     fout.close();
-    FSDataInputStream fin = fs.open(mFile);
-    Reader reader = HFile.createReaderFromStream(mFile, fs.open(mFile),
-        this.fs.getFileStatus(mFile).getLen(), cacheConf, conf);
-    reader.loadFileInfo();
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build();
+    Reader reader = createReaderFromStream(context, cacheConf, conf);
     // No data -- this should return false.
     assertFalse(reader.getScanner(false, false).seekTo());
     someReadingWithMetaBlock(reader);
     fs.delete(mFile, true);
     reader.close();
-    fin.close();
   }
 
   // test meta blocks for hfiles
@@ -588,7 +598,6 @@ public class TestHFile  {
       writer.close();
       fout.close();
       Reader reader = HFile.createReader(fs, mFile, cacheConf, true, conf);
-      reader.loadFileInfo();
       assertNull(reader.getMetaBlock("non-existant", false));
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 006415c..b5ec798 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -379,11 +380,17 @@ public class TestHFileBlock {
 
         FSDataInputStream is = fs.open(path);
         meta = new HFileContextBuilder()
-        .withHBaseCheckSum(true)
-        .withIncludesMvcc(includesMemstoreTS)
-        .withIncludesTags(includesTag)
-        .withCompression(algo).build();
-        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+            .withHBaseCheckSum(true)
+            .withIncludesMvcc(includesMemstoreTS)
+            .withIncludesTags(includesTag)
+            .withCompression(algo).build();
+        ReaderContext context = new ReaderContextBuilder()
+            .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+            .withFileSize(totalSize)
+            .withFilePath(path)
+            .withFileSystem(fs)
+            .build();
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
         HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         is.close();
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
@@ -396,7 +403,13 @@ public class TestHFileBlock {
 
         if (algo == GZ) {
           is = fs.open(path);
-          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+          ReaderContext readerContext = new ReaderContextBuilder()
+              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+              .withFileSize(totalSize)
+              .withFilePath(path)
+              .withFileSystem(fs)
+              .build();
+          hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc);
           b = hbr.readBlockData(0,
             2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
           assertEquals(expected, b);
@@ -479,8 +492,14 @@ public class TestHFileBlock {
                 .withIncludesMvcc(includesMemstoreTS)
                 .withIncludesTags(includesTag)
                 .build();
+          ReaderContext context = new ReaderContextBuilder()
+              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+              .withFileSize(totalSize)
+              .withFilePath(path)
+              .withFileSystem(fs)
+              .build();
           HFileBlock.FSReaderImpl hbr =
-              new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+              new HFileBlock.FSReaderImpl(context, meta, alloc);
           hbr.setDataBlockEncoder(dataBlockEncoder);
           hbr.setIncludesMemStoreTS(includesMemstoreTS);
           HFileBlock blockFromHFile, blockUnpacked;
@@ -609,8 +628,14 @@ public class TestHFileBlock {
                               .withIncludesMvcc(includesMemstoreTS)
                               .withIncludesTags(includesTag)
                               .withCompression(algo).build();
+          ReaderContext context = new ReaderContextBuilder()
+              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+              .withFileSize(totalSize)
+              .withFilePath(path)
+              .withFileSystem(fs)
+              .build();
           HFileBlock.FSReader hbr =
-              new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+              new HFileBlock.FSReaderImpl(context, meta, alloc);
           long curOffset = 0;
           for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
             if (!pread) {
@@ -807,8 +832,13 @@ public class TestHFileBlock {
                           .withIncludesTags(includesTag)
                           .withCompression(compressAlgo)
                           .build();
-      HFileBlock.FSReader hbr =
-          new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc);
+      ReaderContext context = new ReaderContextBuilder()
+          .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(fileSize)
+          .withFilePath(path)
+          .withFileSystem(fs)
+          .build();
+      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
 
       Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
       ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 00223fd..14b9bbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -208,13 +208,14 @@ public class TestHFileBlockIndex {
                         .withIncludesTags(useTags)
                         .withCompression(compr)
                         .build();
-    HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(istream,
-        fs.getFileStatus(path).getLen(), meta, ByteBuffAllocator.HEAP);
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
+    HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(context, meta,
+        ByteBuffAllocator.HEAP);
 
     BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
     HFileBlockIndex.BlockIndexReader indexReader =
         new HFileBlockIndex.CellBasedKeyBlockIndexReader(
-            CellComparatorImpl.COMPARATOR, numLevels, brw);
+            CellComparatorImpl.COMPARATOR, numLevels);
 
     indexReader.readRootIndex(blockReader.blockRange(rootIndexOffset,
         fileSize).nextBlockWithBlockType(BlockType.ROOT_INDEX), numRootEntries);
@@ -230,7 +231,7 @@ public class TestHFileBlockIndex {
       KeyValue.KeyOnlyKeyValue keyOnlyKey = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
       HFileBlock b =
           indexReader.seekToDataBlock(keyOnlyKey, null, true,
-            true, false, null);
+            true, false, null, brw);
       if (PrivateCellUtil.compare(CellComparatorImpl.COMPARATOR, keyOnlyKey, firstKeyInFile, 0,
         firstKeyInFile.length) < 0) {
         assertTrue(b == null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 17aa326..3fb0c7d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
@@ -152,8 +153,13 @@ public class TestHFileEncryption {
         os.close();
       }
       FSDataInputStream is = fs.open(path);
+      ReaderContext context = new ReaderContextBuilder()
+          .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFilePath(path)
+          .withFileSystem(fs)
+          .withFileSize(totalSize).build();
       try {
-        HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, fileContext,
+        HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, fileContext,
             ByteBuffAllocator.HEAP);
         long pos = 0;
         for (int i = 0; i < blocks; i++) {
@@ -191,7 +197,6 @@ public class TestHFileEncryption {
     // read it back in and validate correct crypto metadata
     HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
     try {
-      reader.loadFileInfo();
       FixedFileTrailer trailer = reader.getTrailer();
       assertNotNull(trailer.getEncryptionKey());
       Encryption.Context readerContext = reader.getFileContext().getEncryptionContext();
@@ -244,7 +249,6 @@ public class TestHFileEncryption {
         HFileScanner scanner = null;
         HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
         try {
-          reader.loadFileInfo();
           FixedFileTrailer trailer = reader.getTrailer();
           assertNotNull(trailer.getEncryptionKey());
           scanner = reader.getScanner(false, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
index 513122b..41568b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
@@ -92,7 +92,6 @@ public class TestHFileReaderImpl {
         new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
 
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf, bucketcache), true, conf);
-    reader.loadFileInfo();
 
     // warm cache
     HFileScanner scanner = reader.getScanner(true, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
index fa67039..47cbd85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
@@ -203,15 +203,15 @@ public class TestHFileScannerImplReferenceCount {
 
     HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
     HFileBlock block1 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     waitBucketCacheFlushed(defaultBC);
     Assert.assertTrue(block1.getBlockType().isData());
     Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
 
     HFileBlock block2 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     waitBucketCacheFlushed(defaultBC);
     Assert.assertTrue(block2.getBlockType().isData());
     Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
@@ -287,13 +287,13 @@ public class TestHFileScannerImplReferenceCount {
 
     HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
     HFileBlock block1 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     Assert.assertTrue(block1.getBlockType().isData());
     Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
     HFileBlock block2 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     Assert.assertTrue(block2.getBlockType().isData());
     Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
     // Wait until flushed to IOEngine;
@@ -348,8 +348,8 @@ public class TestHFileScannerImplReferenceCount {
 
     // Reload the block1 again.
     block1 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     // Wait until flushed to IOEngine;
     waitBucketCacheFlushed(defaultBC);
     Assert.assertTrue(block1.getBlockType().isData());
@@ -417,13 +417,13 @@ public class TestHFileScannerImplReferenceCount {
 
     HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
     HFileBlock block1 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     Assert.assertTrue(block1.getBlockType().isData());
     Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
     HFileBlock block2 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
     Assert.assertTrue(block2.getBlockType().isData());
     Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
     // One RPC reference path.
@@ -467,8 +467,8 @@ public class TestHFileScannerImplReferenceCount {
     Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
 
     HFileBlock block1 = reader.getDataBlockIndexReader()
-        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-        .getHFileBlock();
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
+            DataBlockEncoding.NONE, reader).getHFileBlock();
 
     Assert.assertTrue(block1.isSharedMem());
     Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
index 4dc9c68..095af0b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
@@ -23,7 +23,6 @@ import java.util.Random;
 import java.util.StringTokenizer;
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -187,10 +186,8 @@ public class TestHFileSeek extends TestCase {
   public void seekTFile() throws IOException {
     int miss = 0;
     long totalBytes = 0;
-    FSDataInputStream fsdis = fs.open(path);
-    Reader reader = HFile.createReaderFromStream(path, fsdis,
-        fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
-    reader.loadFileInfo();
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
+    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
     KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
         ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
     HFileScanner scanner = reader.getScanner(false, USE_PREAD);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index f8da706..4f4d36b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,9 +44,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -127,7 +128,8 @@ public class TestHFileWriterV3 {
                            .withBlockSize(4096)
                            .withIncludesTags(useTags)
                            .withCompression(compressAlgo).build();
-    HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig)
             .withPath(fs, hfilePath)
             .withFileContext(context)
             .withComparator(CellComparatorImpl.COMPARATOR)
@@ -181,8 +183,13 @@ public class TestHFileWriterV3 {
                         .withIncludesMvcc(false)
                         .withIncludesTags(useTags)
                         .withHBaseCheckSum(true).build();
+    ReaderContext readerContext = new ReaderContextBuilder()
+        .withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
+        .withFilePath(hfilePath)
+        .withFileSystem(fs)
+        .withFileSize(fileSize).build();
     HFileBlock.FSReader blockReader =
-        new HFileBlock.FSReaderImpl(fsdis, fileSize, meta, ByteBuffAllocator.HEAP);
+        new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
     // Comparator class name is stored in the trailer in version 3.
     CellComparator comparator = trailer.createComparator();
     HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
@@ -199,8 +206,18 @@ public class TestHFileWriterV3 {
     dataBlockIndexReader.readMultiLevelIndexRoot(
         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
 
+    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
+    readerContext = new ReaderContextBuilder()
+        .withFilePath(hfilePath)
+        .withFileSize(fileSize)
+        .withFileSystem(wrapper.getHfs())
+        .withInputStreamWrapper(wrapper)
+        .build();
+    HFileInfo hfile = new HFileInfo(readerContext, conf);
+    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
+    hfile.initMetaAndIndex(reader);
     if (findMidKey) {
-      Cell midkey = dataBlockIndexReader.midkey();
+      Cell midkey = dataBlockIndexReader.midkey(reader);
       assertNotNull("Midkey should not be null", midkey);
     }
 
@@ -209,7 +226,7 @@ public class TestHFileWriterV3 {
         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
           .getByteStream(), trailer.getMetaIndexCount());
     // File info
-    FileInfo fileInfo = new FileInfo();
+    HFileInfo fileInfo = new HFileInfo();
     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
     byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
     boolean includeMemstoreTS = keyValueFormatVersion != null &&
@@ -304,6 +321,7 @@ public class TestHFileWriterV3 {
     }
 
     fsdis.close();
+    reader.close();
   }
 }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
index f1a12a2..64d44c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -124,9 +124,15 @@ public class TestLazyDataBlockDecompression {
     long fileSize = fs.getFileStatus(path).getLen();
     FixedFileTrailer trailer =
       FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
-    HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig,
-      fsdis.getHfs(), conf);
-    reader.loadFileInfo();
+    ReaderContext context = new ReaderContextBuilder()
+        .withFilePath(path)
+        .withFileSize(fileSize)
+        .withFileSystem(fsdis.getHfs())
+        .withInputStreamWrapper(fsdis)
+        .build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
+    HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
+    fileInfo.initMetaAndIndex(reader);
     long offset = trailer.getFirstDataBlockOffset(),
       max = trailer.getLastDataBlockOffset();
     List<HFileBlock> blocks = new ArrayList<>(4);
@@ -138,6 +144,7 @@ public class TestLazyDataBlockDecompression {
       blocks.add(block);
     }
     LOG.info("read " + Iterables.toString(blocks));
+    reader.close();
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
index dc25518..1ae861c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
@@ -115,7 +115,6 @@ public class TestReseekTo {
 
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), ncTFile, cacheConf,
       true, TEST_UTIL.getConfiguration());
-    reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
 
     scanner.seekTo();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
index b6e4a3e..9ab1f24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
@@ -150,7 +150,6 @@ public class TestSeekTo {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
     assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
 
@@ -209,7 +208,6 @@ public class TestSeekTo {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     HFileScanner scanner = reader.getScanner(false, true);
     assertFalse(scanner.seekBefore(toKV("a", tagUsage)));
     assertFalse(scanner.seekBefore(toKV("b", tagUsage)));
@@ -303,7 +301,6 @@ public class TestSeekTo {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     assertEquals(2, reader.getDataBlockIndexReader().getRootBlockCount());
     HFileScanner scanner = reader.getScanner(false, true);
     // lies before the start of the file.
@@ -336,7 +333,6 @@ public class TestSeekTo {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
     HFileBlockIndex.BlockIndexReader blockIndexReader =
       reader.getDataBlockIndexReader();
     System.out.println(blockIndexReader.toString());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index 230e749..3fc8e95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -248,7 +248,6 @@ public class TestEncryptionKeyRotation {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
       new CacheConfig(conf), true, conf);
     try {
-      reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();
       assertNotNull("Reader has a null crypto context", cryptoContext);
       Key key = cryptoContext.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
index eef0b90..9496d8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionRandomKeying.java
@@ -74,7 +74,6 @@ public class TestEncryptionRandomKeying {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
       new CacheConfig(conf), true, conf);
     try {
-      reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();
       assertNotNull("Reader has a null crypto context", cryptoContext);
       Key key = cryptoContext.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index dbf1bd9..e0c2424 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -332,7 +332,7 @@ public class TestHStore {
 
     // Verify that compression and encoding settings are respected
     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
-    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
+    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
     assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
     reader.close();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index 77de366..3b3306b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -61,7 +62,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -281,9 +285,8 @@ public class TestHStoreFile extends HBaseTestCase {
                   HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
 
     // Try to open store file from link
-    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath);
-    HStoreFile hsf =
-        new HStoreFile(this.fs, storeFileInfo, testConf, cacheConf, BloomType.NONE, true);
+    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
+    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
     assertTrue(storeFileInfo.isLink());
     hsf.initReader();
 
@@ -550,8 +553,11 @@ public class TestHStoreFile extends HBaseTestCase {
     }
     writer.close();
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     reader.loadFileInfo();
     reader.loadBloomfilter();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@@ -638,8 +644,11 @@ public class TestHStoreFile extends HBaseTestCase {
     }
     writer.close();
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     reader.loadFileInfo();
     reader.loadBloomfilter();
 
@@ -684,8 +693,11 @@ public class TestHStoreFile extends HBaseTestCase {
     writeStoreFile(writer);
     writer.close();
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
 
     // Now do reseek with empty KV to position to the beginning of the file
 
@@ -744,8 +756,16 @@ public class TestHStoreFile extends HBaseTestCase {
       }
       writer.close();
 
+      ReaderContext context = new ReaderContextBuilder()
+          .withFilePath(f)
+          .withFileSize(fs.getFileStatus(f).getLen())
+          .withFileSystem(fs)
+          .withInputStreamWrapper(new FSDataInputStreamWrapper(fs, f))
+          .build();
+      HFileInfo fileInfo = new HFileInfo(context, conf);
       StoreFileReader reader =
-          new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+          new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+      fileInfo.initMetaAndIndex(reader.getHFileReader());
       reader.loadFileInfo();
       reader.loadBloomfilter();
       StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java
index 0fd5d23..c0e47a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -191,8 +194,11 @@ public class TestRowPrefixBloomFilter {
     writeStoreFile(f, bt, expKeys);
 
     // read the file
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     reader.loadFileInfo();
     reader.loadBloomfilter();
 
@@ -259,8 +265,11 @@ public class TestRowPrefixBloomFilter {
     Path f = new Path(testDir, name.getMethodName());
     writeStoreFile(f, bt, expKeys);
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     reader.loadFileInfo();
     reader.loadBloomfilter();
 
@@ -309,8 +318,11 @@ public class TestRowPrefixBloomFilter {
     Path f = new Path(testDir, name.getMethodName());
     writeStoreFile(f, bt, expKeys);
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     reader.loadFileInfo();
     reader.loadBloomfilter();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
index bfef9bb..ea2d473 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileInfo.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,16 +31,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
 /**
  * Test HStoreFile
  */
@@ -103,9 +104,10 @@ public class TestStoreFileInfo {
     // Try to open nonsense hfilelink. Make sure exception is from HFileLink.
     Path p = new Path("/hbase/test/0123/cf/testtb=4567-abcd");
     try (FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration())) {
-      StoreFileInfo sfi = new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p);
+      StoreFileInfo sfi = new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p, true);
       try {
-        sfi.open(fs, null, false, 1000, true, new AtomicInteger(), false);
+        ReaderContext context = sfi.createReaderContext(false, 1000, ReaderType.PREAD);
+        sfi.createReader(context, null);
         throw new IllegalStateException();
       } catch (FileNotFoundException fnfe) {
         assertTrue(fnfe.getMessage().contains(HFileLink.class.getSimpleName()));
@@ -122,9 +124,10 @@ public class TestStoreFileInfo {
     fs.mkdirs(p.getParent());
     Reference r = Reference.createBottomReference(HConstants.EMPTY_START_ROW);
     r.write(fs, p);
-    StoreFileInfo sfi = new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p);
+    StoreFileInfo sfi = new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p, true);
     try {
-      sfi.open(fs, null, false, 1000, true, new AtomicInteger(), false);
+      ReaderContext context = sfi.createReaderContext(false, 1000, ReaderType.PREAD);
+      sfi.createReader(context, null);
       throw new IllegalStateException();
     } catch (FileNotFoundException fnfe) {
       assertTrue(fnfe.getMessage().contains("->"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
index 264638d..f1cb1c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext;
+import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -79,8 +82,11 @@ public class TestStoreFileScannerWithTagCompression {
     writeStoreFile(writer);
     writer.close();
 
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, f).build();
+    HFileInfo fileInfo = new HFileInfo(context, conf);
     StoreFileReader reader =
-        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
+        new StoreFileReader(context, fileInfo, cacheConf, new AtomicInteger(0), conf);
+    fileInfo.initMetaAndIndex(reader.getHFileReader());
     StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false);
     try {
       // Now do reseek with empty KV to position to the beginning of the file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index f0372f6..c64cc88a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -108,7 +109,8 @@ public class TestSwitchToStreamRead {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
           // starting from pread so we use shared reader here.
-          assertTrue(sfScanner.getReader().shared);
+          assertTrue(sfScanner.getReader().getReaderContext()
+            .getReaderType() == ReaderType.PREAD);
         }
       }
       List<Cell> cells = new ArrayList<>();
@@ -123,7 +125,8 @@ public class TestSwitchToStreamRead {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
           // we should have convert to use stream read now.
-          assertFalse(sfScanner.getReader().shared);
+          assertFalse(sfScanner.getReader().getReaderContext()
+            .getReaderType() == ReaderType.PREAD);
         }
       }
       for (int i = 500; i < 1000; i++) {
@@ -156,7 +159,8 @@ public class TestSwitchToStreamRead {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
           // starting from pread so we use shared reader here.
-          assertTrue(sfScanner.getReader().shared);
+          assertTrue(sfScanner.getReader().getReaderContext()
+            .getReaderType() == ReaderType.PREAD);
         }
       }
       List<Cell> cells = new ArrayList<>();
@@ -170,7 +174,8 @@ public class TestSwitchToStreamRead {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
           // we should have convert to use stream read now.
-          assertFalse(sfScanner.getReader().shared);
+          assertFalse(sfScanner.getReader().getReaderContext()
+            .getReaderType() == ReaderType.PREAD);
         }
       }
       assertFalse(scanner.next(cells,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 129823e..badfc2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -612,8 +612,7 @@ public class TestLoadIncrementalHFiles {
   private int verifyHFile(Path p) throws IOException {
     Configuration conf = util.getConfiguration();
     HFile.Reader reader =
-        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
+      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
     HFileScanner scanner = reader.getScanner(false, false);
     scanner.seekTo();
     int count = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 608039e..73a98f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -160,7 +160,6 @@ public class TestHBaseFsckEncryption {
     HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), path,
       new CacheConfig(conf), true, conf);
     try {
-      reader.loadFileInfo();
       Encryption.Context cryptoContext = reader.getFileContext().getEncryptionContext();
       assertNotNull("Reader has a null crypto context", cryptoContext);
       Key key = cryptoContext.getKey();