Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/11 03:29:10 UTC

hbase git commit: HBASE-17917 Use pread by default for all user scan

Repository: hbase
Updated Branches:
  refs/heads/master c5cc81d8e -> 0ae0edcd6


HBASE-17917 Use pread by default for all user scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ae0edcd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ae0edcd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ae0edcd

Branch: refs/heads/master
Commit: 0ae0edcd630aa1dcb6c47ea11fa4367ae0a5baa8
Parents: c5cc81d
Author: zhangduo <zh...@apache.org>
Authored: Wed May 10 14:15:44 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 11 11:28:22 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  70 ++---
 .../hadoop/hbase/regionserver/KeyValueHeap.java |   3 +
 .../hbase/regionserver/KeyValueScanner.java     |   7 +
 .../regionserver/NonLazyKeyValueScanner.java    |   9 +
 .../regionserver/ReversedStoreScanner.java      |  12 +-
 .../hadoop/hbase/regionserver/ScanInfo.java     |  17 +-
 .../hbase/regionserver/SegmentScanner.java      |   7 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |  14 +-
 .../hbase/regionserver/StoreFileInfo.java       |  15 +
 .../hbase/regionserver/StoreFileReader.java     |   3 +-
 .../hbase/regionserver/StoreFileScanner.java    |  42 +--
 .../hadoop/hbase/regionserver/StoreScanner.java | 279 ++++++++++++-------
 .../org/apache/hadoop/hbase/TestIOFencing.java  |  79 +++---
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   |   3 +-
 .../regionserver/DelegatingKeyValueScanner.java |   6 +
 .../hbase/regionserver/MockStoreFile.java       |  13 +-
 .../hbase/regionserver/TestBlocksScanned.java   |   6 +-
 .../regionserver/TestCompactingMemStore.java    |   2 +-
 .../hbase/regionserver/TestCompaction.java      |   6 +-
 .../TestDefaultCompactSelection.java            |   4 +-
 .../hbase/regionserver/TestDefaultMemStore.java |   7 +-
 .../hbase/regionserver/TestMajorCompaction.java |   6 +-
 .../regionserver/TestReversibleScanners.java    |   5 +-
 .../hbase/regionserver/TestStoreScanner.java    |  13 +-
 .../regionserver/TestSwitchToStreamRead.java    | 127 +++++++++
 .../TestCompactionScanQueryMatcher.java         |   4 +-
 .../querymatcher/TestUserScanQueryMatcher.java  |  23 +-
 .../hbase/util/TestCoprocessorScanPolicy.java   |  21 +-
 28 files changed, 541 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
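
For context, a minimal client-side sketch of how the read type interacts with
the new default (assuming the Scan.ReadType API referenced in the diff below;
this is an illustration, not part of the patch):

    import org.apache.hadoop.hbase.client.Scan;

    Scan scan = new Scan();
    // DEFAULT now starts with pread and may switch to stream reads once the
    // scanner has read more than hbase.storescanner.pread.max.bytes.
    scan.setReadType(Scan.ReadType.DEFAULT);

    // A long full-table scan can still opt into stream reads up front.
    Scan streamScan = new Scan().setReadType(Scan.ReadType.STREAM);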


http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 445dc86..1e86b0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -24,8 +27,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,9 +54,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.IOUtils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  * Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches.
  * Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since
@@ -1418,7 +1416,7 @@ public class HFileBlock implements Cacheable {
   static class FSReaderImpl implements FSReader {
     /** The file system stream of the underlying {@link HFile} that
      * does or doesn't do checksum validations in the filesystem */
-    protected FSDataInputStreamWrapper streamWrapper;
+    private FSDataInputStreamWrapper streamWrapper;
 
     private HFileBlockDecodingContext encodedBlockDecodingCtx;
 
@@ -1434,22 +1432,18 @@ public class HFileBlock implements Cacheable {
     private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
 
     /** The size of the file we are reading from, or -1 if unknown. */
-    protected long fileSize;
+    private long fileSize;
 
     /** The size of the header */
+    @VisibleForTesting
     protected final int hdrSize;
 
     /** The filesystem used to access data */
-    protected HFileSystem hfs;
-
-    private final Lock streamLock = new ReentrantLock();
+    private HFileSystem hfs;
 
-    /** The default buffer size for our buffered streams */
-    public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
-
-    protected HFileContext fileContext;
+    private HFileContext fileContext;
     // Cache the fileName
-    protected String pathName;
+    private String pathName;
 
     FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
         HFileContext fileContext) throws IOException {
@@ -1524,39 +1518,33 @@ public class HFileBlock implements Cacheable {
      *         next header
      * @throws IOException
      */
-    protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
+    @VisibleForTesting
+    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
       if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
         // We are asked to read the next block's header as well, but there is
         // not enough room in the array.
-        throw new IOException("Attempted to read " + size + " bytes and " +
-            hdrSize + " bytes of next header into a " + dest.length +
-            "-byte array at offset " + destOffset);
+        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
+            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
       }
 
-      if (!pread && streamLock.tryLock()) {
+      if (!pread) {
         // Seek + read. Better for scanning.
-        try {
-          HFileUtil.seekOnMultipleSources(istream, fileOffset);
-
-          long realOffset = istream.getPos();
-          if (realOffset != fileOffset) {
-            throw new IOException("Tried to seek to " + fileOffset + " to "
-                + "read " + size + " bytes, but pos=" + realOffset
-                + " after seek");
-          }
+        HFileUtil.seekOnMultipleSources(istream, fileOffset);
+        long realOffset = istream.getPos();
+        if (realOffset != fileOffset) {
+          throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
+              " bytes, but pos=" + realOffset + " after seek");
+        }
 
-          if (!peekIntoNextBlock) {
-            IOUtils.readFully(istream, dest, destOffset, size);
-            return -1;
-          }
+        if (!peekIntoNextBlock) {
+          IOUtils.readFully(istream, dest, destOffset, size);
+          return -1;
+        }
 
-          // Try to read the next block header.
-          if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
-            return -1;
-          }
-        } finally {
-          streamLock.unlock();
+        // Try to read the next block header.
+        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
+          return -1;
         }
       } else {
         // Positional read. Better for random reads; or when the streamLock is already locked.
@@ -1565,7 +1553,6 @@ public class HFileBlock implements Cacheable {
           return -1;
         }
       }
-
       assert peekIntoNextBlock;
       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
     }
@@ -1719,6 +1706,7 @@ public class HFileBlock implements Cacheable {
      *        If HBase checksum is switched off, then use HDFS checksum.
      * @return the HFileBlock or null if there is a HBase checksum mismatch
      */
+    @VisibleForTesting
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
         long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
      throws IOException {
@@ -1830,7 +1818,7 @@ public class HFileBlock implements Cacheable {
      * If the block doesn't uses checksum, returns false.
      * @return True if checksum matches, else false.
      */
-    protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
+    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
         throws IOException {
       // If this is an older version of the block that does not have checksums, then return false
       // indicating that checksum verification did not succeed. Actually, this method should never
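
The streamLock removal above relies on the difference between the two read
paths. A hedged sketch of that distinction on a raw FSDataInputStream (method
names are illustrative, not part of this patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.io.IOUtils;

    // Stream read: stateful seek + read. Cheaper for sequential scanning, but
    // it moves the shared stream position, so concurrent callers would race.
    static void streamRead(FSDataInputStream in, long offset, byte[] dest, int size)
        throws IOException {
      in.seek(offset);
      IOUtils.readFully(in, dest, 0, size);
    }

    // Positional read (pread): stateless and safe for concurrent readers. With
    // pread the default for user scans, the tryLock dance is no longer needed.
    static void positionalRead(FSDataInputStream in, long offset, byte[] dest, int size)
        throws IOException {
      in.readFully(offset, dest, 0, size); // does not move the stream position
    }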

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 195e8f7..a398ce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -422,6 +424,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     return 0;
   }
 
+  @VisibleForTesting
   KeyValueScanner getCurrentForTesting() {
     return current;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index a4cb2f4..7f716d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
@@ -134,6 +135,12 @@ public interface KeyValueScanner extends Shipper, Closeable {
    */
   boolean isFileScanner();
 
+  /**
+   * @return the file path if this is a file scanner, otherwise null.
+   * @see #isFileScanner()
+   */
+  Path getFilePath();
+
   // Support for "Reversed Scanner"
   /**
    * Seek the scanner at or before the row of specified Cell, it firstly
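
A hedged usage sketch for the new contract: callers pair getFilePath() with
isFileScanner(), since non-file scanners return null (the StoreScanner change
below does exactly this; "scanners" here is an assumed local variable):

    for (KeyValueScanner kvs : scanners) {
      if (kvs.isFileScanner()) {
        // Safe: a file scanner always has a backing path.
        Path path = kvs.getFilePath();
        // e.g. resolve the scanner back to its StoreFile by file name
      }
    }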

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index c01b0e6..8778f5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 
@@ -65,6 +66,14 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
     // Not a file by default.
     return false;
   }
+
+
+  @Override
+  public Path getFilePath() {
+    // Not a file by default.
+    return null;
+  }
+
   @Override
   public Cell getNextIndexedKey() {
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index d71af2b..07f98ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,15 +123,17 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   @Override
   public boolean seekToPreviousRow(Cell key) throws IOException {
-    boolean flushed = checkFlushed();
-    checkReseek(flushed);
+    if (checkFlushed()) {
+      reopenAfterFlush();
+    }
     return this.heap.seekToPreviousRow(key);
   }
-  
+
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
-    boolean flushed = checkFlushed();
-    checkReseek(flushed);
+    if (checkFlushed()) {
+      reopenAfterFlush();
+    }
     return this.heap.backwardSeek(key);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 349e166..2a66e55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -48,6 +48,7 @@ public class ScanInfo {
   private boolean usePread;
   private long cellsPerTimeoutCheck;
   private boolean parallelSeekEnabled;
+  private final long preadMaxBytes;
   private final Configuration conf;
 
   public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@@ -58,14 +59,14 @@ public class ScanInfo {
    * @param conf
    * @param family {@link HColumnDescriptor} describing the column family
    * @param ttl Store's TTL (in ms)
-   * @param timeToPurgeDeletes duration in ms after which a delete marker can
-   *        be purged during a major compaction.
+   * @param timeToPurgeDeletes duration in ms after which a delete marker can be purged during a
+   *          major compaction.
    * @param comparator The store's comparator
    */
   public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl,
       final long timeToPurgeDeletes, final CellComparator comparator) {
-    this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
-        .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
+    this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
+        family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator);
   }
 
   /**
@@ -74,6 +75,7 @@ public class ScanInfo {
    * @param minVersions Store's MIN_VERSIONS setting
    * @param maxVersions Store's VERSIONS setting
    * @param ttl Store's TTL (in ms)
+   * @param blockSize Store's block size
    * @param timeToPurgeDeletes duration in ms after which a delete marker can
    *        be purged during a major compaction.
    * @param keepDeletedCells Store's keepDeletedCells setting
@@ -81,7 +83,7 @@ public class ScanInfo {
    */
   public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
       final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
-      final long timeToPurgeDeletes, final CellComparator comparator) {
+      final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) {
     this.family = family;
     this.minVersions = minVersions;
     this.maxVersions = maxVersions;
@@ -99,6 +101,7 @@ public class ScanInfo {
         perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
     this.parallelSeekEnabled =
       conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
+    this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
     this.conf = conf;
   }
 
@@ -149,4 +152,8 @@ public class ScanInfo {
   public CellComparator getComparator() {
     return comparator;
   }
+
+  long getPreadMaxBytes() {
+    return preadMaxBytes;
+  }
 }
\ No newline at end of file
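
The new knob can be tuned per cluster. A sketch of setting it (the property
name comes from StoreScanner below; the value is only an example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Switch a DEFAULT-read-type scan from pread to stream after ~1 MB read
    // per store; if unset, ScanInfo falls back to 4 * the family block size.
    conf.setLong("hbase.storescanner.pread.max.bytes", 1024 * 1024);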

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 2727360..08ded88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.SortedSet;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -316,6 +317,11 @@ public class SegmentScanner implements KeyValueScanner {
     return false;
   }
 
+  @Override
+  public Path getFilePath() {
+    return null;
+  }
+
   /**
    * @return the next key in the index (the key to seek to the next block)
    *     if known, or null otherwise
@@ -396,5 +402,4 @@ public class SegmentScanner implements KeyValueScanner {
     }
     return (first != null ? first : second);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index c53fbf08..91ff97a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -589,11 +589,17 @@ public class StoreFile {
     return reader;
   }
 
+  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
+      boolean canOptimizeForNonNullColumn) {
+    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
+      canOptimizeForNonNullColumn);
+  }
+
   public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
-      boolean pread, boolean isCompaction, long readPt, long scannerOrder,
-      boolean canOptimizeForNonNullColumn) throws IOException {
-    return createStreamReader(canUseDropBehind).getStoreFileScanner(
-      cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
+      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
+      throws IOException {
+    return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
+      isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index c4754a8..0e99c74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -564,4 +564,19 @@ public class StoreFileInfo {
     hash = hash * 31 + ((link == null) ? 0 : link.hashCode());
     return  hash;
   }
+
+  /**
+   * Return the active file name that contains the real data.
+   * <p>
+   * For a reference hfile, we will return the name of the reference file, as it will be used to
+   * construct the StoreFileReader. For a linked hfile, we will return the name of the file being
+   * linked.
+   */
+  public String getActiveFileName() {
+    if (reference != null || link == null) {
+      return initialPath.getName();
+    } else {
+      return HFileLink.getReferencedHFileName(initialPath.getName());
+    }
+  }
 }
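
getActiveFileName() lets a scanner's file path be matched back to its
StoreFile even when the store file is a reference or a link; the StoreScanner
change below uses it to build a lookup map, roughly:

    Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
    for (StoreFile file : store.getStorefiles()) {
      name2File.put(file.getFileInfo().getActiveFileName(), file);
    }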

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index b015ea5..ee7d132 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -76,7 +76,8 @@ public class StoreFileReader {
 
   // indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will
   // close the internal reader when readCompleted is called.
-  private final boolean shared;
+  @VisibleForTesting
+  final boolean shared;
 
   private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
     this.reader = reader;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index aa4f897..42c2af2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
@@ -95,25 +96,21 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   /**
-   * Return an array of scanners corresponding to the given
-   * set of store files.
+   * Return an array of scanners corresponding to the given set of store files.
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files,
-      boolean cacheBlocks,
-      boolean usePread, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks,
-                                   usePread, false, false, readPt);
+  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+      boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
   }
 
   /**
    * Return an array of scanners corresponding to the given set of store files.
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        useDropBehind, null, readPt);
+  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind,
+      long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
+      readPt);
   }
 
   /**
@@ -126,11 +123,17 @@ public class StoreFileScanner implements KeyValueScanner {
     List<StoreFileScanner> scanners = new ArrayList<>(files.size());
     List<StoreFile> sortedFiles = new ArrayList<>(files);
     Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
     for (int i = 0, n = sortedFiles.size(); i < n; i++) {
       StoreFile sf = sortedFiles.get(i);
       sf.initReader();
-      StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
-        isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+      StoreFileScanner scanner;
+      if (usePread) {
+        scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
+      } else {
+        scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
+          canOptimizeForNonNullColumn);
+      }
       scanners.add(scanner);
     }
     return scanners;
@@ -148,8 +151,8 @@ public class StoreFileScanner implements KeyValueScanner {
     boolean succ = false;
     try {
       for (int i = 0, n = sortedFiles.size(); i < n; i++) {
-        scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
-          readPt, i, false));
+        scanners.add(
+          sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
       }
       succ = true;
     } finally {
@@ -444,6 +447,11 @@ public class StoreFileScanner implements KeyValueScanner {
     return true;
   }
 
+  @Override
+  public Path getFilePath() {
+    return reader.getHFileReader().getPath();
+  }
+
   // Test methods
   static final long getSeekCount() {
     return seekCount.sum();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d39a6ee..338a68c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
@@ -53,8 +55,12 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
- * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
- * into List&lt;KeyValue&gt; for a single row.
+ * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List&lt;KeyValue&gt;
+ * for a single row.
+ * <p>
+ * The implementation is not thread safe, so callers must ensure there is no race between next and
+ * close. The only exception is updateReaders, which is called from the memstore flush thread to
+ * indicate that a flush has happened.
  */
 @InterfaceAudience.Private
 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@@ -62,36 +68,35 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private static final Log LOG = LogFactory.getLog(StoreScanner.class);
   // In unit tests, the store could be null
   protected final Store store;
-  protected ScanQueryMatcher matcher;
+  private ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
-  protected boolean cacheBlocks;
+  private boolean cacheBlocks;
 
-  protected long countPerRow = 0;
-  protected int storeLimit = -1;
-  protected int storeOffset = 0;
+  private long countPerRow = 0;
+  private int storeLimit = -1;
+  private int storeOffset = 0;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
-  // Doesnt need to be volatile because it's always accessed via synchronized methods
-  protected boolean closing = false;
-  protected final boolean get;
-  protected final boolean explicitColumnQuery;
-  protected final boolean useRowColBloom;
+  // Does not need to be volatile because it's always accessed via synchronized methods
+  private boolean closing = false;
+  private final boolean get;
+  private final boolean explicitColumnQuery;
+  private final boolean useRowColBloom;
   /**
    * A flag that enables StoreFileScanner parallel-seeking
    */
-  protected boolean parallelSeekEnabled = false;
-  protected ExecutorService executor;
-  protected final Scan scan;
-  protected final NavigableSet<byte[]> columns;
-  protected final long oldestUnexpiredTS;
-  protected final long now;
-  protected final int minVersions;
-  protected final long maxRowSize;
-  protected final long cellsPerHeartbeatCheck;
+  private boolean parallelSeekEnabled = false;
+  private ExecutorService executor;
+  private final Scan scan;
+  private final long oldestUnexpiredTS;
+  private final long now;
+  private final int minVersions;
+  private final long maxRowSize;
+  private final long cellsPerHeartbeatCheck;
 
   // Collects all the KVHeap that are eagerly getting closed during the
   // course of a scan
-  protected List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
+  private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
 
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -100,14 +105,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private long kvsScanned = 0;
   private Cell prevCell = null;
 
+  private final long preadMaxBytes;
+  private long bytesRead;
+
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
       "hbase.storescanner.parallel.seek.enable";
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
-  protected static boolean lazySeekEnabledGlobally =
-      LAZY_SEEK_ENABLED_BY_DEFAULT;
+  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
 
   /**
    * The number of cells scanned in between timeout checks. Specifying a larger value means that
@@ -122,19 +129,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    */
   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
 
-  // if heap == null and lastTop != null, you need to reseek given the key below
-  protected Cell lastTop = null;
+  /**
+   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the bytes we have
+   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
+   * the block size for this store.
+   */
+  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
+
+  private final Scan.ReadType readType;
 
   // A flag whether use pread for scan
-  private final boolean scanUsePread;
+  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
+  private boolean scanUsePread;
   // Indicates whether there was flush during the course of the scan
-  protected volatile boolean flushed = false;
+  private volatile boolean flushed = false;
   // generally we get one file from a flush
-  protected List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
+  private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
   // The current list of scanners
-  protected List<KeyValueScanner> currentScanners = new ArrayList<>();
+  private final List<KeyValueScanner> currentScanners = new ArrayList<>();
   // flush update lock
-  private ReentrantLock flushLock = new ReentrantLock();
+  private final ReentrantLock flushLock = new ReentrantLock();
 
   protected final long readPt;
 
@@ -155,7 +169,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     int numCol = columns == null ? 0 : columns.size();
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
-    this.columns = columns;
     this.now = EnvironmentEdgeManager.currentTime();
     this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
     this.minVersions = scanInfo.getMinVersions();
@@ -168,32 +181,31 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
      this.maxRowSize = scanInfo.getTableMaxRowSize();
     if (get) {
+      this.readType = Scan.ReadType.PREAD;
       this.scanUsePread = true;
     } else {
-      switch (scan.getReadType()) {
-        case STREAM:
-          this.scanUsePread = false;
-          break;
-        case PREAD:
-          this.scanUsePread = true;
-          break;
-        default:
-          this.scanUsePread = scanInfo.isUsePread();
-          break;
+      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
+        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
+      } else {
+        this.readType = scan.getReadType();
+      }
+      // Always start with pread unless the user specifies stream. Will change to stream later
+      // if readType is DEFAULT and the scan keeps running for a long time.
+      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
+    }
+    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
+    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
+    // Parallel seeking is on if the config allows and there is more than one store file.
+    if (this.store != null && this.store.getStorefilesCount() > 1) {
+      RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
+      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+        this.parallelSeekEnabled = true;
+        this.executor = rsService.getExecutorService();
       }
     }
-     this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
-     // Parallel seeking is on if the config allows and more there is more than one store file.
-     if (this.store != null && this.store.getStorefilesCount() > 1) {
-       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
-       if (rsService != null && scanInfo.isParallelSeekEnabled()) {
-         this.parallelSeekEnabled = true;
-         this.executor = rsService.getExecutorService();
-       }
-     }
   }
 
-  protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
     this.currentScanners.addAll(scanners);
   }
 
@@ -360,9 +372,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * Get a filtered list of scanners. Assumes we are not in a compaction.
    * @return list of scanners to seek
    */
-  protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
-    return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher,
-      scan.getStartRow(), scan.getStopRow(), this.readPt));
+  private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+    return selectScannersFrom(
+      store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
+        scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
   }
 
   /**
@@ -413,7 +426,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   /**
    * Filters the given list of scanners using Bloom filter, time range, and
    * TTL.
+   * <p>
+   * Will be overridden by testcases, so declared as protected.
    */
+  @VisibleForTesting
   protected List<KeyValueScanner> selectScannersFrom(
       final List<? extends KeyValueScanner> allScanners) {
     boolean memOnly;
@@ -451,10 +467,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public Cell peek() {
-    if (this.heap == null) {
-      return this.lastTop;
-    }
-    return this.heap.peek();
+    return heap != null ? heap.peek() : null;
   }
 
   @Override
@@ -472,9 +485,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (this.closing) {
       return;
     }
-    if (withHeapClose) this.closing = true;
+    if (withHeapClose) {
+      this.closing = true;
+    }
     // Under test, we dont have a this.store
-    if (this.store != null) this.store.deleteChangedReaderObserver(this);
+    if (this.store != null) {
+      this.store.deleteChangedReaderObserver(this);
+    }
     if (withHeapClose) {
       for (KeyValueHeap h : this.heapsForDelayedClose) {
         h.close();
@@ -492,14 +509,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         this.heap = null;
       }
     }
-    this.lastTop = null; // If both are null, we are closed.
   }
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    boolean flushed = checkFlushed();
-    // reset matcher state, in case that underlying store changed
-    checkReseek(flushed);
+    if (checkFlushed()) {
+      reopenAfterFlush();
+    }
     return this.heap.seek(key);
   }
 
@@ -519,8 +535,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
-    boolean flushed = checkFlushed();
-    if (checkReseek(flushed)) {
+    trySwitchToStreamRead();
+    if (checkFlushed() && reopenAfterFlush()) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
 
@@ -550,7 +566,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     // Clear progress away unless invoker has indicated it should be kept.
-    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
+    if (!scannerContext.getKeepProgress()) {
+      scannerContext.clearProgress();
+    }
 
     // Only do a sanity-check if store and comparator are available.
     CellComparator comparator = store != null ? store.getComparator() : null;
@@ -566,9 +584,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }
       }
-
-      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
+      // Do object compare - we set prevKV from the same heap.
+      if (prevCell != cell) {
+        ++kvsScanned;
+      }
       checkScanOrder(prevCell, cell, comparator);
+      int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
+      bytesRead += cellSize;
       prevCell = cell;
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
       switch (qcode) {
@@ -600,7 +622,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
             // Update local tracking information
             count++;
-            int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
             totalBytesRead += cellSize;
 
             // Update the progress of the scanner context
@@ -636,7 +657,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
         case DONE:
           // Optimization for Gets! If DONE, no more to get on this row, early exit!
-          if (this.scan.isGetScan()) {
+          if (get) {
             // Then no more to this row... exit.
             close(false);// Do all cleanup except heap.close()
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -807,34 +828,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   /**
-   * @param flushed indicates if there was a flush
-   * @return true if top of heap has changed (and KeyValueHeap has to try the
-   *         next KV)
-   * @throws IOException
+   * @return true if the top of heap has changed (and KeyValueHeap has to try the next KV)
    */
-  protected boolean checkReseek(boolean flushed) throws IOException {
-    if (flushed && this.lastTop != null) {
-      resetScannerStack(this.lastTop);
-      if (this.heap.peek() == null
-          || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
-        LOG.debug("Storescanner.peek() is changed where before = "
-            + this.lastTop.toString() + ",and after = " + this.heap.peek());
-        this.lastTop = null;
-        return true;
-      }
-      this.lastTop = null; // gone!
-    }
-    // else dont need to reseek
-    return false;
-  }
-
-  protected void resetScannerStack(Cell lastTopKey) throws IOException {
+  protected final boolean reopenAfterFlush() throws IOException {
+    Cell lastTop = heap.peek();
     // When we have the scan object, should we not pass it to getScanners() to get a limited set of
     // scanners? We did so in the constructor and we could have done it now by storing the scan
     // object from the constructor
-    List<KeyValueScanner> scanners = null;
+    List<KeyValueScanner> scanners;
+    flushLock.lock();
     try {
-      flushLock.lock();
       scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
         scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
       // Clear the current set of flushed store files so that they don't get added again
@@ -844,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     // Seek the new scanners to the last key
-    seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
+    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
     // remove the older memstore scanner
     for (int i = 0; i < currentScanners.size(); i++) {
       if (!currentScanners.get(i).isFileScanner()) {
@@ -856,6 +859,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     addCurrentScanners(scanners);
     // Combine all seeked scanners with a heap
     resetKVHeap(this.currentScanners, store.getComparator());
+    resetQueryMatcher(lastTop);
+    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() +
+            ",and after = " + heap.peek());
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private void resetQueryMatcher(Cell lastTopKey) {
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the
     // query matcher if scanning intra-row.
@@ -902,18 +917,73 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
-    boolean flushed = checkFlushed();
-    // Heap will not be null, if this is called from next() which.
-    // If called from RegionScanner.reseek(...) make sure the scanner
-    // stack is reset if needed.
-    checkReseek(flushed);
+    if (checkFlushed()) {
+      reopenAfterFlush();
+    }
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       return heap.requestSeek(kv, true, useRowColBloom);
     }
     return heap.reseek(kv);
   }
 
-  protected boolean checkFlushed() {
+  private void trySwitchToStreamRead() throws IOException {
+    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
+        bytesRead < preadMaxBytes) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Switch to stream read because we have already read " + bytesRead +
+          " bytes from this scanner");
+    }
+    scanUsePread = false;
+    Cell lastTop = heap.peek();
+    Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
+    for (StoreFile file : store.getStorefiles()) {
+      name2File.put(file.getFileInfo().getActiveFileName(), file);
+    }
+    List<StoreFile> filesToReopen = new ArrayList<>();
+    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
+    List<KeyValueScanner> fileScanners = null;
+    List<KeyValueScanner> scannersToClose = new ArrayList<>();
+    boolean succ = false;
+    try {
+      for (KeyValueScanner kvs : currentScanners) {
+        if (!kvs.isFileScanner()) {
+          memstoreScanners.add(kvs);
+        } else {
+          scannersToClose.add(kvs);
+          if (kvs.peek() == null) {
+            continue;
+          }
+          filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+        }
+      }
+      if (filesToReopen.isEmpty()) {
+        return;
+      }
+      fileScanners =
+          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
+            scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
+      currentScanners.clear();
+      addCurrentScanners(fileScanners);
+      addCurrentScanners(memstoreScanners);
+      resetKVHeap(currentScanners, store.getComparator());
+      resetQueryMatcher(lastTop);
+      for (KeyValueScanner kvs : scannersToClose) {
+        kvs.close();
+      }
+      succ = true;
+    } finally {
+      if (!succ && fileScanners != null) {
+        for (KeyValueScanner scanner : fileScanners) {
+          scanner.close();
+        }
+      }
+    }
+  }
+
+  protected final boolean checkFlushed() {
     // check the var without any lock. Suppose even if we see the old
     // value here still it is ok to continue because we will not be resetting
     // the heap but will continue with the referenced memstore's snapshot. For compactions
@@ -922,9 +992,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (flushed) {
       // If there is a flush and the current scan is notified on the flush ensure that the
       // scan's heap gets reset and we do a seek on the newly flushed file.
-      if(!this.closing) {
-        this.lastTop = this.peek();
-      } else {
+      if (this.closing) {
         return false;
       }
       // reset the flag
@@ -983,6 +1051,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * Used in testing.
    * @return all scanners in no particular order
    */
+  @VisibleForTesting
   List<KeyValueScanner> getAllScannersForTesting() {
     List<KeyValueScanner> allScanners = new ArrayList<>();
     KeyValueScanner current = heap.getCurrentForTesting();
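
Summarizing the new heuristic: a ReadType.DEFAULT scan starts on pread and,
once it has read preadMaxBytes, trySwitchToStreamRead() reopens the remaining
store files for streaming. A minimal restatement of the guard (hypothetical
helper, same conditions as the method above):

    static boolean shouldSwitchToStream(Scan.ReadType readType, boolean scanUsePread,
        boolean closing, Cell top, long bytesRead, long preadMaxBytes) {
      // Only DEFAULT scans that are still on pread, still open, still have a
      // current cell, and have crossed the byte threshold switch to stream.
      return readType == Scan.ReadType.DEFAULT && scanUsePread && !closing
          && top != null && bytesRead >= preadMaxBytes;
    }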

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index f12a334..f65459f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,8 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -45,7 +47,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -53,28 +57,26 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /**
- * Test for the case where a regionserver going down has enough cycles to do damage to regions
- * that have actually been assigned elsehwere.
- *
- * <p>If we happen to assign a region before it fully done with in its old location -- i.e. it is on two servers at the
- * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise
- * change the region file set.  The region in its new location will then get a surprise when it tries to do something
- * w/ a file removed by the region in its old location on dying server.
- *
- * <p>Making a test for this case is a little tough in that even if a file is deleted up on the namenode,
- * if the file was opened before the delete, it will continue to let reads happen until something changes the
- * state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned
- * from the datanode by NN).
- *
- * <p>What we will do below is do an explicit check for existence on the files listed in the region that
- * has had some files removed because of a compaction.  This sort of hurry's along and makes certain what is a chance
- * occurance.
+ * Test for the case where a regionserver going down has enough cycles to do damage to regions that
+ * have actually been assigned elsewhere.
+ * <p>
+ * If we happen to assign a region before it is fully done with its old location -- i.e. it is on
+ * two servers at the same time -- all can work fine until the case where the region on the dying
+ * server decides to compact or otherwise change the region file set. The region in its new location
+ * will then get a surprise when it tries to do something w/ a file removed by the region in its old
+ * location on dying server.
+ * <p>
+ * Making a test for this case is a little tough in that even if a file is deleted up on the
+ * namenode, if the file was opened before the delete, it will continue to let reads happen until
+ * something changes the state of cached blocks in the dfsclient that was already open (a block from
+ * the deleted file is cleaned from the datanode by NN).
+ * <p>
+ * What we will do below is an explicit check for existence on the files listed in the region
+ * that has had some files removed because of a compaction. This sort of hurries along and makes
+ * certain what is otherwise a chance occurrence.
  */
-@Category({MiscTests.class, MediumTests.class})
+@Category({MiscTests.class, LargeTests.class})
 public class TestIOFencing {
   private static final Log LOG = LogFactory.getLog(TestIOFencing.class);
   static {
@@ -334,23 +336,38 @@ public class TestIOFencing {
       while (compactingRegion.compactCount == 0) {
         Thread.sleep(1000);
       }
-      // The server we killed stays up until the compaction that was started before it was killed completes.  In logs
-      // you should see the old regionserver now going down.
+      // The server we killed stays up until the compaction that was started before it was killed
+      // completes. In logs you should see the old regionserver now going down.
       LOG.info("Compaction finished");
 
       // If we survive the split keep going...
       // Now we make sure that the region isn't totally confused.  Load up more rows.
-      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
+        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
       admin.majorCompact(TABLE_NAME);
       startWaitTime = System.currentTimeMillis();
       while (newRegion.compactCount == 0) {
         Thread.sleep(1000);
-        assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
+        assertTrue("New region never compacted",
+          System.currentTimeMillis() - startWaitTime < 180000);
+      }
+      int count;
+      for (int i = 0;; i++) {
+        try {
+          count = TEST_UTIL.countRows(table);
+          break;
+        } catch (DoNotRetryIOException e) {
+          // wait up to 30s
+          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
+            throw e;
+          }
+          Thread.sleep(1000);
+        }
       }
-      if(policy == MemoryCompactionPolicy.EAGER) {
-        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= TEST_UTIL.countRows(table));
+      if (policy == MemoryCompactionPolicy.EAGER) {
+        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
       } else {
-        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
+        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
       }
     } finally {
       if (compactingRegion != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 68c4587..cb1c932 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -671,7 +671,8 @@ public class TestHFileBlock {
       while (System.currentTimeMillis() < endTime) {
         int blockId = rand.nextInt(NUM_TEST_BLOCKS);
         long offset = offsets.get(blockId);
-        boolean pread = rand.nextBoolean();
+        // Now we only support concurrent reads with pread = true.
+        boolean pread = true;
         boolean withOnDiskSize = rand.nextBoolean();
         long expectedSize =
           (blockId == NUM_TEST_BLOCKS - 1 ? fileSize

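The hardcoded pread here reflects the constraint the new comment states: a
positional read leaves the stream's seek pointer untouched, so many threads can
safely share one FSDataInputStream, whereas a stream read seeks first and races
on that shared position. An illustrative sketch of the two read styles (this is
not HFileBlock's actual reader code):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;

public class ReadStyleSketch {

  // Stateless positional read; safe to call concurrently on a shared stream.
  static int pread(FSDataInputStream in, long offset, byte[] buf) throws IOException {
    return in.read(offset, buf, 0, buf.length);
  }

  // Stateful: seek moves the shared file position, so concurrent callers
  // would need external locking to avoid reading from the wrong offset.
  static int streamRead(FSDataInputStream in, long offset, byte[] buf) throws IOException {
    in.seek(offset);
    return in.read(buf, 0, buf.length);
  }
}
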
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
index 51a2a97..403d880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -93,6 +94,11 @@ public class DelegatingKeyValueScanner implements KeyValueScanner {
   }
 
   @Override
+  public Path getFilePath() {
+    return delegate.getFilePath();
+  }
+
+  @Override
   public boolean backwardSeek(Cell key) throws IOException {
     return delegate.backwardSeek(key);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index d52c6c7..91b85d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -145,10 +145,17 @@ public class MockStoreFile extends StoreFile {
   }
 
   @Override
+  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
+      boolean canOptimizeForNonNullColumn) {
+    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
+      canOptimizeForNonNullColumn);
+  }
+
+  @Override
   public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
-      boolean pread, boolean isCompaction, long readPt, long scannerOrder,
-      boolean canOptimizeForNonNullColumn) throws IOException {
-    return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder,
+      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
+      throws IOException {
+    return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
       canOptimizeForNonNullColumn);
   }
 

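MockStoreFile now mirrors the StoreFile API split: getPreadScanner hardwires
pread = true, getStreamScanner hardwires pread = false, and only the stream
path keeps the isCompaction and drop-behind knobs. A sketch of a caller picking
between the two factories; usePread, readPt, order and canOptimize are
stand-ins for real values:

// Assumes the regionserver StoreFile/StoreFileScanner types are on the
// classpath; the method itself is hypothetical.
static StoreFileScanner openScanner(StoreFile file, boolean usePread, long readPt,
    long order, boolean canOptimize) throws IOException {
  return usePread
      ? file.getPreadScanner(true /* cacheBlocks */, readPt, order, canOptimize)
      : file.getStreamScanner(false /* canUseDropBehind */, true /* cacheBlocks */,
          false /* isCompaction */, readPt, order, canOptimize);
}
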
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
index 497fd03..c28e48b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
@@ -96,14 +96,14 @@ public class TestBlocksScanned extends HBaseTestCase {
     CacheStats stats = new CacheConfig(TEST_UTIL.getConfiguration()).getBlockCache().getStats();
     long before = stats.getHitCount() + stats.getMissCount();
     // Do simple test of getting one row only first.
-    Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
+    Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("aaz"))
+        .setReadType(Scan.ReadType.PREAD);
     scan.addColumn(FAMILY, COL);
     scan.setMaxVersions(1);
 
     InternalScanner s = r.getScanner(scan);
     List<Cell> results = new ArrayList<>();
-    while (s.next(results))
-      ;
+    while (s.next(results));
     s.close();
 
     int expectResultSize = 'z' - 'a';

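Besides moving off the deprecated Scan(byte[], byte[]) constructor, the
rewritten scan pins the read path explicitly through setReadType. For contrast,
a scan pinned to the streaming path would look like the sketch below;
Scan.ReadType.STREAM is an assumption here, as only PREAD appears in this diff:

// Hypothetical counterpart of the PREAD scan above, pinned to stream read.
static Scan newStreamScan() {
  return new Scan()
      .withStartRow(Bytes.toBytes("aaa"))
      .withStopRow(Bytes.toBytes("aaz"))
      .setReadType(Scan.ReadType.STREAM); // STREAM constant assumed, see above
}
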
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 9e90f3e..04435db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -192,7 +192,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-        KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
       ScanType scanType = ScanType.USER_SCAN;
       InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,

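This ScanInfo change recurs through the remaining test diffs below: the
constructor gains one argument between keepDeletedCells and timeToPurgeDeletes,
carrying the store's block size, from which the pread byte limit is presumably
derived (TestDefaultCompactSelection and TestMajorCompaction below read it back
via getPreadMaxBytes()). The updated call with the argument roles spelled out,
using the values from the test above:

ScanInfo scanInfo = new ScanInfo(
    conf,                           // Configuration
    FAMILY,                         // column family name
    0,                              // minVersions
    1,                              // maxVersions
    Integer.MAX_VALUE,              // ttl
    KeepDeletedCells.FALSE,         // keepDeletedCells
    HConstants.DEFAULT_BLOCKSIZE,   // new: block size, feeds the pread limit
    0,                              // timeToPurgeDeletes
    this.memstore.getComparator()); // cell comparator
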
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 1bf6ea7..5f4c0aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -212,9 +212,9 @@ public class TestCompaction {
       for (Store hstore: this.r.stores.values()) {
         HStore store = (HStore)hstore;
         ScanInfo old = store.getScanInfo();
-        ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
-            old.getMinVersions(), old.getMaxVersions(), ttl,
-            old.getKeepDeletedCells(), 0, old.getComparator());
+        ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
+            old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
+            old.getComparator());
         store.setScanInfo(si);
       }
       Thread.sleep(ttl);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index 3c41fc5..584285b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -160,8 +160,8 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
     ScanInfo oldScanInfo = store.getScanInfo();
     ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
         oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
-        oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(),
-        oldScanInfo.getComparator());
+        oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
+        oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator());
     store.setScanInfo(newScanInfo);
     // Do not compact empty store file
     List<StoreFile> candidates = sfCreate(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 3acb48b..3b15ff3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -162,9 +162,8 @@ public class TestDefaultMemStore {
     Scan scan = new Scan();
     List<Cell> result = new ArrayList<>();
     Configuration conf = HBaseConfiguration.create();
-    ScanInfo scanInfo =
-        new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
-            this.memstore.getComparator());
+    ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
+        KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
     ScanType scanType = ScanType.USER_SCAN;
     StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
@@ -602,7 +601,7 @@ public class TestDefaultMemStore {
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-          KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
       ScanType scanType = ScanType.USER_SCAN;
       try (InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 9d00d38..0b35f95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -292,9 +292,9 @@ public class TestMajorCompaction {
     for (Store hstore : r.getStores()) {
       HStore store = ((HStore) hstore);
       ScanInfo old = store.getScanInfo();
-      ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
-          old.getMinVersions(), old.getMaxVersions(), ttl,
-          old.getKeepDeletedCells(), 0, old.getComparator());
+      ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
+          old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
+          old.getComparator());
       store.setScanInfo(si);
     }
     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index c1fd6a3..2dfdf5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -264,8 +264,9 @@ public class TestReversibleScanners {
         BloomType.NONE, true);
 
     ScanType scanType = ScanType.USER_SCAN;
-    ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
-        Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
+    ScanInfo scanInfo =
+        new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
+            KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
 
    // Case 1. Test a full reversed scan
     Scan scan = new Scan();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 3e2949c..524af34 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -67,8 +67,8 @@ public class TestStoreScanner {
   private static final String CF_STR = "cf";
   private static final byte [] CF = Bytes.toBytes(CF_STR);
   static Configuration CONF = HBaseConfiguration.create();
-  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
-      Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
+  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
+      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
   private ScanType scanType = ScanType.USER_SCAN;
 
   /**
@@ -829,8 +829,8 @@ public class TestStoreScanner {
     List<KeyValueScanner> scanners = scanFixture(kvs);
     Scan scan = new Scan();
     scan.setMaxVersions(1);
-    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
-        CellComparator.COMPARATOR);
+    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
+        HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
     ScanType scanType = ScanType.USER_SCAN;
     try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
       List<Cell> results = new ArrayList<>();
@@ -902,8 +902,8 @@ public class TestStoreScanner {
     Scan scan = new Scan();
     scan.setMaxVersions(1);
     // scanner with ttl equal to 500
-    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
-        CellComparator.COMPARATOR);
+    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
+        HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
     ScanType scanType = ScanType.USER_SCAN;
     try (StoreScanner scanner =
         new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
@@ -968,6 +968,7 @@ public class TestStoreScanner {
         0 /* minVersions */,
         2 /* maxVersions */, 500 /* ttl */,
         KeepDeletedCells.FALSE /* keepDeletedCells */,
+        HConstants.DEFAULT_BLOCKSIZE /* block size */,
         200, /* timeToPurgeDeletes */
         CellComparator.COMPARATOR);
       try (StoreScanner scanner =

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
new file mode 100644
index 0000000..fb978b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestSwitchToStreamRead {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("stream");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUAL = Bytes.toBytes("cq");
+
+  private static String VALUE_PREFIX;
+
+  private static HRegion REGION;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
+    StringBuilder sb = new StringBuilder(256);
+    for (int i = 0; i < 255; i++) {
+      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
+    }
+    VALUE_PREFIX = sb.append("-").toString();
+    REGION = UTIL.createLocalHRegion(
+      new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY).setBlocksize(1024)),
+      null, null);
+    for (int i = 0; i < 900; i++) {
+      REGION
+          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+    }
+    REGION.flush(true);
+    for (int i = 900; i < 1000; i++) {
+      REGION
+          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    REGION.close(true);
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void test() throws IOException {
+    try (RegionScanner scanner = REGION.getScanner(new Scan())) {
+      StoreScanner storeScanner = (StoreScanner) ((RegionScannerImpl) scanner)
+          .getStoreHeapForTesting().getCurrentForTesting();
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // we start from pread, so the shared reader is used here.
+          assertTrue(sfScanner.getReader().shared);
+        }
+      }
+      List<Cell> cells = new ArrayList<>();
+      for (int i = 0; i < 500; i++) {
+        assertTrue(scanner.next(cells));
+        Result result = Result.create(cells);
+        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
+        cells.clear();
+      }
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // we should have converted to stream read by now.
+          assertFalse(sfScanner.getReader().shared);
+        }
+      }
+      for (int i = 500; i < 1000; i++) {
+        assertEquals(i != 999, scanner.next(cells));
+        Result result = Result.create(cells);
+        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
+        cells.clear();
+      }
+    }
+    // make sure all scanners are closed.
+    for (StoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
+      assertFalse(sf.isReferencedInReads());
+    }
+  }
+}

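The new test drives the switch deliberately: each value is roughly 260 bytes
against a pread budget of 2048, so the scan starts on the shared pread reader,
crosses the budget within the first handful of rows, and by row 500 the store
file scanners must have reopened in stream mode with a dedicated reader, which
the reader.shared assertions verify. A minimal sketch of the knob being tuned
(the class and method names are hypothetical; the constant and the value come
from the test above):

package org.apache.hadoop.hbase.regionserver; // same package as the test

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PreadBudgetSketch {
  // Lower the pread byte budget so the pread-to-stream switch triggers early.
  static Configuration lowPreadBudgetConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
    return conf;
  }
}
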
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
index af8c27d..73c92e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
@@ -73,8 +73,8 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher
       throws IOException {
     long now = EnvironmentEdgeManager.currentTime();
    // Set time to purge deletes to a negative value to avoid it ever happening.
-    ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L,
-        rowComparator);
+    ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
+        HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator);
 
     CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo,
       ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP,

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
index b4e4311..f3cf604 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
@@ -54,7 +54,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
     long now = EnvironmentEdgeManager.currentTime();
     // Do with fam2 which has a col2 qualifier.
     UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
-      new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+      new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE,
+          HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
       get.getFamilyMap().get(fam2), now - ttl, now, null);
     Cell kv = new KeyValue(row1, fam2, col2, 1, data);
     Cell cell = CellUtil.createLastOnRowCol(kv);
@@ -79,8 +80,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
 
     long now = EnvironmentEdgeManager.currentTime();
     // 2,4,5
-    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
-      new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(
+      scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
+          HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
       get.getFamilyMap().get(fam2), now - ttl, now, null);
 
     List<KeyValue> memstore = new ArrayList<>(6);
@@ -122,9 +124,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
     expected.add(ScanQueryMatcher.MatchCode.DONE);
 
     long now = EnvironmentEdgeManager.currentTime();
-    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
-      new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
-      now - ttl, now, null);
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
+        ttl, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
+      null, now - ttl, now, null);
 
     List<KeyValue> memstore = new ArrayList<>(6);
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -168,7 +170,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
 
     long now = EnvironmentEdgeManager.currentTime();
     UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
-      new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator),
+      new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE,
+          HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
       get.getFamilyMap().get(fam2), now - testTTL, now, null);
 
     KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
@@ -209,9 +212,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
         ScanQueryMatcher.MatchCode.DONE };
 
     long now = EnvironmentEdgeManager.currentTime();
-    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
-      new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
-      now - testTTL, now, null);
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
+        testTTL, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
+      null, now - testTTL, now, null);
 
     KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
         new KeyValue(row1, fam2, col2, now - 50, data),

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae0edcd/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index 27e93a0..720ad29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -245,11 +245,10 @@ public class TestCoprocessorScanPolicy {
       Integer newVersions = versions.get(store.getTableName());
       ScanInfo oldSI = store.getScanInfo();
       HColumnDescriptor family = store.getFamily();
-      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
-          family.getName(), family.getMinVersions(),
-          newVersions == null ? family.getMaxVersions() : newVersions,
+      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+          family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
-          oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+          family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
       Scan scan = new Scan();
       scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
       return new StoreScanner(store, scanInfo, scan, scanners,
@@ -266,11 +265,10 @@ public class TestCoprocessorScanPolicy {
       Integer newVersions = versions.get(store.getTableName());
       ScanInfo oldSI = store.getScanInfo();
       HColumnDescriptor family = store.getFamily();
-      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
-          family.getName(), family.getMinVersions(),
-          newVersions == null ? family.getMaxVersions() : newVersions,
+      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+          family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
-          oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+          family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
       Scan scan = new Scan();
       scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
       return new StoreScanner(store, scanInfo, scan, scanners, scanType,
@@ -287,11 +285,10 @@ public class TestCoprocessorScanPolicy {
         Integer newVersions = versions.get(store.getTableName());
         ScanInfo oldSI = store.getScanInfo();
         HColumnDescriptor family = store.getFamily();
-        ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
-            family.getName(), family.getMinVersions(),
-            newVersions == null ? family.getMaxVersions() : newVersions,
+        ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
+            family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
             newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
-            oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+            family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
         return new StoreScanner(store, scanInfo, scan, targetCols, readPt);
       } else {
         return s;