You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/24 02:31:16 UTC

[hbase] 20/23: HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping (#268)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d64c3a2611a0845f7c659204d039a32b0d6b20df
Author: openinx <op...@gmail.com>
AuthorDate: Thu Jun 13 14:34:34 2019 +0800

    HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping (#268)
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |   6 +
 .../hbase/io/hfile/ExclusiveMemHFileBlock.java     |  69 ++++
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 166 ++++----
 .../hadoop/hbase/io/hfile/HFileBlockBuilder.java   | 114 ++++++
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |   6 +-
 .../hadoop/hbase/io/hfile/LruBlockCache.java       |   4 +-
 .../hadoop/hbase/io/hfile/SharedMemHFileBlock.java |  46 +++
 .../hadoop/hbase/io/hfile/TinyLfuBlockCache.java   |   4 +-
 .../org/apache/hadoop/hbase/io/TestHeapSize.java   |  10 +
 .../hadoop/hbase/io/hfile/CacheTestUtils.java      |  10 +-
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |   4 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    |   6 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |  49 ++-
 .../hbase/io/hfile/TestHFileDataBlockEncoder.java  |  16 +-
 .../hfile/TestHFileScannerImplReferenceCount.java  | 450 +++++++++++++--------
 .../hadoop/hbase/io/hfile/TestLruBlockCache.java   |  12 +-
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  29 +-
 .../io/hfile/bucket/TestBucketCacheRefCnt.java     |   3 +-
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java |   7 +-
 19 files changed, 706 insertions(+), 305 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index c85675b..e8e77dc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -297,6 +297,12 @@ public class ByteBuffAllocator {
         }
       }
     }
+    this.usedBufCount.set(0);
+    this.maxPoolSizeInfoLevelLogged = false;
+    this.poolAllocationBytes.reset();
+    this.heapAllocationBytes.reset();
+    this.lastPoolAllocationBytes = 0;
+    this.lastHeapAllocationBytes = 0;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ExclusiveMemHFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ExclusiveMemHFileBlock.java
new file mode 100644
index 0000000..73c0db4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ExclusiveMemHFileBlock.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The {@link ByteBuffAllocator} won't allocate pooled heap {@link ByteBuff} now; at the same time,
+ * if an off-heap {@link ByteBuff} is allocated from the allocator, it must be a pooled one. That
+ * is to say, an exclusive memory HFileBlock must be a heap block and a shared memory HFileBlock
+ * must be an off-heap block.
+ * <p>
+ * The exclusive memory HFileBlock does nothing when its retain or release methods are called,
+ * because its memory will be garbage collected by the JVM; even if its reference count dropped to
+ * zero, there would be nothing for us to de-allocate.
+ * <p>
+ * @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
+ */
+@InterfaceAudience.Private
+public class ExclusiveMemHFileBlock extends HFileBlock {
+
+  ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
+      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
+      HFileContext fileContext) {
+    super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
+        fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
+        ByteBuffAllocator.HEAP); // this block type always uses the HEAP allocator
+  }
+
+  @Override
+  public int refCnt() {
+    return 0; // constant: retain() and release() below never change it
+  }
+
+  @Override
+  public ExclusiveMemHFileBlock retain() {
+    // Intentionally a no-op; hands back this same block.
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    // Intentionally a no-op; always reports false.
+    return false;
+  }
+
+  @Override
+  public boolean isSharedMem() {
+    return false; // exclusive memory block, see SharedMemHFileBlock for the true case
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 452b68c..14ed275 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -285,7 +285,7 @@ public class HFileBlock implements Cacheable {
       boolean usesChecksum = buf.get() == (byte) 1;
       long offset = buf.getLong();
       int nextBlockOnDiskSize = buf.getInt();
-      return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
+      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
     }
 
     @Override
@@ -301,28 +301,6 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
-   * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
-   */
-  private HFileBlock(HFileBlock that) {
-    this(that, false);
-  }
-
-  /**
-   * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
-   * param.
-   */
-  private HFileBlock(HFileBlock that, boolean bufCopy) {
-    init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
-      that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
-      that.fileContext, that.allocator);
-    if (bufCopy) {
-      this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
-    } else {
-      this.buf = that.buf.duplicate();
-    }
-  }
-
-  /**
    * Creates a new {@link HFile} block from the given fields. This constructor
    * is used only while writing blocks and caching,
    * and is sitting in a byte buffer and we want to stuff the block into cache.
@@ -336,7 +314,7 @@ public class HFileBlock implements Cacheable {
    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
    * @param prevBlockOffset see {@link #prevBlockOffset}
-   * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
+   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
    * @param fillHeader when true, write the first 4 header fields into passed buffer.
    * @param offset the file offset the block was read from
    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
@@ -344,12 +322,19 @@ public class HFileBlock implements Cacheable {
    */
   @VisibleForTesting
   public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
