You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:06 UTC

[hbase] 04/22: HBASE-22016 Rewrite the block reading methods by using hbase.nio.ByteBuff

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7698baea1f4a8038693d4676091c6872b26b5d2e
Author: huzheng <op...@gmail.com>
AuthorDate: Fri Mar 8 16:46:06 2019 +0800

    HBASE-22016 Rewrite the block reading methods by using hbase.nio.ByteBuff
---
 .../apache/hadoop/hbase/io/hfile/BlockIOUtils.java | 223 ++++++++++++++
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 337 ++++++++-------------
 ...ckPositionalRead.java => TestBlockIOUtils.java} | 122 ++++++--
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |  14 +-
 4 files changed, 453 insertions(+), 243 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
new file mode 100644
index 0000000..dbd5b2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class BlockIOUtils {
+
+  static boolean isByteBufferReadable(FSDataInputStream is) {
+    InputStream cur = is.getWrappedStream();
+    for (;;) {
+      if ((cur instanceof FSDataInputStream)) {
+        cur = ((FSDataInputStream) cur).getWrappedStream();
+      } else {
+        break;
+      }
+    }
+    return cur instanceof ByteBufferReadable;
+  }
+
+  /**
+   * Read length bytes into ByteBuffers directly.
+   * @param buf the destination {@link ByteBuff}
+   * @param dis the HDFS input stream which implement the ByteBufferReadable interface.
+   * @param length bytes to read.
+   * @throws IOException exception to throw if any error happen
+   */
+  static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
+    if (!isByteBufferReadable(dis)) {
+      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
+      // the destination ByteBuff.
+      byte[] heapBuf = new byte[length];
+      IOUtils.readFully(dis, heapBuf, 0, length);
+      copyToByteBuff(heapBuf, 0, length, buf);
+      return;
+    }
+    ByteBuffer[] buffers = buf.nioByteBuffers();
+    int remain = length;
+    int idx = 0;
+    ByteBuffer cur = buffers[idx];
+    while (remain > 0) {
+      while (!cur.hasRemaining()) {
+        if (++idx >= buffers.length) {
+          throw new IOException(
+              "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
+        }
+        cur = buffers[idx];
+      }
+      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+      int bytesRead = dis.read(cur);
+      if (bytesRead < 0) {
+        throw new IOException(
+            "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
+      }
+      remain -= bytesRead;
+    }
+  }
+
+  /**
+   * Read from an input stream at least <code>necessaryLen</code> and if possible,
+   * <code>extraLen</code> also if available. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
+   * bytes to also optionally read.
+   * @param in the input stream to read from
+   * @param buf the buffer to read into
+   * @param bufOffset the destination offset in the buffer
+   * @param necessaryLen the number of bytes that are absolutely necessary to read
+   * @param extraLen the number of extra bytes that would be nice to read
+   * @return true if succeeded reading the extra bytes
+   * @throws IOException if failed to read the necessary bytes
+   */
+  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
+      int necessaryLen, int extraLen) throws IOException {
+    int bytesRemaining = necessaryLen + extraLen;
+    while (bytesRemaining > 0) {
+      int ret = in.read(buf, bufOffset, bytesRemaining);
+      if (ret < 0) {
+        if (bytesRemaining <= extraLen) {
+          // We could not read the "extra data", but that is OK.
+          break;
+        }
+        throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
+            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+            + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
+      }
+      bufOffset += ret;
+      bytesRemaining -= ret;
+    }
+    return bytesRemaining <= 0;
+  }
+
+  /**
+   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
+   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
+   * @param buf the destination {@link ByteBuff}.
+   * @param dis input stream to read.
+   * @param necessaryLen bytes which we must read
+   * @param extraLen bytes which we may read
+   * @return if the returned flag is true, then we've finished to read the extraLen into our
+   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
+   * @throws IOException if failed to read the necessary bytes.
+   */
+  static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen)
+      throws IOException {
+    if (!isByteBufferReadable(dis)) {
+      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
+      // the destination ByteBuff.
+      byte[] heapBuf = new byte[necessaryLen + extraLen];
+      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
+      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
+      return ret;
+    }
+    ByteBuffer[] buffers = buf.nioByteBuffers();
+    int bytesRead = 0;
+    int remain = necessaryLen + extraLen;
+    int idx = 0;
+    ByteBuffer cur = buffers[idx];
+    while (bytesRead < necessaryLen) {
+      while (!cur.hasRemaining()) {
+        if (++idx >= buffers.length) {
+          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
+        }
+        cur = buffers[idx];
+      }
+      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+      int ret = dis.read(cur);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream (read returned " + ret
+            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+            + " extra bytes, successfully read " + bytesRead);
+      }
+      bytesRead += ret;
+      remain -= ret;
+    }
+    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
+  }
+
+  /**
+   * Read from an input stream at least <code>necessaryLen</code> and if possible,
+   * <code>extraLen</code> also if available. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
+   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
+   * read.
+   * @param buff ByteBuff to read into.
+   * @param dis the input stream to read from
+   * @param position the position within the stream from which to start reading
+   * @param necessaryLen the number of bytes that are absolutely necessary to read
+   * @param extraLen the number of extra bytes that would be nice to read
+   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
+   * @throws IOException if failed to read the necessary bytes
+   */
+  static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+      int necessaryLen, int extraLen) throws IOException {
+    int remain = necessaryLen + extraLen;
+    byte[] buf = new byte[remain];
+    int bytesRead = 0;
+    while (bytesRead < necessaryLen) {
+      int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
+            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+            + " extra bytes, successfully read " + bytesRead);
+      }
+      bytesRead += ret;
+      remain -= ret;
+    }
+    // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
+    // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
+    // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
+    // preadWithExtra method for the upper layer, only need to refactor this method if the
+    // ByteBuffer pread is OK.
+    copyToByteBuff(buf, 0, bytesRead, buff);
+    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
+  }
+
+  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
+      throws IOException {
+    if (offset < 0 || len < 0 || offset + len > buf.length) {
+      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
+    }
+    ByteBuffer[] buffers = out.nioByteBuffers();
+    int idx = 0, remain = len, copyLen;
+    ByteBuffer cur = buffers[idx];
+    while (remain > 0) {
+      while (!cur.hasRemaining()) {
+        if (++idx >= buffers.length) {
+          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
+        }
+        cur = buffers[idx];
+      }
+      copyLen = Math.min(cur.remaining(), remain);
+      cur.put(buf, offset, copyLen);
+      remain -= copyLen;
+      offset += copyLen;
+    }
+    return len;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 91e63fd..4773678 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.IOUtils;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -280,9 +278,7 @@ public class HFileBlock implements Cacheable {
       boolean usesChecksum = buf.get() == (byte) 1;
       long offset = buf.getLong();
       int nextBlockOnDiskSize = buf.getInt();
-      HFileBlock hFileBlock =
-          new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
-      return hFileBlock;
+      return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
     }
 
     @Override
@@ -315,9 +311,9 @@ public class HFileBlock implements Cacheable {
    * param.
    */
   private HFileBlock(HFileBlock that, boolean bufCopy) {
-    init(that.blockType, that.onDiskSizeWithoutHeader,
-        that.uncompressedSizeWithoutHeader, that.prevBlockOffset,
-        that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext);
+    init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
+      that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
+      that.fileContext);
     if (bufCopy) {
       this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
     } else {
@@ -331,6 +327,7 @@ public class HFileBlock implements Cacheable {
    * and is sitting in a byte buffer and we want to stuff the block into cache.
    *
    * <p>TODO: The caller presumes no checksumming
+   * <p>TODO: HFile block writer can also off-heap ? </p>
    * required of this block instance since going into cache; checksum already verified on
    * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
    *
@@ -349,8 +346,8 @@ public class HFileBlock implements Cacheable {
       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
       long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
       HFileContext fileContext) {
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
-        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
+    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
+      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
     this.buf = new SingleByteBuff(b);
     if (fillHeader) {
       overwriteHeader();
@@ -366,7 +363,8 @@ public class HFileBlock implements Cacheable {
    * @param buf Has header, content, and trailing checksums if present.
    */
   HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
-      final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
+      final int nextBlockOnDiskSize, HFileContext fileContext)
+      throws IOException {
     buf.rewind();
     final BlockType blockType = BlockType.read(buf);
     final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
@@ -394,8 +392,8 @@ public class HFileBlock implements Cacheable {
     }
     fileContext = fileContextBuilder.build();
     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
-        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
+    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
+      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
     this.memType = memType;
     this.offset = offset;
     this.buf = buf;
@@ -406,9 +404,8 @@ public class HFileBlock implements Cacheable {
    * Called from constructors.
    */
   private void init(BlockType blockType, int onDiskSizeWithoutHeader,
-      int uncompressedSizeWithoutHeader, long prevBlockOffset,
-      long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
-      HFileContext fileContext) {
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
+      int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext) {
     this.blockType = blockType;
     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -425,10 +422,9 @@ public class HFileBlock implements Cacheable {
    * @param verifyChecksum true if checksum verification is in use.
    * @return Size of the block with header included.
    */
-  private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf,
+  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
       boolean verifyChecksum) {
-    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) +
-      headerSize(verifyChecksum);
+    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
   }
 
   /**
@@ -651,9 +647,10 @@ public class HFileBlock implements Cacheable {
     ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
     dup = dup.slice();
+
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
-      dup);
+      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
+
     return unpacked;
   }
 
@@ -667,15 +664,14 @@ public class HFileBlock implements Cacheable {
     int headerSize = headerSize();
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
 
-    // TODO we need consider allocating offheap here?
-    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
+    ByteBuff newBuf = new SingleByteBuff(ByteBuffer.allocate(capacityNeeded));
 
     // Copy header bytes into newBuf.
     // newBuf is HBB so no issue in calling array()
     buf.position(0);
-    buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
+    newBuf.put(0, buf, 0, headerSize);
 
-    buf = new SingleByteBuff(newBuf);
+    buf = newBuf;
     // set limit to exclude next block's header
     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
   }
@@ -692,17 +688,6 @@ public class HFileBlock implements Cacheable {
     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
   }
 
-  /** An additional sanity-check in case no compression or encryption is being used. */
-  @VisibleForTesting
-  void sanityCheckUncompressedSize() throws IOException {
-    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
-      throw new IOException("Using no compression but "
-          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
-          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
-          + ", numChecksumbytes=" + totalChecksumBytes());
-    }
-  }
-
   /**
    * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when
    * block is returned to the cache.
@@ -748,82 +733,6 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
-   * Read from an input stream at least <code>necessaryLen</code> and if possible,
-   * <code>extraLen</code> also if available. Analogous to
-   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
-   * number of "extra" bytes to also optionally read.
-   *
-   * @param in the input stream to read from
-   * @param buf the buffer to read into
-   * @param bufOffset the destination offset in the buffer
-   * @param necessaryLen the number of bytes that are absolutely necessary to read
-   * @param extraLen the number of extra bytes that would be nice to read
-   * @return true if succeeded reading the extra bytes
-   * @throws IOException if failed to read the necessary bytes
-   */
-  static boolean readWithExtra(InputStream in, byte[] buf,
-      int bufOffset, int necessaryLen, int extraLen) throws IOException {
-    int bytesRemaining = necessaryLen + extraLen;
-    while (bytesRemaining > 0) {
-      int ret = in.read(buf, bufOffset, bytesRemaining);
-      if (ret == -1 && bytesRemaining <= extraLen) {
-        // We could not read the "extra data", but that is OK.
-        break;
-      }
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream (read "
-            + "returned " + ret + ", was trying to read " + necessaryLen
-            + " necessary bytes and " + extraLen + " extra bytes, "
-            + "successfully read "
-            + (necessaryLen + extraLen - bytesRemaining));
-      }
-      bufOffset += ret;
-      bytesRemaining -= ret;
-    }
-    return bytesRemaining <= 0;
-  }
-
-  /**
-   * Read from an input stream at least <code>necessaryLen</code> and if possible,
-   * <code>extraLen</code> also if available. Analogous to
-   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
-   * positional read and specifies a number of "extra" bytes that would be
-   * desirable but not absolutely necessary to read.
-   *
-   * @param in the input stream to read from
-   * @param position the position within the stream from which to start reading
-   * @param buf the buffer to read into
-   * @param bufOffset the destination offset in the buffer
-   * @param necessaryLen the number of bytes that are absolutely necessary to
-   *     read
-   * @param extraLen the number of extra bytes that would be nice to read
-   * @return true if and only if extraLen is > 0 and reading those extra bytes
-   *     was successful
-   * @throws IOException if failed to read the necessary bytes
-   */
-  @VisibleForTesting
-  static boolean positionalReadWithExtra(FSDataInputStream in,
-      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
-      throws IOException {
-    int bytesRemaining = necessaryLen + extraLen;
-    int bytesRead = 0;
-    while (bytesRead < necessaryLen) {
-      int ret = in.read(position, buf, bufOffset, bytesRemaining);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream (positional read "
-            + "returned " + ret + ", was trying to read " + necessaryLen
-            + " necessary bytes and " + extraLen + " extra bytes, "
-            + "successfully read " + bytesRead);
-      }
-      position += ret;
-      bufOffset += ret;
-      bytesRemaining -= ret;
-      bytesRead += ret;
-    }
-    return bytesRead != necessaryLen && bytesRemaining <= 0;
-  }
-
-  /**
    * Unified version 2 {@link HFile} block writer. The intended usage pattern
    * is as follows:
    * <ol>
@@ -988,18 +897,6 @@ public class HFileBlock implements Cacheable {
     }
 
     /**
-     * Returns the stream for the user to write to. The block writer takes care
-     * of handling compression and buffering for caching on write. Can only be
-     * called in the "writing" state.
-     *
-     * @return the data output stream for the user to write to
-     */
-    DataOutputStream getUserDataStream() {
-      expectState(State.WRITING);
-      return userDataStream;
-    }
-
-    /**
      * Transitions the block writer from the "writing" state to the "block
      * ready" state.  Does nothing if a block is already finished.
      */
@@ -1261,11 +1158,9 @@ public class HFileBlock implements Cacheable {
     }
 
     /**
-     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
-     * needed for storing packed blocks in the block cache. Expects calling semantics identical to
-     * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
-     * Does not include checksum data.
-     *
+     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
+     * for storing packed blocks in the block cache. Returns only the header and data, Does not
+     * include checksum data.
      * @return Returns a copy of block bytes for caching on write
      */
     private ByteBuffer cloneOnDiskBufferWithHeader() {
@@ -1321,11 +1216,10 @@ public class HFileBlock implements Cacheable {
                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
                                 .withIncludesTags(fileContext.isIncludesTags())
                                 .build();
-       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
           getUncompressedSizeWithoutHeader(), prevOffset,
-          cacheConf.shouldCacheCompressed(blockType.getCategory())?
-            cloneOnDiskBufferWithHeader() :
-            cloneUncompressedBufferWithHeader(),
+          cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
+              : cloneUncompressedBufferWithHeader(),
           FILL_HEADER, startOffset, UNSET,
           onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
     }
@@ -1415,8 +1309,8 @@ public class HFileBlock implements Cacheable {
    */
   private static class PrefetchedHeader {
     long offset = -1;
-    byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
+    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
 
     @Override
     public String toString() {
@@ -1479,11 +1373,11 @@ public class HFileBlock implements Cacheable {
     }
 
     /**
-     * A constructor that reads files with the latest minor version.
-     * This is used by unit tests only.
+     * A constructor that reads files with the latest minor version. This is used by unit tests
+     * only.
      */
     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
-    throws IOException {
+        throws IOException {
       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
     }
 
@@ -1520,60 +1414,49 @@ public class HFileBlock implements Cacheable {
     }
 
     /**
-     * Does a positional read or a seek and read into the given buffer. Returns
-     * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
-     *
+     * Does a positional read or a seek and read into the given byte buffer. We need take care that
+     * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers,
+     * otherwise the memory leak may happen.
      * @param dest destination buffer
-     * @param destOffset offset into the destination buffer at where to put the bytes we read
      * @param size size of read
      * @param peekIntoNextBlock whether to read the next block's on-disk size
      * @param fileOffset position in the stream to read at
      * @param pread whether we should do a positional read
      * @param istream The input source of data
-     * @return the on-disk size of the next block with header size included, or
-     *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
-     *         next header
-     * @throws IOException
+     * @return true to indicate the destination buffer include the next block header, otherwise only
+     *         include the current block data without the next block header.
+     * @throws IOException if any IO error happen.
      */
-    @VisibleForTesting
-    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
-        boolean peekIntoNextBlock, long fileOffset, boolean pread)
-        throws IOException {
-      if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
-        // We are asked to read the next block's header as well, but there is
-        // not enough room in the array.
-        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
-            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
-      }
-
+    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
+        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
       if (!pread) {
         // Seek + read. Better for scanning.
         HFileUtil.seekOnMultipleSources(istream, fileOffset);
-        // TODO: do we need seek time latencies?
         long realOffset = istream.getPos();
         if (realOffset != fileOffset) {
-          throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
-              " bytes, but pos=" + realOffset + " after seek");
+          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
+              + " bytes, but pos=" + realOffset + " after seek");
         }
-
         if (!peekIntoNextBlock) {
-          IOUtils.readFully(istream, dest, destOffset, size);
-          return -1;
+          BlockIOUtils.readFully(dest, istream, size);
+          return false;
         }
 
-        // Try to read the next block header.
-        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
-          return -1;
+        // Try to read the next block header
+        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
+          // did not read the next block header.
+          return false;
         }
       } else {
         // Positional read. Better for random reads; or when the streamLock is already locked.
         int extraSize = peekIntoNextBlock ? hdrSize : 0;
-        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
-          return -1;
+        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
+          // did not read the next block header.
+          return false;
         }
       }
       assert peekIntoNextBlock;
-      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
+      return true;
     }
 
     /**
@@ -1672,7 +1555,7 @@ public class HFileBlock implements Cacheable {
      * is not right.
      * @throws IOException
      */
-    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
+    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
         final long offset, boolean verifyChecksum)
     throws IOException {
       // Assert size provided aligns with what is in the header
@@ -1691,11 +1574,11 @@ public class HFileBlock implements Cacheable {
      * we have to backup the stream because we over-read (the next block's header).
      * @see PrefetchedHeader
      * @return The cached block header or null if not found.
-     * @see #cacheNextBlockHeader(long, byte[], int, int)
+     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
      */
-    private ByteBuffer getCachedHeader(final long offset) {
+    private ByteBuff getCachedHeader(final long offset) {
       PrefetchedHeader ph = this.prefetchedHeader.get();
-      return ph != null && ph.offset == offset? ph.buf: null;
+      return ph != null && ph.offset == offset ? ph.buf : null;
     }
 
     /**
@@ -1704,13 +1587,24 @@ public class HFileBlock implements Cacheable {
      * @see PrefetchedHeader
      */
     private void cacheNextBlockHeader(final long offset,
-        final byte [] header, final int headerOffset, final int headerLength) {
+        ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
       PrefetchedHeader ph = new PrefetchedHeader();
       ph.offset = offset;
-      System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
+      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
       this.prefetchedHeader.set(ph);
     }
 
+    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
+        int onDiskSizeWithHeader) {
+      int nextBlockOnDiskSize = -1;
+      if (readNextHeader) {
+        nextBlockOnDiskSize =
+            onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
+                + hdrSize;
+      }
+      return nextBlockOnDiskSize;
+    }
+
     /**
      * Reads a version 2 block.
      *
@@ -1737,7 +1631,7 @@ public class HFileBlock implements Cacheable {
       // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
       // and will save us having to seek the stream backwards to reread the header we
       // read the last time through here.
-      ByteBuffer headerBuf = getCachedHeader(offset);
+      ByteBuff headerBuf = getCachedHeader(offset);
       LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
           "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
           verifyChecksum, headerBuf, onDiskSizeWithHeader);
@@ -1757,9 +1651,9 @@ public class HFileBlock implements Cacheable {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Extra see to get block size!", new RuntimeException());
           }
-          headerBuf = ByteBuffer.allocate(hdrSize);
-          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
-              offset, pread);
+          headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
+          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
+          headerBuf.rewind();
         }
         onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
       }
@@ -1770,46 +1664,55 @@ public class HFileBlock implements Cacheable {
       // says where to start reading. If we have the header cached, then we don't need to read
       // it again and we can likely read from last place we left off w/o need to backup and reread
       // the header we read last time through here.
-      // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
-      byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
-      int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
+      ByteBuff onDiskBlock =
+          new SingleByteBuff(ByteBuffer.allocate(onDiskSizeWithHeader + hdrSize));
+      boolean initHFileBlockSuccess = false;
+      try {
+        if (headerBuf != null) {
+          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
+        }
+        boolean readNextHeader = readAtOffset(is, onDiskBlock,
           onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
-      if (headerBuf != null) {
-        // The header has been read when reading the previous block OR in a distinct header-only
-        // read. Copy to this block's header.
-        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
-      } else {
-        headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
-      }
-      // Do a few checks before we go instantiate HFileBlock.
-      assert onDiskSizeWithHeader > this.hdrSize;
-      verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
-      ByteBuff onDiskBlockByteBuff =
-          new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader));
-      // Verify checksum of the data before using it for building HFileBlock.
-      if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) {
-        return null;
-      }
-      long duration = System.currentTimeMillis() - startTime;
-      if (updateMetrics) {
-        HFile.updateReadLatency(duration, pread);
-      }
-      // The onDiskBlock will become the headerAndDataBuffer for this block.
-      // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
-      // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport,
-          MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
-      // Run check on uncompressed sizings.
-      if (!fileContext.isCompressedOrEncrypted()) {
-        hFileBlock.sanityCheckUncompressed();
-      }
-      LOG.trace("Read {} in {} ns", hFileBlock, duration);
-      // Cache next block header if we read it for the next time through here.
-      if (nextBlockOnDiskSize != -1) {
-        cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
-            onDiskBlock, onDiskSizeWithHeader, hdrSize);
+        onDiskBlock.rewind(); // in case of moving position when copying a cached header
+        int nextBlockOnDiskSize =
+            getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
+        if (headerBuf == null) {
+          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
+        }
+        // Do a few checks before we go instantiate HFileBlock.
+        assert onDiskSizeWithHeader > this.hdrSize;
+        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
+        ByteBuff curBlock = onDiskBlock.duplicate().limit(onDiskSizeWithHeader);
+        // Verify checksum of the data before using it for building HFileBlock.
+        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
+          return null;
+        }
+        long duration = System.currentTimeMillis() - startTime;
+        if (updateMetrics) {
+          HFile.updateReadLatency(duration, pread);
+        }
+        // The onDiskBlock will become the headerAndDataBuffer for this block.
+        // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
+        // contains the header of next block, so no need to set next block's header in it.
+        HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
+            offset, nextBlockOnDiskSize, fileContext);
+        // Run check on uncompressed sizings.
+        if (!fileContext.isCompressedOrEncrypted()) {
+          hFileBlock.sanityCheckUncompressed();
+        }
+        LOG.trace("Read {} in {} ns", hFileBlock, duration);
+        // Cache next block header if we read it for the next time through here.
+        if (nextBlockOnDiskSize != -1) {
+          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
+            onDiskSizeWithHeader, hdrSize);
+        }
+        initHFileBlockSuccess = true;
+        return hFileBlock;
+      } finally {
+        if (!initHFileBlockSuccess) {
+          onDiskBlock.release();
+        }
       }
-      return hFileBlock;
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
similarity index 54%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index a13c868..60180e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -17,33 +17,115 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
-/**
- * Unit test suite covering HFileBlock positional read logic.
- */
-@Category({IOTests.class, SmallTests.class})
-public class TestHFileBlockPositionalRead {
+@Category({ IOTests.class, SmallTests.class })
+public class TestBlockIOUtils {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestHFileBlockPositionalRead.class);
+      HBaseClassTestRule.forClass(TestBlockIOUtils.class);
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Test
+  public void testIsByteBufferReadable() throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable");
+    try (FSDataOutputStream out = fs.create(p)) {
+      out.writeInt(23);
+    }
+    try (FSDataInputStream is = fs.open(p)) {
+      assertFalse(BlockIOUtils.isByteBufferReadable(is));
+    }
+  }
+
+  @Test
+  public void testReadFully() throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
+    String s = "hello world";
+    try (FSDataOutputStream out = fs.create(p)) {
+      out.writeBytes(s);
+    }
+    ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
+    try (FSDataInputStream in = fs.open(p)) {
+      BlockIOUtils.readFully(buf, in, 11);
+    }
+    buf.rewind();
+    byte[] heapBuf = new byte[s.length()];
+    buf.get(heapBuf, 0, heapBuf.length);
+    assertArrayEquals(Bytes.toBytes(s), heapBuf);
+  }
+
+  @Test
+  public void testReadWithExtra() throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra");
+    String s = "hello world";
+    try (FSDataOutputStream out = fs.create(p)) {
+      out.writeBytes(s);
+    }
+    ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
+    try (FSDataInputStream in = fs.open(p)) {
+      assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+    }
+    buf.rewind();
+    byte[] heapBuf = new byte[buf.capacity()];
+    buf.get(heapBuf, 0, heapBuf.length);
+    assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
+
+    buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
+    try (FSDataInputStream in = fs.open(p)) {
+      assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+    }
+    buf.rewind();
+    heapBuf = new byte[11];
+    buf.get(heapBuf, 0, heapBuf.length);
+    assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
+
+    buf.position(0).limit(12);
+    try (FSDataInputStream in = fs.open(p)) {
+      try {
+        BlockIOUtils.readWithExtra(buf, in, 12, 0);
+        fail("Should only read 11 bytes");
+      } catch (IOException e) {
+
+      }
+    }
+  }
+
   @Test
   public void testPositionalReadNoExtra() throws IOException {
     long position = 0;
@@ -52,10 +134,10 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 0;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
-    boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
-        bufOffset, necessaryLen, extraLen);
+    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verifyNoMoreInteractions(in);
@@ -69,11 +151,11 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 0;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 5)).thenReturn(5);
-    boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
-        bufOffset, necessaryLen, extraLen);
+    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 5);
@@ -88,10 +170,10 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 5;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
-    boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
-        bufOffset, necessaryLen, extraLen);
+    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verifyNoMoreInteractions(in);
@@ -105,10 +187,10 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 5;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
-    boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
-        bufOffset, necessaryLen, extraLen);
+    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when reading extra bytes fails", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verifyNoMoreInteractions(in);
@@ -123,11 +205,11 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 5;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 10)).thenReturn(10);
-    boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
-        bufOffset, necessaryLen, extraLen);
+    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 10);
@@ -142,12 +224,12 @@ public class TestHFileBlockPositionalRead {
     int extraLen = 0;
     int totalLen = necessaryLen + extraLen;
     byte[] buf = new byte[totalLen];
+    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
     exception.expect(IOException.class);
     exception.expectMessage("EOF");
-    HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset,
-        necessaryLen, extraLen);
+    BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index e93b61e..a4135d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -398,23 +398,25 @@ public class TestChecksum {
       return b;
     }
 
+
     @Override
-    protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
+    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
-      int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock,
-          fileOffset, pread);
+      int destOffset = dest.position();
+      boolean returnValue =
+          super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
       if (!corruptDataStream) {
         return returnValue;
       }
       // Corrupt 3rd character of block magic of next block's header.
       if (peekIntoNextBlock) {
-        dest[destOffset + size + 3] = 0b00000000;
+        dest.put(destOffset + size + 3, (byte) 0b00000000);
       }
       // We might be reading this block's header too, corrupt it.
-      dest[destOffset + 1] = 0b00000000;
+      dest.put(destOffset + 1, (byte) 0b00000000);
       // Corrupt non header data
       if (size > hdrSize) {
-        dest[destOffset + hdrSize + 1] = 0b00000000;
+        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
       }
       return returnValue;
     }