You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/09/09 09:52:27 UTC

[hbase] branch branch-2 updated: HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 4282d70  HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
4282d70 is described below

commit 4282d702123ee0e3e37df95a3ac57e301f10554f
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Mon Sep 9 17:38:33 2019 +0800

    HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
---
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 35 ++++++++++-
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 65 ++++++++++++++++----
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    | 21 ++++++-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  7 ++-
 .../hadoop/hbase/io/hfile/bucket/BucketEntry.java  |  7 ++-
 .../hfile/bucket/ExclusiveMemoryMmapIOEngine.java  |  5 +-
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 64 ++++++++++----------
 .../hfile/TestHFileScannerImplReferenceCount.java  | 70 +++++++++++++++-------
 .../io/hfile/bucket/TestByteBufferIOEngine.java    | 11 +++-
 .../hbase/io/hfile/bucket/TestFileIOEngine.java    | 32 ++++++++++
 10 files changed, 240 insertions(+), 77 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index c04c3f5..3e0a830 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.nio;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
@@ -450,10 +451,37 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
    */
   public abstract int read(ReadableByteChannel channel) throws IOException;
 
+  /**
+   * Reads bytes from FileChannel into this ByteBuff
+   */
+  public abstract int read(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * Write this ByteBuff's data into target file
+   */
+  public abstract int write(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * function interface for Channel read
+   */
+  @FunctionalInterface
+  interface ChannelReader {
+    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
+  }
+
+  static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
+    return channel.read(buf);
+  };
+
+  static final ChannelReader FILE_READER = (channel, buf, offset) -> {
+    return ((FileChannel)channel).read(buf, offset);
+  };
+
   // static helper methods
-  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+      ChannelReader reader) throws IOException {
     if (buf.remaining() <= NIO_BUFFER_LIMIT) {
-      return channel.read(buf);
+      return reader.read(channel, buf, offset);
     }
     int originalLimit = buf.limit();
     int initialRemaining = buf.remaining();
@@ -463,7 +491,8 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
       try {
         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
         buf.limit(buf.position() + ioSize);
-        ret = channel.read(buf);
+        offset += ret;
+        ret = reader.read(channel, buf, offset);
         if (ret < ioSize) {
           break;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 3ce1709..df0ae8e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -24,7 +24,10 @@ import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
   private int markedItemIndex = -1;
   private final int[] itemBeginPos;
 
+  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
+    @Override
+    public boolean hasNext() {
+      return curItemIndex < limitedItemIndex ||
+          (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
+    }
+
+    @Override
+    public ByteBuffer next() {
+      if (curItemIndex >= items.length) {
+        throw new NoSuchElementException("items overflow");
+      }
+      curItem = items[curItemIndex++];
+      return curItem;
+    }
+  };
+
   public MultiByteBuff(ByteBuffer... items) {
     this(NONE, items);
   }
@@ -1064,23 +1084,44 @@ public class MultiByteBuff extends ByteBuff {
     return output;
   }
 
+  private int internalRead(ReadableByteChannel channel, long offset,
+      ChannelReader reader) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      int len = read(channel, buffer, offset, reader);
+      if (len > 0) {
+        total += len;
+        offset += len;
+      }
+      if (buffer.hasRemaining()) {
+        break;
+      }
+    }
+    return total;
+  }
+
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    return internalRead(channel, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    return internalRead(channel, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
     checkRefCount();
     int total = 0;
-    while (true) {
-      // Read max possible into the current BB
-      int len = channelRead(channel, this.curItem);
-      if (len > 0)
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      while (buffer.hasRemaining()) {
+        int len = channel.write(buffer, offset);
         total += len;
-      if (this.curItem.hasRemaining()) {
-        // We were not able to read enough to fill the current BB itself. Means there is no point in
-        // doing more reads from Channel. Only this much there for now.
-        break;
-      } else {
-        if (this.curItemIndex >= this.limitedItemIndex) break;
-        this.curItemIndex++;
-        this.curItem = this.items[this.curItemIndex];
+        offset += len;
       }
     }
     return total;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 36a83a0..797bfdc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
     checkRefCount();
-    return channelRead(channel, buf);
+    return read(channel, buf, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    return read(channel, buf, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while(buf.hasRemaining()) {
+      int len = channel.write(buf, offset);
+      total += len;
+      offset += len;
+    }
+    return total;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 106bf3a..ef8c4d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -502,8 +502,11 @@ public class BucketCache implements BlockCache, HeapSize {
           // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
           // the same BucketEntry, then all of the three will share the same refCnt.
           Cacheable cachedBlock = ioEngine.read(bucketEntry);
-          // RPC start to reference, so retain here.
-          cachedBlock.retain();
+          if (ioEngine.usesSharedMemory()) {
+            // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
+            // same RefCnt, do retain here, in order to count the number of RPC references
+            cachedBlock.retain();
+          }
           // Update the cache statistics.
           if (updateCacheMetrics) {
             cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index ca41eca..2dd7775 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
-  private final ByteBuffAllocator allocator;
+  final ByteBuffAllocator allocator;
 
   /**
    * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,10 @@ class BucketEntry implements HBaseReferenceCounted {
   }
 
   Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
-    ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
+  }
+
+  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
     return this.deserializerReference().deserialize(buf, allocator);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
index 3d7f2b1..3169a66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
 
   @Override
   public Cacheable read(BucketEntry be) throws IOException {
-    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+    ByteBuff dst = be.allocator.allocate(be.getLength());
     bufferArray.read(be.offset(), dst);
     dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers());
+    return be.wrapAsCacheable(dst);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index b3afe48..9e6a75b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -129,20 +129,25 @@ public class FileIOEngine implements IOEngine {
     long offset = be.offset();
     int length = be.getLength();
     Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
-    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    ByteBuff dstBuff = be.allocator.allocate(length);
     if (length != 0) {
-      accessFile(readAccessor, dstBuffer, offset);
-      // The buffer created out of the fileChannel is formed by copying the data from the file
-      // Hence in this case there is no shared memory that we point to. Even if the BucketCache
-      // evicts this buffer from the file the data is already copied and there is no need to
-      // ensure that the results are not corrupted before consuming them.
-      if (dstBuffer.limit() != length) {
-        throw new IllegalArgumentIOException(
-            "Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
+      try {
+        accessFile(readAccessor, dstBuff, offset);
+        // The buffer created out of the fileChannel is formed by copying the data from the file
+        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
+        // evicts this buffer from the file the data is already copied and there is no need to
+        // ensure that the results are not corrupted before consuming them.
+        if (dstBuff.limit() != length) {
+          throw new IllegalArgumentIOException(
+              "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
+        }
+      } catch (IOException ioe) {
+        dstBuff.release();
+        throw ioe;
       }
     }
-    dstBuffer.rewind();
-    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
+    dstBuff.rewind();
+    return be.wrapAsCacheable(dstBuff);
   }
 
   @VisibleForTesting
@@ -164,10 +169,7 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    if (!srcBuffer.hasRemaining()) {
-      return;
-    }
-    accessFile(writeAccessor, srcBuffer, offset);
+    write(ByteBuff.wrap(srcBuffer), offset);
   }
 
   /**
@@ -208,28 +210,30 @@ public class FileIOEngine implements IOEngine {
   }
 
   @Override
-  public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
-    write(dup, offset);
+  public void write(ByteBuff srcBuff, long offset) throws IOException {
+    if (!srcBuff.hasRemaining()) {
+      return;
+    }
+    accessFile(writeAccessor, srcBuff, offset);
   }
 
-  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+  private void accessFile(FileAccessor accessor, ByteBuff buff,
       long globalOffset) throws IOException {
     int startFileNum = getFileNum(globalOffset);
-    int remainingAccessDataLen = buffer.remaining();
+    int remainingAccessDataLen = buff.remaining();
     int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
     int accessFileNum = startFileNum;
     long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
-    int bufLimit = buffer.limit();
+    int bufLimit = buff.limit();
     while (true) {
       FileChannel fileChannel = fileChannels[accessFileNum];
       int accessLen = 0;
       if (endFileNum > accessFileNum) {
         // short the limit;
-        buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
+        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
       }
       try {
-        accessLen = accessor.access(fileChannel, buffer, accessOffset);
+        accessLen = accessor.access(fileChannel, buff, accessOffset);
       } catch (ClosedByInterruptException e) {
         throw e;
       } catch (ClosedChannelException e) {
@@ -237,7 +241,7 @@ public class FileIOEngine implements IOEngine {
         continue;
       }
       // recover the limit
-      buffer.limit(bufLimit);
+      buff.limit(bufLimit);
       if (accessLen < remainingAccessDataLen) {
         remainingAccessDataLen -= accessLen;
         accessFileNum++;
@@ -246,7 +250,7 @@ public class FileIOEngine implements IOEngine {
         break;
       }
       if (accessFileNum >= fileChannels.length) {
-        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
             + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
             + globalOffset);
       }
@@ -304,23 +308,23 @@ public class FileIOEngine implements IOEngine {
   }
 
   private interface FileAccessor {
-    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
         throws IOException;
   }
 
   private static class FileReadAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
         long accessOffset) throws IOException {
-      return fileChannel.read(byteBuffer, accessOffset);
+      return buff.read(fileChannel, accessOffset);
     }
   }
 
   private static class FileWriteAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
         long accessOffset) throws IOException {
-      return fileChannel.write(byteBuffer, accessOffset);
+      return buff.write(fileChannel, accessOffset);
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
index dd9a1c8..fa67039 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
@@ -29,6 +29,8 @@ import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENT
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -58,9 +60,14 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 @Category({ IOTests.class, SmallTests.class })
 public class TestHFileScannerImplReferenceCount {
 
@@ -71,6 +78,15 @@ public class TestHFileScannerImplReferenceCount {
   @Rule
   public TestName CASE = new TestName();
 
+  @Parameters(name = "{index}: ioengine={0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
+      new Object[] { "mmap" }, new Object[] { "pmem" });
+  }
+
+  @Parameter
+  public String ioengine;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -113,12 +129,16 @@ public class TestHFileScannerImplReferenceCount {
 
   @Before
   public void setUp() throws IOException {
+    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    this.workDir = UTIL.getDataTestDir(caseName);
+    if (!"offheap".equals(ioengine)) {
+      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
+    }
+    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
     this.firstCell = null;
     this.secondCell = null;
     this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
     this.conf = new Configuration(UTIL.getConfiguration());
-    String caseName = CASE.getMethodName();
-    this.workDir = UTIL.getDataTestDir(caseName);
     this.fs = this.workDir.getFileSystem(conf);
     this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
     LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
@@ -202,34 +222,34 @@ public class TestHFileScannerImplReferenceCount {
 
     scanner.seekTo(firstCell);
     curBlock = scanner.curBlock;
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(curBlock, 2);
 
     // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
     // refCnt should be unchanged.
     scanner.seekTo(firstCell);
     Assert.assertTrue(curBlock == scanner.curBlock);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(curBlock, 2);
     prevBlock = curBlock;
 
     scanner.seekTo(secondCell);
     curBlock = scanner.curBlock;
-    Assert.assertEquals(prevBlock.refCnt(), 2);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 2);
+    this.assertRefCnt(curBlock, 2);
 
     // After shipped, the prevBlock will be release, but curBlock is still referenced by the
     // curBlock.
     scanner.shipped();
-    Assert.assertEquals(prevBlock.refCnt(), 1);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 1);
+    this.assertRefCnt(curBlock, 2);
 
     // Try to ship again, though with nothing to client.
     scanner.shipped();
-    Assert.assertEquals(prevBlock.refCnt(), 1);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 1);
+    this.assertRefCnt(curBlock, 2);
 
     // The curBlock will also be released.
     scanner.close();
-    Assert.assertEquals(curBlock.refCnt(), 1);
+    this.assertRefCnt(curBlock, 1);
 
     // Finish the block & block2 RPC path
     Assert.assertTrue(block1.release());
@@ -287,7 +307,7 @@ public class TestHFileScannerImplReferenceCount {
     curBlock = scanner.curBlock;
     Assert.assertFalse(curBlock == block2);
     Assert.assertEquals(1, block2.refCnt());
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
     prevBlock = scanner.curBlock;
 
     // Release the block1, no other reference.
@@ -305,22 +325,22 @@ public class TestHFileScannerImplReferenceCount {
     // the curBlock is read from IOEngine, so a different block.
     Assert.assertFalse(curBlock == block1);
     // Two reference for curBlock: 1. scanner; 2. blockCache.
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
     // Reference count of prevBlock must be unchanged because we haven't shipped.
-    Assert.assertEquals(2, prevBlock.refCnt());
+    this.assertRefCnt(prevBlock, 2);
 
     // Do the shipped
     scanner.shipped();
     Assert.assertEquals(scanner.prevBlocks.size(), 0);
     Assert.assertNotNull(scanner.curBlock);
-    Assert.assertEquals(2, curBlock.refCnt());
-    Assert.assertEquals(1, prevBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
+    this.assertRefCnt(prevBlock, 1);
 
     // Do the close
     scanner.close();
     Assert.assertNull(scanner.curBlock);
-    Assert.assertEquals(1, curBlock.refCnt());
-    Assert.assertEquals(1, prevBlock.refCnt());
+    this.assertRefCnt(curBlock, 1);
+    this.assertRefCnt(prevBlock, 1);
 
     Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
     Assert.assertEquals(0, curBlock.refCnt());
@@ -340,18 +360,26 @@ public class TestHFileScannerImplReferenceCount {
     Assert.assertTrue(scanner.seekTo());
     curBlock = scanner.curBlock;
     Assert.assertFalse(curBlock == block1);
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
     // Return false because firstCell <= c[0]
     Assert.assertFalse(scanner.seekBefore(firstCell));
     // The block1 shouldn't be released because we still don't do the shipped or close.
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
 
     scanner.close();
-    Assert.assertEquals(1, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 1);
     Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
     Assert.assertEquals(0, curBlock.refCnt());
   }
 
+  private void assertRefCnt(HFileBlock block, int value) {
+    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
+      Assert.assertEquals(value, block.refCnt());
+    } else {
+      Assert.assertEquals(value - 1, block.refCnt());
+    }
+  }
+
   @Test
   public void testDefault() throws Exception {
     testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 2184fa5..d1b8f9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Assert;
@@ -48,8 +49,8 @@ public class TestByteBufferIOEngine {
   private static class MockBucketEntry extends BucketEntry {
     private long off;
 
-    MockBucketEntry(long offset, int length) {
-      super(offset & 0xFF00, length, 0, false);
+    MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
+      super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
       this.off = offset;
     }
 
@@ -66,7 +67,11 @@ public class TestByteBufferIOEngine {
   }
 
   static BucketEntry createBucketEntry(long offset, int len) {
-    BucketEntry be = new MockBucketEntry(offset, len);
+    return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
+  }
+
+  static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
+    BucketEntry be = new MockBucketEntry(offset, len, allocator);
     be.setDeserializerReference(DESERIALIZER);
     return be;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6bd91d0..8f86e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,8 +31,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.After;
@@ -40,6 +44,9 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Basic test for {@link FileIOEngine}
@@ -131,6 +138,31 @@ public class TestFileIOEngine {
   }
 
   @Test
+  public void testReadFailedShouldReleaseByteBuff() {
+    ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
+    final RefCnt refCnt = RefCnt.create();
+    Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
+      @Override
+      public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
+        int len = invocation.getArgument(0);
+        return ByteBuff.wrap(new ByteBuffer[]{ByteBuffer.allocate(len + 1)}, refCnt);
+      }
+    });
+    int len = 10;
+    byte[] data1 = new byte[len];
+    assertEquals(1, refCnt.refCnt());
+    try {
+      fileIOEngine.write(ByteBuffer.wrap(data1), 0);
+      BucketEntry be = createBucketEntry(0, len, alloc);
+      fileIOEngine.read(be);
+      fail();
+    } catch (IOException ioe) {
+      // expected exception.
+    }
+    assertEquals(0, refCnt.refCnt());
+  }
+
+  @Test
   public void testClosedChannelException() throws IOException {
     fileIOEngine.closeFileChannels();
     int len = 5;