-      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
-      long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
-      HFileContext fileContext, ByteBuffAllocator allocator) {
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
-      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.buf = new SingleByteBuff(b);
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
+      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
+      ByteBuffAllocator allocator) {
+    this.blockType = blockType;
+    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
+    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
+    this.prevBlockOffset = prevBlockOffset;
+    this.offset = offset;
+    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
+    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
+    this.fileContext = fileContext;
+    this.allocator = allocator;
+    this.buf = buf;
     if (fillHeader) {
       overwriteHeader();
     }
@@ -363,7 +348,7 @@ public class HFileBlock implements Cacheable {
    * to that point.
    * @param buf Has header, content, and trailing checksums if present.
    */
-  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
+  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
       final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
       throws IOException {
     buf.rewind();
@@ -374,15 +359,15 @@ public class HFileBlock implements Cacheable {
     final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
     // This constructor is called when we deserialize a block from cache and when we read a block in
     // from the fs. fileCache is null when deserialized from cache so need to make up one.
-    HFileContextBuilder fileContextBuilder = fileContext != null?
-        new HFileContextBuilder(fileContext): new HFileContextBuilder();
+    HFileContextBuilder fileContextBuilder =
+        fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
     fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
     int onDiskDataSizeWithHeader;
     if (usesHBaseChecksum) {
       byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
       int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
       onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
-      // Use the checksum type and bytes per checksum from header, not from filecontext.
+      // Use the checksum type and bytes per checksum from header, not from fileContext.
       fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
       fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
     } else {
@@ -393,29 +378,19 @@ public class HFileBlock implements Cacheable {
     }
     fileContext = fileContextBuilder.build();
     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
-      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.offset = offset;
-    this.buf = buf;
-    this.buf.rewind();
-  }
-
-  /**
-   * Called from constructors.
-   */
-  private void init(BlockType blockType, int onDiskSizeWithoutHeader,
-      int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
-      int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
-      ByteBuffAllocator allocator) {
-    this.blockType = blockType;
-    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
-    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
-    this.prevBlockOffset = prevBlockOffset;
-    this.offset = offset;
-    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
-    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
-    this.fileContext = fileContext;
-    this.allocator = allocator;
+    return new HFileBlockBuilder()
+        .withBlockType(blockType)
+        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
+        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
+        .withPrevBlockOffset(prevBlockOffset)
+        .withOffset(offset)
+        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
+        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
+        .withHFileContext(fileContext)
+        .withByteBuffAllocator(allocator)
+        .withByteBuff(buf.rewind())
+        .withShared(!buf.hasArray())
+        .build();
   }
 
   /**
@@ -639,7 +614,7 @@ public class HFileBlock implements Cacheable {
         .append("(").append(onDiskSizeWithoutHeader)
         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
     }
-    String dataBegin = null;
+    String dataBegin;
     if (buf.hasArray()) {
       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
@@ -673,7 +648,7 @@ public class HFileBlock implements Cacheable {
       return this;
     }
 
-    HFileBlock unpacked = new HFileBlock(this);
+    HFileBlock unpacked = shallowClone(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
     boolean succ = false;
     try {
@@ -761,10 +736,16 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
-   * @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
+   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
+   * true by default.
    */
-  boolean isOnHeap() {
-    return buf.hasArray();
+  public boolean isSharedMem() {
+    if (this instanceof SharedMemHFileBlock) { // NOTE(review): subclasses override this; redundant?
+      return true;
+    } else if (this instanceof ExclusiveMemHFileBlock) {
+      return false;
+    }
+    return true; // any other HFileBlock instance is reported as shared memory
   }
 
   /**
@@ -1039,8 +1020,7 @@ public class HFileBlock implements Cacheable {
             + offset);
       }
       startOffset = offset;
-
-      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
+      finishBlockAndWriteHeaderAndData(out);
     }
 
     /**
@@ -1251,13 +1231,27 @@ public class HFileBlock implements Cacheable {
                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
                                 .withIncludesTags(fileContext.isIncludesTags())
                                 .build();
-      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
-          getUncompressedSizeWithoutHeader(), prevOffset,
-          cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
-              : cloneUncompressedBufferWithHeader(),
-          FILL_HEADER, startOffset, UNSET,
-          onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
-          cacheConf.getByteBuffAllocator());
+      // Build the HFileBlock.
+      HFileBlockBuilder builder = new HFileBlockBuilder();
+      ByteBuffer buffer;
+      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
+        buffer = cloneOnDiskBufferWithHeader();
+      } else {
+        buffer = cloneUncompressedBufferWithHeader();
+      }
+      return builder.withBlockType(blockType)
+          .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
+          .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
+          .withPrevBlockOffset(prevOffset)
+          .withByteBuff(ByteBuff.wrap(buffer))
+          .withFillHeader(FILL_HEADER)
+          .withOffset(startOffset)
+          .withNextBlockOnDiskSize(UNSET)
+          .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
+          .withHFileContext(newContext)
+          .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
+          .withShared(!buffer.hasArray())
+          .build();
     }
   }
 
@@ -1781,8 +1775,8 @@ public class HFileBlock implements Cacheable {
         // The onDiskBlock will become the headerAndDataBuffer for this block.
         // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
         // contains the header of next block, so no need to set next block's header in it.
-        HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
-            nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
+        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
+          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
         // Run check on uncompressed sizings.
         if (!fileContext.isCompressedOrEncrypted()) {
           hFileBlock.sanityCheckUncompressed();
@@ -1947,7 +1941,7 @@ public class HFileBlock implements Cacheable {
     if (comparison == null) {
       return false;
     }
-    if (comparison.getClass() != this.getClass()) {
+    if (!(comparison instanceof HFileBlock)) {
       return false;
     }
 
@@ -2084,7 +2078,27 @@ public class HFileBlock implements Cacheable {
                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
   }
 
-  public HFileBlock deepCloneOnHeap() {
-    return new HFileBlock(this, true);
+  private static HFileBlockBuilder createBuilder(HFileBlock blk){ // pre-fills a builder from blk
+    return new HFileBlockBuilder()
+          .withBlockType(blk.blockType)
+          .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
+          .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
+          .withPrevBlockOffset(blk.prevBlockOffset)
+          .withByteBuff(blk.buf.duplicate()) // Duplicated here; deepCloneOnHeap replaces it.
+          .withOffset(blk.offset)
+          .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+          .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
+          .withHFileContext(blk.fileContext)
+          .withByteBuffAllocator(blk.allocator)
+          .withShared(blk.isSharedMem()); // same memory type as the source block
+  }
+
+  static HFileBlock shallowClone(HFileBlock blk) { // new block over a duplicate of blk's buffer
+    return createBuilder(blk).build();
+  }
+
+  static HFileBlock deepCloneOnHeap(HFileBlock blk) { // copies bytes [0, limit) of blk.buf
+    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
+    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build(); // exclusive
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockBuilder.java
new file mode 100644
index 0000000..4ed50e1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockBuilder.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static javax.swing.Spring.UNSET;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class HFileBlockBuilder {
+
+  private BlockType blockType;
+  private int onDiskSizeWithoutHeader;
+  private int onDiskDataSizeWithHeader;
+  private int uncompressedSizeWithoutHeader;
+  private long prevBlockOffset;
+  private ByteBuff buf;
+  private boolean fillHeader = false;
+  private long offset = UNSET; // NOTE(review): UNSET is javax.swing.Spring.UNSET here - intended?
+  private int nextBlockOnDiskSize = UNSET; // NOTE(review): same statically imported UNSET
+  private HFileContext fileContext;
+  private ByteBuffAllocator allocator = HEAP; // default when withByteBuffAllocator is not called
+  private boolean isShared; // picks the subclass instantiated by build()
+
+  public HFileBlockBuilder withBlockType(BlockType blockType) {
+    this.blockType = blockType;
+    return this;
+  }
+
+  public HFileBlockBuilder withOnDiskSizeWithoutHeader(int onDiskSizeWithoutHeader) {
+    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
+    return this;
+  }
+
+  public HFileBlockBuilder withOnDiskDataSizeWithHeader(int onDiskDataSizeWithHeader) {
+    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
+    return this;
+  }
+
+  public HFileBlockBuilder withUncompressedSizeWithoutHeader(int uncompressedSizeWithoutHeader) {
+    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
+    return this;
+  }
+
+  public HFileBlockBuilder withPrevBlockOffset(long prevBlockOffset) {
+    this.prevBlockOffset = prevBlockOffset;
+    return this;
+  }
+
+  public HFileBlockBuilder withByteBuff(ByteBuff buf) {
+    this.buf = buf;
+    return this;
+  }
+
+  public HFileBlockBuilder withFillHeader(boolean fillHeader) {
+    this.fillHeader = fillHeader;
+    return this;
+  }
+
+  public HFileBlockBuilder withOffset(long offset) {
+    this.offset = offset;
+    return this;
+  }
+
+  public HFileBlockBuilder withNextBlockOnDiskSize(int nextBlockOnDiskSize) {
+    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
+    return this;
+  }
+
+  public HFileBlockBuilder withHFileContext(HFileContext fileContext) {
+    this.fileContext = fileContext;
+    return this;
+  }
+
+  public HFileBlockBuilder withByteBuffAllocator(ByteBuffAllocator allocator) {
+    this.allocator = allocator;
+    return this;
+  }
+
+  public HFileBlockBuilder withShared(boolean isShared) {
+    this.isShared = isShared;
+    return this;
+  }
+
+  public HFileBlock build() {
+    if (isShared) { // shared memory block: receives the configured allocator
+      return new SharedMemHFileBlock(blockType, onDiskSizeWithoutHeader,
+          uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
+          nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, allocator);
+    } else { // exclusive memory block: the configured allocator is not passed on
+      return new ExclusiveMemHFileBlock(blockType, onDiskSizeWithoutHeader,
+          uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
+          nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 1157615..0dae13c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -523,15 +523,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
         return;
       }
-      if (this.curBlock != null) {
+      if (this.curBlock != null && this.curBlock.isSharedMem()) {
         prevBlocks.add(this.curBlock);
       }
       this.curBlock = block;
     }
 
     void reset() {
-      // We don't have to keep ref to EXCLUSIVE type of block
-      if (this.curBlock != null) {
+      // We don't have to keep ref to heap block
+      if (this.curBlock != null && this.curBlock.isSharedMem()) {
         this.prevBlocks.add(this.curBlock);
       }
       this.curBlock = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 0ec73a3..7740460 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -372,8 +372,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
   private Cacheable asReferencedHeapBlock(Cacheable buf) {
     if (buf instanceof HFileBlock) {
       HFileBlock blk = ((HFileBlock) buf);
-      if (!blk.isOnHeap()) {
-        return blk.deepCloneOnHeap();
+      if (blk.isSharedMem()) {
+        return HFileBlock.deepCloneOnHeap(blk);
       }
     }
     // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SharedMemHFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SharedMemHFileBlock.java
new file mode 100644
index 0000000..0d2217e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/SharedMemHFileBlock.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The {@link ByteBuffAllocator} won't allocate pooled heap {@link ByteBuff} now; at the same time,
+ * if an off-heap {@link ByteBuff} is allocated from the allocator, it must be a pooled one. That
+ * is to say, an exclusive memory HFileBlock must be a heap block and a shared memory HFileBlock
+ * must be an off-heap block.
+ * @see org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock
+ **/
+@InterfaceAudience.Private
+public class SharedMemHFileBlock extends HFileBlock {
+
+  SharedMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
+      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
+      ByteBuffAllocator alloc) { // every argument is forwarded unchanged to HFileBlock
+    super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
+        fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, alloc);
+  }
+
+  @Override
+  public boolean isSharedMem() {
+    return true; // shared memory block, see ExclusiveMemHFileBlock for the false case
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java
index 5e69f6c..a90c5a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java
@@ -171,8 +171,8 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
       if (victimCache != null) {
         value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
         if ((value != null) && caching) {
-          if ((value instanceof HFileBlock) && !((HFileBlock) value).isOnHeap()) {
-            value = ((HFileBlock) value).deepCloneOnHeap();
+          if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
+            value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
           }
           cacheBlock(cacheKey, value);
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 71ffb87..108de70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
+import org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock;
 import org.apache.hadoop.hbase.regionserver.CSLMImmutableSegment;
 import org.apache.hadoop.hbase.regionserver.CellArrayImmutableSegment;
 import org.apache.hadoop.hbase.regionserver.CellArrayMap;
@@ -529,6 +531,14 @@ public class TestHeapSize  {
     actual = HFileBlock.FIXED_OVERHEAD;
     expected = ClassSize.estimateBase(HFileBlock.class, false);
     assertEquals(expected, actual);
+
+    actual = ExclusiveMemHFileBlock.FIXED_OVERHEAD;
+    expected = ClassSize.estimateBase(ExclusiveMemHFileBlock.class, false);
+    assertEquals(expected, actual);
+
+    actual = SharedMemHFileBlock.FIXED_OVERHEAD;
+    expected = ClassSize.estimateBase(SharedMemHFileBlock.class, false);
+    assertEquals(expected, actual);
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 717e9d7..a7bb8e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -148,7 +148,6 @@ public class CacheTestUtils {
       if (buf != null) {
         assertEquals(block.block, buf);
       }
-
     }
 
     // Re-add some duplicate blocks. Hope nothing breaks.
@@ -307,10 +306,11 @@ public class CacheTestUtils {
                           .withBytesPerCheckSum(0)
                           .withChecksumType(ChecksumType.NULL)
                           .build();
-      HFileBlock generated = new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader,
-          uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
-          blockSize, onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
-          ByteBuffAllocator.HEAP);
+      HFileBlock generated =
+          new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
+              prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
+              onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
+              ByteBuffAllocator.HEAP);
 
       String strKey;
       /* No conflicting keys */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index 2aebc8c..6d02854 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -101,7 +101,7 @@ public class TestChecksum {
     HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
         meta, ByteBuffAllocator.HEAP);
     HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
-    assertTrue(b.isOnHeap());
+    assertTrue(!b.isSharedMem());
     assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
   }
 
@@ -148,7 +148,7 @@ public class TestChecksum {
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
           meta, ByteBuffAllocator.HEAP);
       HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
-      assertTrue(b.isOnHeap());
+      assertTrue(!b.isSharedMem());
 
       // verify SingleByteBuff checksum.
       verifySBBCheckSum(b.getBufferReadOnly());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 84e24e6..a504442 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -169,7 +169,7 @@ public class TestHFile  {
       Cacheable cachedBlock = lru.getBlock(key, false, false, true);
       Assert.assertNotNull(cachedBlock);
       Assert.assertTrue(cachedBlock instanceof HFileBlock);
-      Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
+      Assert.assertFalse(((HFileBlock) cachedBlock).isSharedMem());
       // Should never allocate off-heap block from allocator because ensure that it's LRU.
       Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
       block.release(); // return back the ByteBuffer back to allocator.
@@ -217,10 +217,10 @@ public class TestHFile  {
         HFileBlock hfb = (HFileBlock) cachedBlock;
         // Data block will be cached in BucketCache, so it should be an off-heap block.
         if (hfb.getBlockType().isData()) {
-          Assert.assertFalse(hfb.isOnHeap());
+          Assert.assertTrue(hfb.isSharedMem());
         } else {
           // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
-          Assert.assertTrue(hfb.isOnHeap());
+          Assert.assertFalse(hfb.isSharedMem());
         }
       } finally {
         cachedBlock.release();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index de10ced..d0e98fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -340,6 +340,14 @@ public class TestHFileBlock {
     testReaderV2Internals();
   }
 
+  private void assertRelease(HFileBlock blk) {
+    if (blk instanceof ExclusiveMemHFileBlock) {
+      assertFalse(blk.release());
+    } else {
+      assertTrue(blk.release());
+    }
+  }
+
   protected void testReaderV2Internals() throws IOException {
     if(includesTag) {
       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
@@ -403,10 +411,10 @@ public class TestHFileBlock {
                 + "'.\nMessage is expected to start with: '" + expectedPrefix
                 + "'", ex.getMessage().startsWith(expectedPrefix));
           }
-          assertTrue(b.release());
+          assertRelease(b);
           is.close();
         }
-        assertTrue(expected.release());
+        assertRelease(expected);
       }
     }
   }
@@ -534,7 +542,7 @@ public class TestHFileBlock {
                   deserialized.unpack(meta, hbr));
               }
             }
-            assertTrue(blockUnpacked.release());
+            assertRelease(blockUnpacked);
             if (blockFromHFile != blockUnpacked) {
               blockFromHFile.release();
             }
@@ -651,7 +659,7 @@ public class TestHFileBlock {
             assertEquals(b.getOnDiskDataSizeWithHeader(),
                          b2.getOnDiskDataSizeWithHeader());
             assertEquals(0, HFile.getAndResetChecksumFailuresCount());
-            assertTrue(b2.release());
+            assertRelease(b2);
 
             curOffset += b.getOnDiskSizeWithHeader();
 
@@ -694,12 +702,12 @@ public class TestHFileBlock {
                 }
               }
               assertTrue(wrongBytesMsg, bytesAreCorrect);
-              assertTrue(newBlock.release());
+              assertRelease(newBlock);
               if (newBlock != b) {
-                assertTrue(b.release());
+                assertRelease(b);
               }
             } else {
-              assertTrue(b.release());
+              assertRelease(b);
             }
           }
           assertEquals(curOffset, fs.getFileStatus(path).getLen());
@@ -750,9 +758,9 @@ public class TestHFileBlock {
           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
           b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
           if (useHeapAllocator) {
-            assertTrue(b.isOnHeap());
+            assertTrue(!b.isSharedMem());
           } else {
-            assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
+            assertTrue(!b.getBlockType().isData() || b.isSharedMem());
           }
           assertEquals(types.get(blockId), b.getBlockType());
           assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
@@ -913,14 +921,13 @@ public class TestHFileBlock {
                           .withCompression(Algorithm.NONE)
                           .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                           .withChecksumType(ChecksumType.NULL).build();
-      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-          -1, 0, -1, meta, HEAP);
-      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
-          new MultiByteBuff(buf).getClass(), true)
-          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
-      long hfileMetaSize =  ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
-      long hfileBlockExpectedSize =
-          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
+      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+          HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
+      long byteBufferExpectedSize =
+          ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
+              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
+      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
+      long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
       long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
       assertEquals("Block data size: " + size + ", byte buffer expected " +
           "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
@@ -936,10 +943,10 @@ public class TestHFileBlock {
     byte[] byteArr = new byte[length];
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
-    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
-    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
+    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
+    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
     ByteBuffer buff1 = ByteBuffer.allocate(length);
     ByteBuffer buff2 = ByteBuffer.allocate(length);
     blockWithNextBlockMetadata.serialize(buff1, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index 5a6042c..2f249c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -132,8 +133,8 @@ public class TestHFileDataBlockEncoder {
                         .withBlockSize(0)
                         .withChecksumType(ChecksumType.NULL)
                         .build();
-    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-        0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
+    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+        HFileBlock.FILL_HEADER, 0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
     HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
     assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
   }
@@ -198,9 +199,8 @@ public class TestHFileDataBlockEncoder {
                         .withBlockSize(0)
                         .withChecksumType(ChecksumType.NULL)
                         .build();
-    HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, 0,
-         0, -1, meta, ByteBuffAllocator.HEAP);
+    HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+        HFileBlock.FILL_HEADER, 0, 0, -1, meta, ByteBuffAllocator.HEAP);
     return b;
   }
 
@@ -221,9 +221,9 @@ public class TestHFileDataBlockEncoder {
     blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA);
     byte[] encodedBytes = baos.toByteArray();
     size = encodedBytes.length - block.getDummyHeaderForVersion().length;
-    return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
-        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
-        block.getHFileContext(), ByteBuffAllocator.HEAP);
+    return new HFileBlock(context.getBlockType(), size, size, -1,
+        ByteBuff.wrap(ByteBuffer.wrap(encodedBytes)), HFileBlock.FILL_HEADER, 0,
+        block.getOnDiskDataSizeWithHeader(), -1, block.getHFileContext(), ByteBuffAllocator.HEAP);
   }
 
   private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
index 60ee958..87dd29e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
@@ -18,6 +18,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
 import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
 import static org.junit.Assert.assertEquals;
@@ -34,17 +39,24 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,30 +67,74 @@ public class TestHFileScannerImplReferenceCount {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
 
+  @Rule
+  public TestName CASE = new TestName();
+
   private static final Logger LOG =
       LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private static final byte[] FAMILY = Bytes.toBytes("f");
   private static final byte[] QUALIFIER = Bytes.toBytes("q");
   private static final byte[] SUFFIX = randLongBytes();
+  private static final int CELL_COUNT = 1000;
 
   private static byte[] randLongBytes() {
     Random rand = new Random();
-    byte[] keys = new byte[300];
+    byte[] keys = new byte[30];
     rand.nextBytes(keys);
     return keys;
   }
 
+  // It's a deep copy of configuration of UTIL, DON'T use shallow copy.
+  private Configuration conf;
+  private Path workDir;
+  private FileSystem fs;
+  private Path hfilePath;
   private Cell firstCell = null;
   private Cell secondCell = null;
+  private ByteBuffAllocator allocator;
 
   @BeforeClass
-  public static void setUp() {
+  public static void setUpBeforeClass() {
     Configuration conf = UTIL.getConfiguration();
     // Set the max chunk size and min entries key to be very small for index block, so that we can
     // create an index block tree with level >= 2.
     conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
     conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
+    // Create a bucket cache with 32MB.
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
+    conf.setInt(BUFFER_SIZE_KEY, 1024);
+    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
+    // All allocated ByteBuff are pooled ByteBuff.
+    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    this.firstCell = null;
+    this.secondCell = null;
+    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
+    this.conf = new Configuration(UTIL.getConfiguration());
+    String caseName = CASE.getMethodName();
+    this.workDir = UTIL.getDataTestDir(caseName);
+    this.fs = this.workDir.getFileSystem(conf);
+    this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
+    LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    this.allocator.clean();
+    this.fs.delete(this.workDir, true);
+  }
+
+  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
+    Assert.assertTrue(cache instanceof CombinedBlockCache);
+    BlockCache[] blockCaches = cache.getBlockCaches();
+    Assert.assertEquals(blockCaches.length, 2);
+    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
+    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
   }
 
   private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
@@ -107,176 +163,192 @@ public class TestHFileScannerImplReferenceCount {
     }
   }
 
+  /**
+   * A careful UT for validating the reference count mechanism. Before changing this UT, please
+   * read the design doc in HBASE-21879 and make sure that you understand the refCnt design.
+   */
   private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
       throws Exception {
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    Path dir = UTIL.getDataTestDir("testReleasingBlock");
-    FileSystem fs = dir.getFileSystem(conf);
-    try {
-      String hfileName = "testReleaseBlock_hfile_0_" + System.currentTimeMillis();
-      Path hfilePath = new Path(dir, hfileName);
-      int cellCount = 1000;
-      LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
-      writeHFile(conf, fs, hfilePath, compression, encoding, cellCount);
-
-      BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
-      Assert.assertNotNull(defaultBC);
-      HFile.Reader reader =
-          HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
-      Assert.assertTrue(reader instanceof HFileReaderImpl);
-      // We've build a HFile tree with index = 16.
-      Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
-
-      HFileScanner scanner = reader.getScanner(true, true, false);
-      BlockWithScanInfo scanInfo = reader.getDataBlockIndexReader()
-          .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE);
-      BlockWithScanInfo scanInfo2 = reader.getDataBlockIndexReader()
-          .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE);
-      HFileBlock block = scanInfo.getHFileBlock();
-      HFileBlock block2 = scanInfo2.getHFileBlock();
-      // One refCnt for blockCache and the other refCnt for RPC path.
-      Assert.assertEquals(block.refCnt(), 2);
-      Assert.assertEquals(block2.refCnt(), 2);
-      Assert.assertFalse(block == block2);
-
-      scanner.seekTo(firstCell);
-      Assert.assertEquals(block.refCnt(), 3);
-
-      // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
-      // refCnt should be unchanged.
-      scanner.seekTo(firstCell);
-      Assert.assertEquals(block.refCnt(), 3);
-
-      scanner.seekTo(secondCell);
-      Assert.assertEquals(block.refCnt(), 3);
-      Assert.assertEquals(block2.refCnt(), 3);
-
-      // After shipped, the block will be release, but block2 is still referenced by the curBlock.
-      scanner.shipped();
-      Assert.assertEquals(block.refCnt(), 2);
-      Assert.assertEquals(block2.refCnt(), 3);
-
-      // Try to ship again, though with nothing to client.
-      scanner.shipped();
-      Assert.assertEquals(block.refCnt(), 2);
-      Assert.assertEquals(block2.refCnt(), 3);
-
-      // The curBlock(block2) will also be released.
-      scanner.close();
-      Assert.assertEquals(block2.refCnt(), 2);
-
-      // Finish the block & block2 RPC path
-      block.release();
-      block2.release();
-      Assert.assertEquals(block.refCnt(), 1);
-      Assert.assertEquals(block2.refCnt(), 1);
-
-      // Evict the LRUBlockCache
-      Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
-      Assert.assertEquals(block.refCnt(), 0);
-      Assert.assertEquals(block2.refCnt(), 0);
-
-      int count = 0;
-      Assert.assertTrue(scanner.seekTo());
-      ++count;
-      while (scanner.next()) {
-        count++;
-      }
-      assertEquals(cellCount, count);
-    } finally {
-      fs.delete(dir, true);
+    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
+    HFileBlock curBlock, prevBlock;
+    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
+    Assert.assertNotNull(defaultBC);
+    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
+    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
+    Assert.assertTrue(reader instanceof HFileReaderImpl);
+    // We've built an HFile whose data index tree has 16 levels.
+    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
+
+    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
+    HFileBlock block1 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    waitBucketCacheFlushed(defaultBC);
+    Assert.assertTrue(block1.getBlockType().isData());
+    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
+
+    HFileBlock block2 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    waitBucketCacheFlushed(defaultBC);
+    Assert.assertTrue(block2.getBlockType().isData());
+    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
+    // Only one refCnt for RPC path.
+    Assert.assertEquals(block1.refCnt(), 1);
+    Assert.assertEquals(block2.refCnt(), 1);
+    Assert.assertFalse(block1 == block2);
+
+    scanner.seekTo(firstCell);
+    curBlock = scanner.curBlock;
+    Assert.assertEquals(curBlock.refCnt(), 2);
+
+    // Seek to the same block again: the curBlock won't change and nothing is read from the
+    // BlockCache, so refCnt should be unchanged.
+    scanner.seekTo(firstCell);
+    Assert.assertTrue(curBlock == scanner.curBlock);
+    Assert.assertEquals(curBlock.refCnt(), 2);
+    prevBlock = curBlock;
+
+    scanner.seekTo(secondCell);
+    curBlock = scanner.curBlock;
+    Assert.assertEquals(prevBlock.refCnt(), 2);
+    Assert.assertEquals(curBlock.refCnt(), 2);
+
+    // After shipped(), the prevBlock will be released, but curBlock is still referenced by the
+    // scanner.
+    scanner.shipped();
+    Assert.assertEquals(prevBlock.refCnt(), 1);
+    Assert.assertEquals(curBlock.refCnt(), 2);
+
+    // Try to ship again, though with nothing to client.
+    scanner.shipped();
+    Assert.assertEquals(prevBlock.refCnt(), 1);
+    Assert.assertEquals(curBlock.refCnt(), 2);
+
+    // The curBlock will also be released.
+    scanner.close();
+    Assert.assertEquals(curBlock.refCnt(), 1);
+
+    // Finish the block1 & block2 RPC path
+    Assert.assertTrue(block1.release());
+    Assert.assertTrue(block2.release());
+
+    // Evict the blocks of this hfile from the (combined) block cache
+    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
+    Assert.assertEquals(prevBlock.refCnt(), 0);
+    Assert.assertEquals(curBlock.refCnt(), 0);
+
+    int count = 0;
+    Assert.assertTrue(scanner.seekTo());
+    ++count;
+    while (scanner.next()) {
+      count++;
     }
+    assertEquals(CELL_COUNT, count);
   }
 
   /**
    * See HBASE-22480
    */
   @Test
-  public void testSeekBefore() throws IOException {
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    Path dir = UTIL.getDataTestDir("testSeekBefore");
-    FileSystem fs = dir.getFileSystem(conf);
-    try {
-      String hfileName = "testSeekBefore_hfile_0_" + System.currentTimeMillis();
-      Path hfilePath = new Path(dir, hfileName);
-      int cellCount = 1000;
-      LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
-      writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, cellCount);
-
-      BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
-      Assert.assertNotNull(defaultBC);
-      HFile.Reader reader =
-          HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
-      Assert.assertTrue(reader instanceof HFileReaderImpl);
-      // We've build a HFile tree with index = 16.
-      Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
-
-      HFileScanner scanner = reader.getScanner(true, true, false);
-      HFileBlock block1 = reader.getDataBlockIndexReader()
-          .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-          .getHFileBlock();
-      HFileBlock block2 = reader.getDataBlockIndexReader()
-          .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
-          .getHFileBlock();
-      Assert.assertEquals(block1.refCnt(), 2);
-      Assert.assertEquals(block2.refCnt(), 2);
-
-      // Let the curBlock refer to block2.
-      scanner.seekTo(secondCell);
-      Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block2);
-      Assert.assertEquals(3, block2.refCnt());
-
-      // Release the block1, only one reference: blockCache.
-      Assert.assertFalse(block1.release());
-      Assert.assertEquals(1, block1.refCnt());
-      // Release the block2, so the remain references are: 1. scanner; 2. blockCache.
-      Assert.assertFalse(block2.release());
-      Assert.assertEquals(2, block2.refCnt());
-
-      // Do the seekBefore: the newBlock will be the previous block of curBlock.
-      Assert.assertTrue(scanner.seekBefore(secondCell));
-      Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
-      // Two reference for block1: 1. scanner; 2. blockCache.
-      Assert.assertEquals(2, block1.refCnt());
-      // Reference count of block2 must be unchanged because we haven't shipped.
-      Assert.assertEquals(2, block2.refCnt());
-
-      // Do the shipped
-      scanner.shipped();
-      Assert.assertEquals(2, block1.refCnt());
-      Assert.assertEquals(1, block2.refCnt());
-
-      // Do the close
-      scanner.close();
-      Assert.assertEquals(1, block1.refCnt());
-      Assert.assertEquals(1, block2.refCnt());
-
-      Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
-      Assert.assertEquals(0, block1.refCnt());
-      Assert.assertEquals(0, block2.refCnt());
-
-      // Reload the block1 again.
-      block1 = reader.getDataBlockIndexReader()
-          .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
-          .getHFileBlock();
-      Assert.assertFalse(block1.release());
-      Assert.assertEquals(1, block1.refCnt());
-      // Re-seek to the begin.
-      Assert.assertTrue(scanner.seekTo());
-      Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
-      Assert.assertEquals(2, block1.refCnt());
-      // Return false because firstCell <= c[0]
-      Assert.assertFalse(scanner.seekBefore(firstCell));
-      // The block1 shouldn't be released because we still don't do the shipped or close.
-      Assert.assertEquals(2, block1.refCnt());
-
-      scanner.close();
-      Assert.assertEquals(1, block1.refCnt());
-      Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 1);
-      Assert.assertEquals(0, block1.refCnt());
-    } finally {
-      fs.delete(dir, true);
-    }
+  public void testSeekBefore() throws Exception {
+    HFileBlock curBlock, prevBlock;
+    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
+    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
+    Assert.assertNotNull(defaultBC);
+    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
+    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
+    Assert.assertTrue(reader instanceof HFileReaderImpl);
+    // We've built an HFile whose data index tree has 16 levels.
+    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
+
+    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
+    HFileBlock block1 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    Assert.assertTrue(block1.getBlockType().isData());
+    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
+    HFileBlock block2 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    Assert.assertTrue(block2.getBlockType().isData());
+    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
+    // Wait until flushed to IOEngine;
+    waitBucketCacheFlushed(defaultBC);
+    // One RPC reference path.
+    Assert.assertEquals(block1.refCnt(), 1);
+    Assert.assertEquals(block2.refCnt(), 1);
+
+    // Let the curBlock refer to block2.
+    scanner.seekTo(secondCell);
+    curBlock = scanner.curBlock;
+    Assert.assertFalse(curBlock == block2);
+    Assert.assertEquals(1, block2.refCnt());
+    Assert.assertEquals(2, curBlock.refCnt());
+    prevBlock = scanner.curBlock;
+
+    // Release the block1, no other reference.
+    Assert.assertTrue(block1.release());
+    Assert.assertEquals(0, block1.refCnt());
+    // Release the block2, no other reference.
+    Assert.assertTrue(block2.release());
+    Assert.assertEquals(0, block2.refCnt());
+
+    // Do the seekBefore: the newBlock will be the previous block of curBlock.
+    Assert.assertTrue(scanner.seekBefore(secondCell));
+    Assert.assertEquals(scanner.prevBlocks.size(), 1);
+    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
+    curBlock = scanner.curBlock;
+    // the curBlock is read from IOEngine, so a different block.
+    Assert.assertFalse(curBlock == block1);
+    // Two references for curBlock: 1. scanner; 2. blockCache.
+    Assert.assertEquals(2, curBlock.refCnt());
+    // Reference count of prevBlock must be unchanged because we haven't shipped.
+    Assert.assertEquals(2, prevBlock.refCnt());
+
+    // Do the shipped
+    scanner.shipped();
+    Assert.assertEquals(scanner.prevBlocks.size(), 0);
+    Assert.assertNotNull(scanner.curBlock);
+    Assert.assertEquals(2, curBlock.refCnt());
+    Assert.assertEquals(1, prevBlock.refCnt());
+
+    // Do the close
+    scanner.close();
+    Assert.assertNull(scanner.curBlock);
+    Assert.assertEquals(1, curBlock.refCnt());
+    Assert.assertEquals(1, prevBlock.refCnt());
+
+    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
+    Assert.assertEquals(0, curBlock.refCnt());
+    Assert.assertEquals(0, prevBlock.refCnt());
+
+    // Reload the block1 again.
+    block1 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    // Wait until flushed to IOEngine;
+    waitBucketCacheFlushed(defaultBC);
+    Assert.assertTrue(block1.getBlockType().isData());
+    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
+    Assert.assertTrue(block1.release());
+    Assert.assertEquals(0, block1.refCnt());
+    // Re-seek to the begin.
+    Assert.assertTrue(scanner.seekTo());
+    curBlock = scanner.curBlock;
+    Assert.assertFalse(curBlock == block1);
+    Assert.assertEquals(2, curBlock.refCnt());
+    // Return false because firstCell <= c[0]
+    Assert.assertFalse(scanner.seekBefore(firstCell));
+    // The curBlock shouldn't be released because we haven't done the shipped or close yet.
+    Assert.assertEquals(2, curBlock.refCnt());
+
+    scanner.close();
+    Assert.assertEquals(1, curBlock.refCnt());
+    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
+    Assert.assertEquals(0, curBlock.refCnt());
   }
 
   @Test
@@ -298,4 +370,56 @@ public class TestHFileScannerImplReferenceCount {
   public void testDataBlockEncodingAndCompression() throws Exception {
     testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
   }
+
+  @Test
+  public void testWithLruBlockCache() throws Exception {
+    // With only an LruBlockCache configured, data blocks are loaded as ExclusiveMemHFileBlock.
+    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
+    // Clear the bucket cache IOEngine so that the factory creates a plain LruBlockCache.
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
+    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
+    Assert.assertNotNull(defaultBC);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
+    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
+    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
+    Assert.assertTrue(reader instanceof HFileReaderImpl);
+    // We've built an HFile tree with 16 data index levels.
+    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
+
+    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
+    HFileBlock block1 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    Assert.assertTrue(block1.getBlockType().isData());
+    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
+    HFileBlock block2 = reader.getDataBlockIndexReader()
+        .loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
+        .getHFileBlock();
+    Assert.assertTrue(block2.getBlockType().isData());
+    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
+    // ExclusiveMemHFileBlock does not track references, so refCnt() is expected to stay 0.
+    Assert.assertEquals(0, block1.refCnt());
+    Assert.assertEquals(0, block2.refCnt());
+
+    scanner.seekTo(firstCell);
+    HFileBlock curBlock = scanner.curBlock;
+    Assert.assertSame(block1, curBlock);
+    Assert.assertEquals(0, curBlock.refCnt());
+    Assert.assertTrue(scanner.prevBlocks.isEmpty());
+
+    // Switch to next block
+    scanner.seekTo(secondCell);
+    curBlock = scanner.curBlock;
+    Assert.assertSame(block2, curBlock);
+    Assert.assertEquals(0, curBlock.refCnt());
+    Assert.assertEquals(0, curBlock.retain().refCnt());
+    // Only pooled HFileBlock will be kept in prevBlocks; an ExclusiveMemHFileBlock is never
+    // saved into prevBlocks.
+    Assert.assertTrue(scanner.prevBlocks.isEmpty());
+
+    // close the scanner
+    scanner.close();
+    Assert.assertNull(scanner.curBlock);
+    Assert.assertTrue(scanner.prevBlocks.isEmpty());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
index 9b4d768..af70f3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -820,10 +821,10 @@ public class TestLruBlockCache {
     byte[] byteArr = new byte[length];
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
-    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
-    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
+    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
+    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
 
     LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
         (int)Math.ceil(1.2*maxSize/blockSize),
@@ -964,7 +965,8 @@ public class TestLruBlockCache {
     HFileContext meta = new HFileContextBuilder().build();
     BlockCacheKey key = new BlockCacheKey("key1", 0);
     HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
-        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
+        ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, 52, -1, meta,
+        HEAP);
     AtomicBoolean err1 = new AtomicBoolean(false);
     Thread t1 = new Thread(() -> {
       for (int i = 0; i < 10000 && !err1.get(); i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 121e070..4ac7907 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -203,7 +203,7 @@ public class TestBucketCache {
     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
   }
 
-  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
       throws InterruptedException {
     while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
       Thread.sleep(100);
@@ -211,6 +211,13 @@ public class TestBucketCache {
     Thread.sleep(1000);
   }
 
+  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
+    while (!cache.ramCache.isEmpty()) { // poll until ramCache holds no more entries
+      Thread.sleep(100);
+    }
+    Thread.sleep(1000); // extra fixed pause once ramCache is empty, before returning
+  }
+
   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
@@ -430,10 +437,10 @@ public class TestBucketCache {
     ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
     HFileContext meta = new HFileContextBuilder().build();
     ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
-    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf1,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
-    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf2,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
+    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
+    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
 
     BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
     ByteBuffer actualBuffer = ByteBuffer.allocate(length);
@@ -492,10 +499,10 @@ public class TestBucketCache {
     RAMCache cache = new RAMCache();
     BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
     BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
-    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-        -1, 52, -1, meta, ByteBuffAllocator.HEAP);
-    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-        -1, -1, -1, meta, ByteBuffAllocator.HEAP);
+    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
+    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
     RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
     RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
 
@@ -527,8 +534,8 @@ public class TestBucketCache {
     int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
     ByteBuffer buf = ByteBuffer.allocate(length);
     HFileContext meta = new HFileContextBuilder().build();
-    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-        offset, 52, -1, meta, ByteBuffAllocator.HEAP);
+    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
+        HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
 
     // initialize an mocked ioengine.
     IOEngine ioEngine = Mockito.mock(IOEngine.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index cf356f3..a9f50cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.ClassRule;
@@ -69,7 +70,7 @@ public class TestBucketCacheRefCnt {
   }
 
   private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
-    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
+    return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
         HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index 5c5dda6..07b4b3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.Assert;
@@ -57,9 +58,9 @@ public class TestRAMCache {
         int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
         long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
         HFileContext fileContext, ByteBuffAllocator allocator) {
-      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
-          fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
-          allocator);
+      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
+          ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
+          fileContext, allocator);
     }
 
     public void setLatch(CountDownLatch latch) {