Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:03 UTC

[hbase] 01/22: HBASE-21916 Abstract a ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 568d12933b4503a91c5d3cd08b4aef4b7f9c0824
Author: huzheng <op...@gmail.com>
AuthorDate: Sat Feb 16 17:16:09 2019 +0800

    HBASE-21916 Abstract a ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool
---
 .../apache/hadoop/hbase/ipc/CellBlockBuilder.java  |   9 +-
 hbase-common/pom.xml                               |   4 +
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  | 282 +++++++++++++++++++
 .../hbase/io/ByteBufferListOutputStream.java       |  40 ++-
 .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 155 -----------
 .../hbase/io/encoding/CopyKeyDataBlockEncoder.java |   2 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java     |   2 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java    |   2 +-
 .../hbase/io/encoding/PrefixKeyDeltaEncoder.java   |   2 +-
 .../hadoop/hbase/io/encoding/RowIndexSeekerV1.java |   2 +-
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 146 +++++-----
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java |  98 +++++--
 .../java/org/apache/hadoop/hbase/nio/RefCnt.java   |  49 ++++
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    |  92 ++++--
 .../apache/hadoop/hbase/util/ByteBufferArray.java  |  10 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  |  31 ++-
 .../hadoop/hbase/io/TestByteBuffAllocator.java     | 309 +++++++++++++++++++++
 .../hbase/io/TestByteBufferListOutputStream.java   |  18 +-
 .../apache/hadoop/hbase/io/TestByteBufferPool.java |  67 -----
 .../apache/hadoop/hbase/nio/TestMultiByteBuff.java |   4 +-
 .../apache/hadoop/hbase/io/hfile/Cacheable.java    |   7 +-
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |   2 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |   2 +-
 .../apache/hadoop/hbase/ipc/NettyServerCall.java   |  12 +-
 .../hadoop/hbase/ipc/NettyServerRpcConnection.java |   9 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  96 +------
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  18 +-
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   |   2 +-
 .../apache/hadoop/hbase/ipc/SimpleServerCall.java  |  15 +-
 .../hbase/ipc/SimpleServerRpcConnection.java       |  26 +-
 .../client/TestAsyncTableGetMultiThreaded.java     |   4 +-
 .../hbase/client/TestServerLoadDurability.java     |   8 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |   2 +-
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 144 ----------
 34 files changed, 974 insertions(+), 697 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index 8d68e87..111f768 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -208,7 +208,7 @@ class CellBlockBuilder {
    * @param codec to use for encoding
    * @param compressor to use for encoding
    * @param cellScanner to encode
-   * @param pool Pool of ByteBuffers to make use of.
+   * @param allocator to allocate the {@link ByteBuff}.
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
    *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
    *         been flipped and is ready for reading. Use limit to find total size. If
@@ -217,15 +217,14 @@ class CellBlockBuilder {
    * @throws IOException if encoding the cells fail
    */
   public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
-      CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+      CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
     if (cellScanner == null) {
       return null;
     }
     if (codec == null) {
       throw new CellScannerButNoCodecException();
     }
-    assert pool != null;
-    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
     encodeCellsTo(bbos, cellScanner, codec, compressor);
     if (bbos.size() == 0) {
       bbos.releaseResources();
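
A minimal sketch of driving the new signature (the builder, codec, compressor, cell scanner and
allocator here are assumed to be supplied by the surrounding RPC server code):

    ByteBufferListOutputStream bbos =
        cellBlockBuilder.buildCellBlockStream(codec, compressor, cellScanner, allocator);
    if (bbos != null) {
      try {
        List<ByteBuffer> cellBlock = bbos.getByteBuffers(); // flipped, ready for the write
        // ... hand cellBlock to the responder for the socket write ...
      } finally {
        bbos.releaseResources(); // returns pooled ByteBuffers to the allocator
      }
    }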
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 7d7dea2..c23b9d4 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -151,6 +151,10 @@
       <artifactId>hbase-shaded-miscellaneous</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-netty</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
new file mode 100644
index 0000000..1833462
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * ByteBuffAllocator allocates and frees ByteBuffs backed by a pool of NIO ByteBuffers, and
+ * provides a high-level interface for upstream callers. When asked for a given memory size it
+ * returns a {@link ByteBuff}; once we are sure a ByteBuff has reached the end of its life cycle,
+ * we must call {@link ByteBuff#release()} to return its buffers to the pool, otherwise the
+ * ByteBuffers leak and the NIO ByteBuffer pool may be exhausted. If the desired size is larger
+ * than what the ByteBufferPool can supply, we downgrade to allocating the remaining ByteBuffers
+ * from heap, which means GC pressure may increase again; the better fix in that case is to
+ * increase the ByteBufferPool size. <br/>
+ * <br/>
+ * On the other hand, for better memory utilization we set a lower bound named
+ * minSizeForReservoirUse in this allocator: if the desired size is less than
+ * minSizeForReservoirUse, the allocator simply allocates the ByteBuffer on heap and lets the JVM
+ * free its memory, because dedicating a whole fixed-size pooled ByteBuffer to a small object
+ * would be wasteful. <br/>
+ * <br/>
+ * We recommend using this class to allocate and free {@link ByteBuff} in the RPC layer and along
+ * the entire read/write path, because it hides the details of memory management and its APIs are
+ * friendlier to the upper layers.
+ */
+@InterfaceAudience.Private
+public class ByteBuffAllocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
+
+  public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
+
+  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
+  // 64 KB. Keep it the same as the chunk size we write/read to/from the socket channel.
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+  public static final String MIN_ALLOCATE_SIZE_KEY =
+      "hbase.ipc.server.reservoir.minimal.allocating.size";
+
+  public static final Recycler NONE = () -> {
+  };
+
+  public interface Recycler {
+    void free();
+  }
+
+  private final boolean reservoirEnabled;
+  private final int bufSize;
+  private final int maxBufCount;
+  private final AtomicInteger usedBufCount = new AtomicInteger(0);
+
+  private boolean maxPoolSizeInfoLevelLogged = false;
+
+  // If the desired size is at least this size, it'll be allocated from the ByteBufferPool;
+  // otherwise it'll be allocated from heap for better utilization. We default this to 1/6th of
+  // the pool buffer size.
+  private final int minSizeForReservoirUse;
+
+  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
+
+  /**
+   * Initialize a {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap
+   * if the reservoir is enabled and has enough buffers; otherwise the allocator allocates the
+   * shortfall from on-heap to meet the requirement.
+   * @param conf configuration providing the arguments to initialize the allocator.
+   * @param reservoirEnabled whether the reservoir is enabled or disabled.
+   * @return a ByteBuffAllocator to manage the byte buffers.
+   */
+  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
+    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
+    if (reservoirEnabled) {
+      // The max number of buffers to be pooled in the ByteBufferPool. The default is derived
+      // from the number of configured handlers. For a read request, 2 MB is the max size at
+      // which we send back one RPC response, so at most we need 2 MB for creating the response
+      // cell block. (It may well be less than this, because the 2 MB calculation also includes
+      // the heap size overhead of each cell.) Given 2 MB, we need (2 * 1024 * 1024) / poolBufSize
+      // buffers to make the response cell block; the pool buffer size is 64 KB by default.
+      // For a read request, at the end of handler processing we build the response cell block
+      // and add the Call to the connection's response queue; a single Responder thread then takes
+      // connections and responses one by one and does the socket write. So by the time a
+      // handler-originated response has actually been written to the socket and its BBs
+      // released, the handler may already have processed one more read request. We therefore
+      // assume 2x on average and factor that into the max number of buffers to pool.
+      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
+      int maxBuffCount =
+          conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
+      int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
+      return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
+    } else {
+      return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Initialize a {@link ByteBuffAllocator} which only allocates ByteBuffers from on-heap. It is
+   * designed for testing, or for the case where the reservoir is disabled.
+   * @return an allocator that allocates on-heap ByteBuffers.
+   */
+  public static ByteBuffAllocator createOnHeap() {
+    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
+  }
+
+  @VisibleForTesting
+  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
+      int minSizeForReservoirUse) {
+    this.reservoirEnabled = reservoirEnabled;
+    this.maxBufCount = maxBufCount;
+    this.bufSize = bufSize;
+    this.minSizeForReservoirUse = minSizeForReservoirUse;
+  }
+
+  public boolean isReservoirEnabled() {
+    return reservoirEnabled;
+  }
+
+  @VisibleForTesting
+  public int getQueueSize() {
+    return this.buffers.size();
+  }
+
+  /**
+   * Allocate a buffer of the configured buffer size from this ByteBuffAllocator. Remember to
+   * call {@link ByteBuff#release()} once the buffer is no longer needed, otherwise memory leaks
+   * in the NIO ByteBuffer pool.
+   * @return a ByteBuff of the configured buffer size.
+   */
+  public SingleByteBuff allocateOneBuffer() {
+    if (isReservoirEnabled()) {
+      ByteBuffer bb = getBuffer();
+      if (bb != null) {
+        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
+      }
+    }
+    // Allocated from heap, let the JVM free its memory.
+    return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
+  }
+
+  /**
+   * Allocate size bytes from this ByteBuffAllocator. Remember to call {@link ByteBuff#release()}
+   * once the buffer is no longer needed, otherwise memory leaks in the NIO ByteBuffer pool.
+   * @param size number of bytes to allocate
+   * @return a ByteBuff of the desired size.
+   */
+  public ByteBuff allocate(int size) {
+    if (size < 0) {
+      throw new IllegalArgumentException("size to allocate should >=0");
+    }
+    // If disabled the reservoir, just allocate it from on-heap.
+    if (!isReservoirEnabled() || size == 0) {
+      return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+    }
+    int reminder = size % bufSize;
+    int len = size / bufSize + (reminder > 0 ? 1 : 0);
+    List<ByteBuffer> bbs = new ArrayList<>(len);
+    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
+    // reservoir is exhausted.
+    int remain = size;
+    while (remain >= minSizeForReservoirUse) {
+      ByteBuffer bb = this.getBuffer();
+      if (bb == null) {
+        break;
+      }
+      bbs.add(bb);
+      remain -= bufSize;
+    }
+    int lenFromReservoir = bbs.size();
+    if (remain > 0) {
+      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
+      // just allocate the ByteBuffer from on-heap.
+      bbs.add(ByteBuffer.allocate(remain));
+    }
+    ByteBuff bb = wrap(bbs, () -> {
+      for (int i = 0; i < lenFromReservoir; i++) {
+        this.putbackBuffer(bbs.get(i));
+      }
+    });
+    bb.limit(size);
+    return bb;
+  }
+
+  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
+    if (buffers == null || buffers.length == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
+        : new MultiByteBuff(recycler, buffers);
+  }
+
+  public static ByteBuff wrap(ByteBuffer[] buffers) {
+    return wrap(buffers, NONE);
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
+    if (buffers == null || buffers.size() == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
+        : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers) {
+    return wrap(buffers, NONE);
+  }
+
+  /**
+   * @return One free DirectByteBuffer from the pool. If there is no free ByteBuffer and the
+   *         maximum pool size has not been reached, a new one is created and returned. If the
+   *         max pool size has also been reached, returns null. When the pool returns a
+   *         ByteBuffer, make sure to return it to the pool after use.
+   */
+  private ByteBuffer getBuffer() {
+    ByteBuffer bb = buffers.poll();
+    if (bb != null) {
+      // Must clear here to reset the limit to capacity and the position to 0.
+      bb.clear();
+      return bb;
+    }
+    while (true) {
+      int c = this.usedBufCount.intValue();
+      if (c >= this.maxBufCount) {
+        if (!maxPoolSizeInfoLevelLogged) {
+          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
+              + "increasing the value for '{}' ?",
+            maxBufCount, MAX_BUFFER_COUNT_KEY);
+          maxPoolSizeInfoLevelLogged = true;
+        }
+        return null;
+      }
+      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
+        continue;
+      }
+      return ByteBuffer.allocateDirect(bufSize);
+    }
+  }
+
+  /**
+   * Return a ByteBuffer to the pool after use. Do not read/write the ByteBuffer after returning
+   * it.
+   * @param buf ByteBuffer to return.
+   */
+  private void putbackBuffer(ByteBuffer buf) {
+    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
+      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+      return;
+    }
+    buffers.offer(buf);
+  }
+}
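
A usage sketch of the new allocator, with illustrative sizes: under the default 64 KB pool
buffers, a 130 KB request is served by two pooled DirectByteBuffers plus a small on-heap tail,
and release() recycles only the pooled part (conf is an assumed Configuration):

    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
    ByteBuff buff = alloc.allocate(130 * 1024); // 2 x 64 KB pooled + 2 KB on-heap tail,
                                                // since 2 KB < minSizeForReservoirUse
    try {
      buff.put(new byte[] { 1, 2, 3 }, 0, 3);   // use it like any ByteBuff
    } finally {
      buff.release(); // refCnt drops to 0; the pooled buffers go back to the queue
    }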
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index 0b97abb..e8bd322 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory;
 public class ByteBufferListOutputStream extends ByteBufferOutputStream {
   private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
 
-  private ByteBufferPool pool;
+  private ByteBuffAllocator allocator;
   // Keep track of the BBs that bytes are written to. We will first try to get a BB from the
   // pool; if none is available we will make a new one of our own and keep writing to that. We
   // track all the BBs we got from the pool separately, so that on closeAndPutbackBuffers we can
   // make sure to return all of them to the pool.
-  protected List<ByteBuffer> allBufs = new ArrayList<>();
-  protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
+  protected List<SingleByteBuff> allBufs = new ArrayList<>();
 
   private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
 
-  public ByteBufferListOutputStream(ByteBufferPool pool) {
-    this.pool = pool;
+  public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
+    this.allocator = allocator;
     allocateNewBuffer();
   }
 
@@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
     if (this.curBuf != null) {
       this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
     }
-    // Get an initial BB to work with from the pool
-    this.curBuf = this.pool.getBuffer();
-    if (this.curBuf == null) {
-      // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off
-      // heap BB on demand. It is difficult to account for all such and so proper sizing of Max
-      // direct heap size. See HBASE-15525 also for more details.
-      // Make BB with same size of pool's buffer size.
-      this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
-    } else {
-      this.bufsFromPool.add(this.curBuf);
-    }
-    this.allBufs.add(this.curBuf);
+    // Get an initial ByteBuffer from the allocator.
+    SingleByteBuff sbb = allocator.allocateOneBuffer();
+    this.curBuf = sbb.nioByteBuffers()[0];
+    this.allBufs.add(sbb);
   }
 
   @Override
@@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
       LOG.debug(e.toString(), e);
     }
     // Return back all the BBs to pool
-    if (this.bufsFromPool != null) {
-      for (int i = 0; i < this.bufsFromPool.size(); i++) {
-        this.pool.putbackBuffer(this.bufsFromPool.get(i));
-      }
-      this.bufsFromPool = null;
+    for (ByteBuff buf : this.allBufs) {
+      buf.release();
     }
     this.allBufs = null;
     this.curBuf = null;
@@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
       // All the other BBs are already flipped while moving to the new BB.
       curBuf.flip();
     }
-    return this.allBufs;
+    List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
+    for (SingleByteBuff bb : this.allBufs) {
+      bbs.add(bb.nioByteBuffers()[0]);
+    }
+    return bbs;
   }
 
   @Override
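
A sketch of the stream's new lifecycle, allocator-backed instead of pool-backed (payload is an
assumed byte[]):

    ByteBufferListOutputStream out =
        new ByteBufferListOutputStream(ByteBuffAllocator.createOnHeap());
    try {
      out.write(payload);
      List<ByteBuffer> flipped = out.getByteBuffers(); // all BBs flipped for reading
      // ... write 'flipped' out to the channel ...
    } finally {
      out.releaseResources(); // releases every tracked SingleByteBuff in one pass
    }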
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
deleted file mode 100644
index caca20b..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
- * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
- * that it will create. When requested, if a free ByteBuffer is already present, it will return
- * that. And when no free ByteBuffer available and we are below the max count, it will create a new
- * one and return that.
- *
- * <p>
- * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
- * pass 'directByteBuffer' as false while construction of the pool.
- * <p>
- * This class is thread safe.
- *
- * @see ByteBufferListOutputStream
- */
-@InterfaceAudience.Private
-public class ByteBufferPool {
-  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
-  // TODO better config names?
-  // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
-  // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
-  public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
-  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
-  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
-                                                          // what we will write/read to/from the
-                                                          // socket channel.
-  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
-
-  private final int bufferSize;
-  private final int maxPoolSize;
-  private AtomicInteger count; // Count of the BBs created already for this pool.
-  private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
-  private boolean maxPoolSizeInfoLevelLogged = false;
-
-  /**
-   * @param bufferSize Size of each buffer created by this pool.
-   * @param maxPoolSize Max number of buffers to keep in this pool.
-   */
-  public ByteBufferPool(int bufferSize, int maxPoolSize) {
-    this(bufferSize, maxPoolSize, true);
-  }
-
-  /**
-   * @param bufferSize Size of each buffer created by this pool.
-   * @param maxPoolSize Max number of buffers to keep in this pool.
-   * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
-   */
-  public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
-    this.bufferSize = bufferSize;
-    this.maxPoolSize = maxPoolSize;
-    this.directByteBuffer = directByteBuffer;
-    // TODO can add initialPoolSize config also and make those many BBs ready for use.
-    LOG.info("Created with bufferSize={} and maxPoolSize={}",
-        org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
-        org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
-    this.count = new AtomicInteger(0);
-  }
-
-  /**
-   * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
-   *         maximum pool size, it will create a new one and return. In case of max pool size also
-   *         reached, will return null. When pool returned a ByteBuffer, make sure to return it back
-   *         to pool after use.
-   * @see #putbackBuffer(ByteBuffer)
-   */
-  public ByteBuffer getBuffer() {
-    ByteBuffer bb = buffers.poll();
-    if (bb != null) {
-      // Clear sets limit == capacity. Position == 0.
-      bb.clear();
-      return bb;
-    }
-    while (true) {
-      int c = this.count.intValue();
-      if (c >= this.maxPoolSize) {
-        if (maxPoolSizeInfoLevelLogged) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
-                + " and no free buffers now. Consider increasing the value for '"
-                + MAX_POOL_SIZE_KEY + "' ?");
-          }
-        } else {
-          LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
-              + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
-              + "' ?");
-          maxPoolSizeInfoLevelLogged = true;
-        }
-        return null;
-      }
-      if (!this.count.compareAndSet(c, c + 1)) {
-        continue;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
-      }
-      return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
-          : ByteBuffer.allocate(this.bufferSize);
-    }
-  }
-
-  /**
-   * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
-   * obtained from this pool.
-   * @param buf ByteBuffer to return.
-   */
-  public void putbackBuffer(ByteBuffer buf) {
-    if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
-      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
-      return;
-    }
-    buffers.offer(buf);
-  }
-
-  public int getBufferSize() {
-    return this.bufferSize;
-  }
-
-  /**
-   * @return Number of free buffers
-   */
-  @VisibleForTesting
-  public int getQueueSize() {
-    return buffers.size();
-  }
-}
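
For reviewers tracking the migration, the old pool calls map onto the new allocator roughly like
this (illustrative sketch, not from the patch):

    // Old ByteBufferPool: callers juggled raw ByteBuffers and had to remember
    // which ones came from the pool.
    ByteBuffer bb = pool.getBuffer();   // may return null when the pool is exhausted
    // ... use bb ...
    pool.putbackBuffer(bb);             // only legal for pool-created buffers

    // New ByteBuffAllocator: always returns a ByteBuff; release() recycles
    // pooled memory and is a harmless no-op for heap-backed buffers.
    ByteBuff buf = allocator.allocateOneBuffer(); // never null
    // ... use buf ...
    buf.release();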
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 8bc7974..d7ab009 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
           currentBuffer.skip(current.tagsLength);
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index 01f0a9d..ab93d19 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index baa1856..aa9a436 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 63da7e7..176bea3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
index 14d847c..9c0532e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
@@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
       decodeTags();
     }
     if (includesMvcc()) {
-      current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
     } else {
       current.memstoreTS = 0;
     }
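
The four encoder diffs above all swap the same call: the vlong helper moved from ByteBuff to
ByteBufferUtils (its removal from ByteBuff is in the next file). A worked example of the
WritableUtils vlong format it decodes, against the relocated helper as called above:

    // First byte in [-112, 127] is the value itself:
    ByteBuff one = new SingleByteBuff(ByteBuffer.wrap(new byte[] { 100 }));
    long a = ByteBufferUtils.readVLong(one);   // 100, single-byte fast path

    // -113 means "positive value, one trailing byte"; 0xC8 is 200:
    ByteBuff two = new SingleByteBuff(ByteBuffer.wrap(new byte[] { -113, (byte) 0xC8 }));
    long b = ByteBufferUtils.readVLong(two);   // 200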
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 68cf56e..1ee3607 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
+
+
 /**
- * An abstract class that abstracts out as to how the byte buffers are used,
- * either single or multiple. We have this interface because the java's ByteBuffers
- * cannot be sub-classed. This class provides APIs similar to the ones provided
- * in java's nio ByteBuffers and allows you to do positional reads/writes and relative
- * reads and writes on the underlying BB. In addition to it, we have some additional APIs which
- * helps us in the read path.
+ * An abstract class that abstracts out as to how the byte buffers are used, either single or
+ * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
+ * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
+ * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
+ * have some additional APIs which help us in the read path. <br/>
+ * The ByteBuff implements the {@link ReferenceCounted} interface, which means it maintains a
+ * {@link RefCnt} inside. Once we are sure that a ByteBuff won't be used any more, we must call
+ * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. With regard to
+ * {@link ByteBuff#duplicate()} and {@link ByteBuff#slice()}, releasing either the duplicated one
+ * or the original one will free the memory, because they share the same NIO ByteBuffers. If you
+ * want to retain the NIO ByteBuffers even after the original one has called
+ * {@link ByteBuff#release()}, you can do it like this:
+ *
+ * <pre>
+ *   ByteBuff original = ...;
+ *   ByteBuff dup = original.duplicate();
+ *   dup.retain();
+ *   original.release();
+ *   // The NIO buffers can still be accessed unless you release the duplicated one
+ *   dup.get(...);
+ *   dup.release();
+ *   // Neither the original nor the dup can access the NIO buffers any more.
+ * </pre>
  */
 @InterfaceAudience.Private
-// TODO to have another name. This can easily get confused with netty's ByteBuf
-public abstract class ByteBuff {
+public abstract class ByteBuff implements ReferenceCounted {
+  private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
   private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
 
+  protected RefCnt refCnt;
+
+  /*************************** Methods for reference count **********************************/
+
+  protected void checkRefCount() {
+    ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
+  }
+
+  public int refCnt() {
+    return refCnt.refCnt();
+  }
+
+  @Override
+  public boolean release() {
+    return refCnt.release();
+  }
+
+  @Override
+  public final ByteBuff retain(int increment) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final boolean release(int increment) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ByteBuff touch() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ByteBuff touch(Object hint) {
+    throw new UnsupportedOperationException();
+  }
+
+  /******************************* Methods for ByteBuff **************************************/
+
   /**
    * @return this ByteBuff's current position
    */
@@ -491,78 +550,11 @@ public abstract class ByteBuff {
     return tmpLength;
   }
 
-  /**
-   * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
-   * {@link ByteBuff}.
-   */
-  public static long readVLong(ByteBuff in) {
-    byte firstByte = in.get();
-    int len = WritableUtils.decodeVIntSize(firstByte);
-    if (len == 1) {
-      return firstByte;
-    }
-    long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
-      byte b = in.get();
-      i = i << 8;
-      i = i | (b & 0xFF);
-    }
-    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-  }
-
-  /**
-   * Search sorted array "a" for byte "key".
-   * 
-   * @param a Array to search. Entries must be sorted and unique.
-   * @param fromIndex First index inclusive of "a" to include in the search.
-   * @param toIndex Last index exclusive of "a" to include in the search.
-   * @param key The byte to search for.
-   * @return The index of key if found. If not found, return -(index + 1), where
-   *         negative indicates "not found" and the "index + 1" handles the "-0"
-   *         case.
-   */
-  public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) {
-    int unsignedKey = key & 0xff;
-    int low = fromIndex;
-    int high = toIndex - 1;
-
-    while (low <= high) {
-      int mid = low + ((high - low) >> 1);
-      int midVal = a.get(mid) & 0xff;
-
-      if (midVal < unsignedKey) {
-        low = mid + 1;
-      } else if (midVal > unsignedKey) {
-        high = mid - 1;
-      } else {
-        return mid; // key found
-      }
-    }
-    return -(low + 1); // key not found.
-  }
+  public abstract ByteBuffer[] nioByteBuffers();
 
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
         ", cap= " + capacity() + "]";
   }
-
-  public static String toStringBinary(final ByteBuff b, int off, int len) {
-    StringBuilder result = new StringBuilder();
-    // Just in case we are passed a 'len' that is > buffer length...
-    if (off >= b.capacity())
-      return result.toString();
-    if (off + len > b.capacity())
-      len = b.capacity() - off;
-    for (int i = off; i < off + len; ++i) {
-      int ch = b.get(i) & 0xFF;
-      if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
-          || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
-        result.append((char) ch);
-      } else {
-        result.append(String.format("\\x%02X", ch));
-      }
-    }
-    return result.toString();
-  }
 }
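
The duplicate/retain/release contract from the class javadoc, as a concrete snippet (the recycler
lambda is illustrative only):

    ByteBuff original = ByteBuffAllocator.wrap(
        new ByteBuffer[] { ByteBuffer.allocate(16) },
        () -> System.out.println("freed"));   // refCnt starts at 1
    ByteBuff dup = original.duplicate();      // shares the RefCnt and the NIO buffer
    dup.retain();                             // refCnt: 1 -> 2
    original.release();                       // refCnt: 2 -> 1, dup still usable
    byte b = dup.get(0);                      // OK: checkRefCount() still passes
    dup.release();                            // refCnt: 1 -> 0, the recycler runs once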
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 97f5141..e9eadc7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
@@ -24,13 +26,12 @@ import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
 import java.nio.channels.ReadableByteChannel;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
  * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff {
   private final int[] itemBeginPos;
 
   public MultiByteBuff(ByteBuffer... items) {
+    this(NONE, items);
+  }
+
+  public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
+    this(new RefCnt(recycler), items);
+  }
+
+  private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
+    this.refCnt = refCnt;
     assert items != null;
     assert items.length > 0;
     this.items = items;
@@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff {
     this.limitedItemIndex = this.items.length - 1;
   }
 
-  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
-      int curItemIndex, int markedIndex) {
+  private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
+      int limitedIndex, int curItemIndex, int markedIndex) {
+    this.refCnt = refCnt;
     this.items = items;
     this.curItemIndex = curItemIndex;
     this.curItem = this.items[this.curItemIndex];
@@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int capacity() {
+    checkRefCount();
     int c = 0;
     for (ByteBuffer item : this.items) {
       c += item.capacity();
@@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte get(int index) {
+    checkRefCount();
     int itemIndex = getItemIndex(index);
     return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
   }
 
   @Override
   public byte getByteAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex = getItemIndexFromCurItemIndex(index);
@@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int getInt(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public int getIntAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public short getShort(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public short getShortAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public long getLong(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public long getLongAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int position() {
+    checkRefCount();
     return itemBeginPos[this.curItemIndex] + this.curItem.position();
   }
 
@@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff position(int position) {
+    checkRefCount();
     // Short circuit for positioning within the cur item. Mostly that is the case.
     if (this.itemBeginPos[this.curItemIndex] <= position
         && this.itemBeginPos[this.curItemIndex + 1] > position) {
@@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff rewind() {
+    checkRefCount();
     for (int i = 0; i < this.items.length; i++) {
       this.items[i].rewind();
     }
@@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff mark() {
+    checkRefCount();
     this.markedItemIndex = this.curItemIndex;
     this.curItem.mark();
     return this;
@@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff reset() {
+    checkRefCount();
     // when the buffer is moved to the next one.. the reset should happen on the previous marked
     // item and the new one should be taken as the base
     if (this.markedItemIndex < 0) throw new InvalidMarkException();
@@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int remaining() {
+    checkRefCount();
     int remain = 0;
     for (int i = curItemIndex; i < items.length; i++) {
       remain += items[i].remaining();
@@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public final boolean hasRemaining() {
+    checkRefCount();
     return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
         && this.items[this.curItemIndex + 1].hasRemaining());
   }
@@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte get() {
+    checkRefCount();
     if (this.curItem.remaining() == 0) {
       if (items.length - 1 == this.curItemIndex) {
         // means cur item is the last one and we wont be able to read a long. Throw exception
@@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public short getShort() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_SHORT) {
       return this.curItem.getShort();
@@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int getInt() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_INT) {
       return this.curItem.getInt();
@@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public long getLong() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_LONG) {
       return this.curItem.getLong();
@@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public void get(byte[] dst, int offset, int length) {
+    checkRefCount();
     while (length > 0) {
       int toRead = Math.min(length, this.curItem.remaining());
       ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
@@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public void get(int sourceOffset, byte[] dst, int offset, int length) {
+    checkRefCount();
     int itemIndex = getItemIndex(sourceOffset);
     ByteBuffer item = this.items[itemIndex];
     sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
@@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff limit(int limit) {
+    checkRefCount();
     this.limit = limit;
     // Normally the limit will try to limit within the last BB item
     int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
@@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff slice() {
+    checkRefCount();
     ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
     for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
       copy[j] = this.items[i].slice();
     }
-    return new MultiByteBuff(copy);
+    return new MultiByteBuff(refCnt, copy);
   }
 
   /**
-   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
-   * of the new MBB will be independent than that of the original MBB.
-   * The content of the new MBB will start at this MBB's current position
-   * The position, limit and mark of the new MBB would be identical to this MBB in terms of
-   * values.
-   * @return a sliced MBB
+   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
+   * new MBB will be independent of those of the original MBB, though identical in value at the
+   * time of duplication. The content of the new MBB will start at this MBB's current position.
+   * @return a duplicated MBB
    */
   @Override
   public MultiByteBuff duplicate() {
+    checkRefCount();
     ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
     for (int i = 0; i < this.items.length; i++) {
       itemsCopy[i] = items[i].duplicate();
     }
-    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
-        this.curItemIndex, this.markedItemIndex);
+    return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
+        this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
   }
 
   /**
@@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(byte b) {
+    checkRefCount();
     if (this.curItem.remaining() == 0) {
       if (this.curItemIndex == this.items.length - 1) {
         throw new BufferOverflowException();
@@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(int index, byte b) {
+    checkRefCount();
     int itemIndex = getItemIndex(limit);
     ByteBuffer item = items[itemIndex];
     item.put(index - itemBeginPos[itemIndex], b);
@@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    checkRefCount();
     int destItemIndex = getItemIndex(offset);
     int srcItemIndex = getItemIndex(srcOffset);
     ByteBuffer destItem = this.items[destItemIndex];
@@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff {
   }
 
   private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
-    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+    return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
         : ((MultiByteBuff) buf).items[index];
   }
 
@@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff putInt(int val) {
+    checkRefCount();
     if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
       this.curItem.putInt(val);
       return this;
@@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(byte[] src, int offset, int length) {
+    checkRefCount();
     if (this.curItem.remaining() >= length) {
       ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
       return this;
@@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff putLong(long val) {
+    checkRefCount();
     if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
       this.curItem.putLong(val);
       return this;
@@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff skip(int length) {
+    checkRefCount();
     // Get available bytes from this item and remaining from next
     int jump = 0;
     while (true) {
@@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff moveBack(int length) {
+    checkRefCount();
     while (length != 0) {
       if (length > curItem.position()) {
         length -= curItem.position();
@@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public ByteBuffer asSubByteBuffer(int length) {
+    checkRefCount();
     if (this.curItem.remaining() >= length) {
       return this.curItem;
     }
@@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff {
     ByteBuffer locCurItem = curItem;
     while (length > 0) {
       int toRead = Math.min(length, locCurItem.remaining());
-      ByteBufferUtils
-          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
+        toRead);
       length -= toRead;
       if (length == 0) break;
       locCurItemIndex++;
@@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+    checkRefCount();
     if (this.itemBeginPos[this.curItemIndex] <= offset) {
       int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
       if (this.curItem.limit() - relOffsetInCurItem >= length) {
@@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff {
   @Override
   public void get(ByteBuffer out, int sourceOffset,
       int length) {
+    checkRefCount();
       // Not used from real read path actually. So not going with
       // optimization
     for (int i = 0; i < length; ++i) {
@@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte[] toBytes(int offset, int length) {
+    checkRefCount();
     byte[] output = new byte[length];
     this.get(offset, output, 0, length);
     return output;
@@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    checkRefCount();
     int total = 0;
     while (true) {
       // Read max possible into the current BB
@@ -1034,13 +1082,19 @@ public class MultiByteBuff extends ByteBuff {
   }
 
   @Override
+  public ByteBuffer[] nioByteBuffers() {
+    checkRefCount();
+    return this.items;
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof MultiByteBuff)) return false;
     if (this == obj) return true;
     MultiByteBuff that = (MultiByteBuff) obj;
     if (this.capacity() != that.capacity()) return false;
     if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
-        that.limit()) == 0) {
+      that.limit()) == 0) {
       return true;
     }
     return false;
@@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff {
     return hash;
   }
 
-  /**
-   * @return the ByteBuffers which this wraps.
-   */
-  @VisibleForTesting
-  public ByteBuffer[] getEnclosingByteBuffers() {
-    return this.items;
+  @Override
+  public MultiByteBuff retain() {
+    refCnt.retain();
+    return this;
   }
 }
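
A sketch of the recycler-aware MultiByteBuff; the buffers and the recycler are chosen for
illustration:

    ByteBuffer b1 = ByteBuffer.allocate(4);
    ByteBuffer b2 = ByteBuffer.allocate(4);
    b1.putInt(42).flip();
    b2.putInt(43).flip();
    MultiByteBuff mbb = new MultiByteBuff(() -> System.out.println("recycled"), b1, b2);
    int first = mbb.getInt();    // 42, read from the first fragment
    int second = mbb.getInt();   // 43, the read crosses into the second fragment
    mbb.release();               // refCnt 1 -> 0, the recycler runs
    // Any further access, e.g. mbb.getInt(), now fails the checkRefCount() precondition.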
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
new file mode 100644
index 0000000..80172b2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+
+/**
+ * Maintains a reference count integer inside to track the life cycle of {@link ByteBuff}; once
+ * the reference count becomes 0, it will call {@link Recycler#free()} exactly once.
+ */
+@InterfaceAudience.Private
+class RefCnt extends AbstractReferenceCounted {
+
+  private Recycler recycler = ByteBuffAllocator.NONE;
+
+  RefCnt(Recycler recycler) {
+    this.recycler = recycler;
+  }
+
+  @Override
+  protected final void deallocate() {
+    this.recycler.free();
+  }
+
+  @Override
+  public final ReferenceCounted touch(Object hint) {
+    throw new UnsupportedOperationException();
+  }
+}
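
RefCnt is what lets a pooled buffer find its way home. A hedged wiring sketch follows; the queue and the lambda are illustrative, while the SingleByteBuff(Recycler, ByteBuffer) constructor and the retain()/release() calls come from this patch:

    // Illustrative pool wiring; 'buffers' stands in for an allocator's free list.
    Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
    ByteBuffer nio = ByteBuffer.allocateDirect(64 * 1024);
    SingleByteBuff buf = new SingleByteBuff(() -> buffers.offer(nio), nio);
    buf.retain();  // refCnt 1 -> 2
    buf.release(); // refCnt 2 -> 1, buffer still usable
    buf.release(); // refCnt 1 -> 0, RefCnt#deallocate runs the Recycler, pooling 'nio'
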
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 6d64d7b..7205251 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,22 +17,24 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 import org.apache.yetus.audience.InterfaceAudience;
-import sun.nio.ch.DirectBuffer;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import sun.nio.ch.DirectBuffer;
 
 /**
- * An implementation of ByteBuff where a single BB backs the BBI. This just acts
- * as a wrapper over a normal BB - offheap or onheap
+ * An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a
+ * normal BB - offheap or onheap
  */
 @InterfaceAudience.Private
 public class SingleByteBuff extends ByteBuff {
@@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff {
   private Object unsafeRef = null;
 
   public SingleByteBuff(ByteBuffer buf) {
+    this(NONE, buf);
+  }
+
+  public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
+    this(new RefCnt(recycler), buf);
+  }
+
+  private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
+    this.refCnt = refCnt;
     this.buf = buf;
     if (buf.hasArray()) {
       this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset();
@@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int position() {
+    checkRefCount();
     return this.buf.position();
   }
 
   @Override
   public SingleByteBuff position(int position) {
+    checkRefCount();
     this.buf.position(position);
     return this;
   }
 
   @Override
   public SingleByteBuff skip(int len) {
+    checkRefCount();
     this.buf.position(this.buf.position() + len);
     return this;
   }
 
   @Override
   public SingleByteBuff moveBack(int len) {
+    checkRefCount();
     this.buf.position(this.buf.position() - len);
     return this;
   }
 
   @Override
   public int capacity() {
+    checkRefCount();
     return this.buf.capacity();
   }
 
   @Override
   public int limit() {
+    checkRefCount();
     return this.buf.limit();
   }
 
   @Override
   public SingleByteBuff limit(int limit) {
+    checkRefCount();
     this.buf.limit(limit);
     return this;
   }
 
   @Override
   public SingleByteBuff rewind() {
+    checkRefCount();
     this.buf.rewind();
     return this;
   }
 
   @Override
   public SingleByteBuff mark() {
+    checkRefCount();
     this.buf.mark();
     return this;
   }
 
   @Override
   public ByteBuffer asSubByteBuffer(int length) {
+    checkRefCount();
     // Just return the single BB that is available
     return this.buf;
   }
 
   @Override
   public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+    checkRefCount();
     // Just return the single BB that is available
     pair.setFirst(this.buf);
     pair.setSecond(offset);
@@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int remaining() {
+    checkRefCount();
     return this.buf.remaining();
   }
 
   @Override
   public boolean hasRemaining() {
+    checkRefCount();
     return buf.hasRemaining();
   }
 
   @Override
   public SingleByteBuff reset() {
+    checkRefCount();
     this.buf.reset();
     return this;
   }
 
   @Override
   public SingleByteBuff slice() {
-    return new SingleByteBuff(this.buf.slice());
+    checkRefCount();
+    return new SingleByteBuff(this.refCnt, this.buf.slice());
   }
 
   @Override
   public SingleByteBuff duplicate() {
-    return new SingleByteBuff(this.buf.duplicate());
+    checkRefCount();
+    return new SingleByteBuff(this.refCnt, this.buf.duplicate());
   }
 
   @Override
   public byte get() {
+    checkRefCount();
     return buf.get();
   }
 
   @Override
   public byte get(int index) {
+    checkRefCount();
     if (UNSAFE_AVAIL) {
       return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
     }
@@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public byte getByteAfterPosition(int offset) {
+    checkRefCount();
     return get(this.buf.position() + offset);
   }
 
   @Override
   public SingleByteBuff put(byte b) {
+    checkRefCount();
     this.buf.put(b);
     return this;
   }
 
   @Override
   public SingleByteBuff put(int index, byte b) {
+    checkRefCount();
     buf.put(index, b);
     return this;
   }
 
   @Override
   public void get(byte[] dst, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
     buf.position(buf.position() + length);
   }
 
   @Override
   public void get(int sourceOffset, byte[] dst, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length);
   }
 
@@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    checkRefCount();
     if (src instanceof SingleByteBuff) {
       ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
-          offset, length);
+        offset, length);
     } else {
       // TODO we can do some optimization here? Call to asSubByteBuffer might
       // create a copy.
@@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff {
       src.asSubByteBuffer(srcOffset, length, pair);
       if (pair.getFirst() != null) {
         ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
-            length);
+          length);
       }
     }
     return this;
@@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public SingleByteBuff put(byte[] src, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
     return this;
   }
 
   @Override
   public SingleByteBuff put(byte[] src) {
+    checkRefCount();
     return put(src, 0, src.length);
   }
 
   @Override
   public boolean hasArray() {
+    checkRefCount();
     return this.buf.hasArray();
   }
 
   @Override
   public byte[] array() {
+    checkRefCount();
     return this.buf.array();
   }
 
   @Override
   public int arrayOffset() {
+    checkRefCount();
     return this.buf.arrayOffset();
   }
 
   @Override
   public short getShort() {
+    checkRefCount();
     return this.buf.getShort();
   }
 
   @Override
   public short getShort(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
     }
@@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public short getShortAfterPosition(int offset) {
+    checkRefCount();
     return getShort(this.buf.position() + offset);
   }
 
   @Override
   public int getInt() {
+    checkRefCount();
     return this.buf.getInt();
   }
 
   @Override
   public SingleByteBuff putInt(int value) {
+    checkRefCount();
     ByteBufferUtils.putInt(this.buf, value);
     return this;
   }
 
   @Override
   public int getInt(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
     }
@@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int getIntAfterPosition(int offset) {
+    checkRefCount();
     return getInt(this.buf.position() + offset);
   }
 
   @Override
   public long getLong() {
+    checkRefCount();
     return this.buf.getLong();
   }
 
   @Override
   public SingleByteBuff putLong(long value) {
+    checkRefCount();
     ByteBufferUtils.putLong(this.buf, value);
     return this;
   }
 
   @Override
   public long getLong(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
     }
@@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public long getLongAfterPosition(int offset) {
+    checkRefCount();
     return getLong(this.buf.position() + offset);
   }
 
   @Override
   public byte[] toBytes(int offset, int length) {
+    checkRefCount();
     byte[] output = new byte[length];
     ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
     return output;
@@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public void get(ByteBuffer out, int sourceOffset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
   }
 
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    checkRefCount();
     return channelRead(channel, buf);
   }
 
   @Override
+  public ByteBuffer[] nioByteBuffers() {
+    checkRefCount();
+    return new ByteBuffer[] { this.buf };
+  }
+
+  @Override
   public boolean equals(Object obj) {
-    if(!(obj instanceof SingleByteBuff)) return false;
-    return this.buf.equals(((SingleByteBuff)obj).buf);
+    if (!(obj instanceof SingleByteBuff)) {
+      return false;
+    }
+    return this.buf.equals(((SingleByteBuff) obj).buf);
   }
 
   @Override
@@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff {
     return this.buf.hashCode();
   }
 
-  /**
-   * @return the ByteBuffer which this wraps.
-   */
-  @VisibleForTesting
-  public ByteBuffer getEnclosingByteBuffer() {
-    return this.buf;
+  @Override
+  public SingleByteBuff retain() {
+    refCnt.retain();
+    return this;
   }
 }
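
Because slice() and duplicate() now pass the parent's RefCnt into the private constructor, a child view shares the parent's life cycle instead of owning its own. The behavioral consequence, mirrored from the new TestByteBuffAllocator below:

    ByteBuff parent = alloc.allocateOneBuffer(); // refCnt = 1
    ByteBuff dup = parent.duplicate();           // same RefCnt, still 1
    dup.release();                               // drops to 0, so the parent is released too
    // parent.position() now throws, since every accessor checks the count first.
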
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2e14b13..d023339 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -27,9 +27,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -311,10 +311,6 @@ public class ByteBufferArray {
       srcIndex += cnt;
     }
     assert srcIndex == len;
-    if (mbb.length > 1) {
-      return new MultiByteBuff(mbb);
-    } else {
-      return new SingleByteBuff(mbb[0]);
-    }
+    return ByteBuffAllocator.wrap(mbb);
   }
 }
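
The single/multi branch deleted here moves behind ByteBuffAllocator.wrap. The helper presumably looks something like the following, inferred from the removed lines rather than quoted from the committed body:

    // Sketch: centralize the SingleByteBuff vs. MultiByteBuff choice in one place.
    public static ByteBuff wrap(ByteBuffer[] buffers) {
      if (buffers == null || buffers.length == 0) {
        throw new IllegalArgumentException("buffers should not be null or empty");
      }
      return buffers.length == 1 ? new SingleByteBuff(buffers[0]) : new MultiByteBuff(buffers);
    }
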
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 3ea0a5c..98bc88a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -348,25 +349,39 @@ public final class ByteBufferUtils {
     }
   }
 
-  /**
-   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
-   * {@link ByteBuffer}.
-   */
-  public static long readVLong(ByteBuffer in) {
-    byte firstByte = in.get();
+  private interface ByteVisitor {
+    byte get();
+  }
+
+  private static long readVLong(ByteVisitor visitor) {
+    byte firstByte = visitor.get();
     int len = WritableUtils.decodeVIntSize(firstByte);
     if (len == 1) {
       return firstByte;
     }
     long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
-      byte b = in.get();
+    for (int idx = 0; idx < len - 1; idx++) {
+      byte b = visitor.get();
       i = i << 8;
       i = i | (b & 0xFF);
     }
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
+  /**
+   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}.
+   */
+  public static long readVLong(ByteBuffer in) {
+    return readVLong(in::get);
+  }
+
+  /**
+   * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
+   * {@link ByteBuff}.
+   */
+  public static long readVLong(ByteBuff in) {
+    return readVLong(in::get);
+  }
 
   /**
    * Put in buffer integer using 7 bit encoding. For each written byte:
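
With the ByteVisitor indirection, both readVLong overloads share one decoding loop and differ only in the method reference they pass. A small usage sketch (the single-byte vlong value 12 is arbitrary):

    ByteBuffer nio = ByteBuffer.wrap(new byte[] { 12 });
    long a = ByteBufferUtils.readVLong(nio); // decodes via nio::get
    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(new byte[] { 12 }));
    long b = ByteBufferUtils.readVLong(bb);  // decodes via bb::get
    // both a and b are 12
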
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
new file mode 100644
index 0000000..0976c11
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
+
+  @Test
+  public void testAllocateByteBuffToReadInto() {
+    int maxBuffersInPool = 10;
+    int bufSize = 6 * 1024;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6);
+    ByteBuff buff = alloc.allocate(10 * bufSize);
+    buff.release();
+    // When the request size is less than 1/6th of the pool buffer size, we should use an
+    // on-demand created on-heap buffer.
+    buff = alloc.allocate(200);
+    assertTrue(buff.hasArray());
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+    buff.release();
+    // When the request size is > 1/6th of the pool buffer size.
+    buff = alloc.allocate(1024);
+    assertFalse(buff.hasArray());
+    assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+    buff.release();// Recycler#free should put the BB back into the pool.
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+    // Request size > pool buffer size
+    buff = alloc.allocate(7 * 1024);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    ByteBuffer[] bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertTrue(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(1024, bbs[1].limit());
+    assertEquals(maxBuffersInPool - 2, alloc.getQueueSize());
+    buff.release();
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+    buff = alloc.allocate(6 * 1024 + 200);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertFalse(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(200, bbs[1].limit());
+    assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+    buff.release();
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+    alloc.allocate(bufSize * (maxBuffersInPool - 1));
+    buff = alloc.allocate(20 * 1024);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertFalse(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(14 * 1024, bbs[1].limit());
+    assertEquals(0, alloc.getQueueSize());
+    buff.release();
+    assertEquals(1, alloc.getQueueSize());
+    alloc.allocateOneBuffer();
+
+    buff = alloc.allocate(7 * 1024);
+    assertTrue(buff.hasArray());
+    assertTrue(buff instanceof SingleByteBuff);
+    assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit());
+    buff.release();
+  }
+
+  @Test
+  public void testNegativeAllocatedSize() {
+    int maxBuffersInPool = 10;
+    ByteBuffAllocator allocator =
+        new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024);
+    try {
+      allocator.allocate(-1);
+      fail("Should throw exception when size < 0");
+    } catch (IllegalArgumentException e) {
+      // expected exception
+    }
+    ByteBuff bb = allocator.allocate(0);
+    bb.release();
+  }
+
+  @Test
+  public void testAllocateOneBuffer() {
+    // Allocate from on-heap
+    ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
+    ByteBuff buf = allocator.allocateOneBuffer();
+    assertTrue(buf.hasArray());
+    assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
+    buf.release();
+
+    // Allocate from off-heap
+    int bufSize = 10;
+    allocator = new ByteBuffAllocator(true, 1, 10, 3);
+    buf = allocator.allocateOneBuffer();
+    assertFalse(buf.hasArray());
+    assertEquals(bufSize, buf.remaining());
+    // Another one will be allocated from on-heap because the pool has only one ByteBuffer,
+    // which has not been released yet.
+    ByteBuff buf2 = allocator.allocateOneBuffer();
+    assertTrue(buf2.hasArray());
+    assertEquals(bufSize, buf2.remaining());
+    // free the first one
+    buf.release();
+    // The next one will be off-heap again.
+    buf = allocator.allocateOneBuffer();
+    assertFalse(buf.hasArray());
+    assertEquals(bufSize, buf.remaining());
+    buf.release();
+  }
+
+  @Test
+  public void testReferenceCount() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3);
+    ByteBuff buf1 = alloc.allocate(bufSize * 2);
+    assertFalse(buf1.hasArray());
+    // The next one will be allocated from heap
+    ByteBuff buf2 = alloc.allocateOneBuffer();
+    assertTrue(buf2.hasArray());
+
+    // duplicate buf2; if the dup is released, buf2 will also be released (SingleByteBuff)
+    ByteBuff dup2 = buf2.duplicate();
+    dup2.release();
+    assertEquals(0, buf2.refCnt());
+    assertEquals(0, dup2.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    assertException(dup2::position);
+    assertException(buf2::position);
+
+    // duplicate buf1; if dup1 is released, buf1 will also be released (MultiByteBuff)
+    ByteBuff dup1 = buf1.duplicate();
+    dup1.release();
+    assertEquals(0, buf1.refCnt());
+    assertEquals(0, dup1.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+    assertException(dup1::position);
+    assertException(buf1::position);
+
+    // slice buf3; if slice3 is released, buf3 will also be released (SingleByteBuff)
+    ByteBuff buf3 = alloc.allocateOneBuffer();
+    assertFalse(buf3.hasArray());
+    ByteBuff slice3 = buf3.slice();
+    slice3.release();
+    assertEquals(0, buf3.refCnt());
+    assertEquals(0, slice3.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // slice buf4; if slice4 is released, buf4 will also be released (MultiByteBuff)
+    ByteBuff buf4 = alloc.allocate(bufSize * 2);
+    assertFalse(buf4.hasArray());
+    ByteBuff slice4 = buf4.slice();
+    slice4.release();
+    assertEquals(0, buf4.refCnt());
+    assertEquals(0, slice4.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test multiple references for the same ByteBuff (SingleByteBuff)
+    ByteBuff buf5 = alloc.allocateOneBuffer();
+    ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice();
+    slice5.release();
+    assertEquals(0, buf5.refCnt());
+    assertEquals(0, slice5.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+    assertException(slice5::position);
+    assertException(buf5::position);
+
+    // Test multiple references for the same ByteBuff (SingleByteBuff)
+    ByteBuff buf6 = alloc.allocate(bufSize >> 2);
+    ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice();
+    slice6.release();
+    assertEquals(0, buf6.refCnt());
+    assertEquals(0, slice6.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain the parent SingleByteBuff (duplicate)
+    ByteBuff parent = alloc.allocateOneBuffer();
+    ByteBuff child = parent.duplicate();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain parent MultiByteBuff (duplicate)
+    parent = alloc.allocate(bufSize << 1);
+    child = parent.duplicate();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain the parent SingleByteBuff (slice)
+    parent = alloc.allocateOneBuffer();
+    child = parent.slice();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain parent MultiByteBuff (slice)
+    parent = alloc.allocate(bufSize << 1);
+    child = parent.slice();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+  }
+
+  @Test
+  public void testReverseRef() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+    ByteBuff buf1 = alloc.allocate(bufSize);
+    ByteBuff dup1 = buf1.duplicate();
+    assertEquals(1, buf1.refCnt());
+    assertEquals(1, dup1.refCnt());
+    buf1.release();
+    assertEquals(0, buf1.refCnt());
+    assertEquals(0, dup1.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    assertException(buf1::position);
+    assertException(dup1::position);
+  }
+
+  @Test
+  public void testByteBuffUnsupportedMethods() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+    ByteBuff buf = alloc.allocate(bufSize);
+    assertException(() -> buf.retain(2));
+    assertException(() -> buf.release(2));
+    assertException(() -> buf.touch());
+    assertException(() -> buf.touch(new Object()));
+  }
+
+  private void assertException(Runnable r) {
+    try {
+      r.run();
+      fail();
+    } catch (Exception e) {
+      // expected exception.
+    }
+  }
+}
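
The tests above construct allocators directly, but server code goes through ByteBuffAllocator.create(conf, reservoirEnabled), as the RpcServer change later in this patch shows. A hedged configuration sketch using keys referenced elsewhere in the patch:

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 100);  // cap on pooled ByteBuffers
    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 64 * 1024); // size of each pooled buffer
    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true); // true = reservoir enabled
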
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
index 2f7a869..3ac7a75 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream {
 
   @Test
   public void testWrites() throws Exception {
-    ByteBufferPool pool = new ByteBufferPool(10, 3);
-    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6);
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc);
     bbos.write(2);// Write a byte
     bbos.writeInt(100);// Write an int
     byte[] b = Bytes.toBytes("row123");// 6 bytes
     bbos.write(b);
+    assertEquals(2, bbos.allBufs.size());
     // Just use the 3rd BB from the pool so that bbos, on request, won't get one
-    ByteBuffer bb1 = pool.getBuffer();
+    ByteBuff bb1 = alloc.allocateOneBuffer();
     ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
     bbos.write(bb, 0, bb.capacity());
-    pool.putbackBuffer(bb1);
+    bb1.release();
     bbos.writeInt(123);
     bbos.writeInt(124);
-    assertEquals(0, pool.getQueueSize());
+    assertEquals(0, alloc.getQueueSize());
     List<ByteBuffer> allBufs = bbos.getByteBuffers();
     assertEquals(4, allBufs.size());
-    assertEquals(3, bbos.bufsFromPool.size());
+    assertEquals(4, bbos.allBufs.size());
     ByteBuffer b1 = allBufs.get(0);
     assertEquals(10, b1.remaining());
     assertEquals(2, b1.get());
     assertEquals(100, b1.getInt());
     byte[] bActual = new byte[b.length];
-    b1.get(bActual, 0, 5);//5 bytes in 1st BB
+    b1.get(bActual, 0, 5);// 5 bytes in 1st BB
     ByteBuffer b2 = allBufs.get(1);
     assertEquals(10, b2.remaining());
     b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
@@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream {
     assertEquals(4, b4.remaining());
     assertEquals(124, b4.getInt());
     bbos.releaseResources();
-    assertEquals(3, pool.getQueueSize());
+    assertEquals(3, alloc.getQueueSize());
   }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
deleted file mode 100644
index 44d2f45..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ IOTests.class, SmallTests.class })
-public class TestByteBufferPool {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestByteBufferPool.class);
-
-  @Test
-  public void testOffheapBBPool() throws Exception {
-    boolean directByteBuffer = true;
-    testBBPool(10, 100, directByteBuffer);
-  }
-
-  @Test
-  public void testOnheapBBPool() throws Exception {
-    boolean directByteBuffer = false;
-    testBBPool(10, 100, directByteBuffer);
-  }
-
-  private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
-    ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
-    for (int i = 0; i < maxPoolSize; i++) {
-      ByteBuffer buffer = pool.getBuffer();
-      assertEquals(0, buffer.position());
-      assertEquals(bufferSize, buffer.limit());
-      assertEquals(directByteBuffer, buffer.isDirect());
-    }
-    assertEquals(0, pool.getQueueSize());
-    ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
-        : ByteBuffer.allocateDirect(bufferSize);
-    pool.putbackBuffer(bb);
-    assertEquals(0, pool.getQueueSize());
-    bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
-        : ByteBuffer.allocate(bufferSize + 1);
-    pool.putbackBuffer(bb);
-    assertEquals(0, pool.getQueueSize());
-  }
-}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
index 84cf7a4..fcfb77a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -286,12 +286,12 @@ public class TestMultiByteBuff {
     multi.putInt(45);
     multi.position(1);
     multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
-    MultiByteBuff sliced = multi.slice();
+    ByteBuff sliced = multi.slice();
     assertEquals(0, sliced.position());
     assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
     assertEquals(l1, sliced.getLong());
     assertEquals(l2, sliced.getLong());
-    MultiByteBuff dup = multi.duplicate();
+    ByteBuff dup = multi.duplicate();
     assertEquals(1, dup.position());
     assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
     assertEquals(l1, dup.getLong());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
index 0224dea..a842967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
@@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize {
 
   /**
    * SHARED means when this Cacheable is read back from cache it refers to the same memory area as
-   * used by the cache for caching it.
-   * EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an
-   * exclusive memory area of this Cacheable.
+   * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
+   * the data was copied to an exclusive memory area of this Cacheable.
    */
-  public static enum MemoryType {
+  enum MemoryType {
     SHARED, EXCLUSIVE
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 80b1288..5ed3d2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
     NettyServerCall reqTooBig =
       new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
         connection, 0, connection.addr, System.currentTimeMillis(), 0,
-        connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null);
+        connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
 
     connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 742a728..bba1bed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer {
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
       long startTime, int timeout) throws IOException {
     NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
-        -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
+        -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
     return call(fakeCall, status);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index 2fae311..8dc08c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
 
   NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
       Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
-      InetAddress remoteAddress, long receiveTime, int timeout,
-      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
-    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
-        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+      InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+        timeout, bbAllocator, cellBlockBuilder, reqCleanup);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index ffa16bf..2f97f53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
 
   void process(final ByteBuf buf) throws IOException, InterruptedException {
     if (connectionHeaderRead) {
-      this.callCleanup = new RpcServer.CallCleanup() {
-        @Override
-        public void run() {
-          buf.release();
-        }
-      };
+      this.callCleanup = buf::release;
       process(new SingleByteBuff(buf.nioBuffer()));
     } else {
       ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
@@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
       long size, final InetAddress remoteAddress, int timeout,
       CallCleanup reqCleanup) {
     return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
-        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
         this.rpcServer.cellBlockBuilder, reqCleanup);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3ab63dd..ac8c26c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface,
 
   protected UserProvider userProvider;
 
-  protected final ByteBufferPool reservoir;
-  // The requests and response will use buffers from ByteBufferPool, when the size of the
-  // request/response is at least this size.
-  // We make this to be 1/6th of the pool buffer size.
-  protected final int minSizeForReservoirUse;
+  protected final ByteBuffAllocator bbAllocator;
 
   protected volatile boolean allowFallbackToSimpleAuth;
 
@@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface,
   private RSRpcServices rsRpcServices;
 
   @FunctionalInterface
-  protected static interface CallCleanup {
+  protected interface CallCleanup {
     void run();
   }
 
@@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface,
       final List<BlockingServiceAndInterface> services,
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
-    if (reservoirEnabled) {
-      int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
-          ByteBufferPool.DEFAULT_BUFFER_SIZE);
-      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
-      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
-      // at which we will send back one RPC request. Means max we need 2 MB for creating the
-      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
-      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
-      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
-      // is by default 64 KB.
-      // In case of read request, at the end of the handler process, we will make the response
-      // cellblock and add the Call to connection's response Q and a single Responder thread takes
-      // connections and responses from that one by one and do the socket write. So there is chances
-      // that by the time a handler originated response is actually done writing to socket and so
-      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
-      // we consider and consider that also for the max buffers to pool
-      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
-      int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
-          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
-      this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
-      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
-    } else {
-      reservoir = null;
-      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
-    }
+    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
     this.server = server;
     this.services = services;
     this.bindAddress = bindAddress;
@@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface,
     this.scheduler = scheduler;
   }
 
-  @VisibleForTesting
-  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
-    return pool.getBufferSize() / 6;
-  }
-
   @Override
   public void onConfigurationChange(Configuration newConf) {
     initReconfigurable(newConf);
@@ -652,55 +613,6 @@ public abstract class RpcServer implements RpcServerInterface,
   }
 
   /**
-   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
-   * as much as possible.
-   *
-   * @param pool The ByteBufferPool to use
-   * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
-   *           need of size below this, create on heap ByteBuffer.
-   * @param reqLen Bytes count in request
-   */
-  @VisibleForTesting
-  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
-      int minSizeForPoolUse, int reqLen) {
-    ByteBuff resultBuf;
-    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
-    int remain = reqLen;
-    ByteBuffer buf = null;
-    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
-      bbs.add(buf);
-      remain -= pool.getBufferSize();
-    }
-    ByteBuffer[] bufsFromPool = null;
-    if (bbs.size() > 0) {
-      bufsFromPool = new ByteBuffer[bbs.size()];
-      bbs.toArray(bufsFromPool);
-    }
-    if (remain > 0) {
-      bbs.add(ByteBuffer.allocate(remain));
-    }
-    if (bbs.size() > 1) {
-      ByteBuffer[] items = new ByteBuffer[bbs.size()];
-      bbs.toArray(items);
-      resultBuf = new MultiByteBuff(items);
-    } else {
-      // We are backed by single BB
-      resultBuf = new SingleByteBuff(bbs.get(0));
-    }
-    resultBuf.limit(reqLen);
-    if (bufsFromPool != null) {
-      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
-      return new Pair<>(resultBuf, () -> {
-        // Return back all the BBs to pool
-        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
-          pool.putbackBuffer(bufsFromPoolFinal[i]);
-        }
-      });
-    }
-    return new Pair<>(resultBuf, null);
-  }
-
-  /**
    * Needed for features such as delayed calls.  We need to be able to store the current call
    * so that we can complete it later or ask questions of what is supported by the current ongoing
    * call.
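
All the pool bookkeeping removed above collapses, at each former call site, into the same two-line pattern against the shared allocator; SimpleServerRpcConnection later in this patch does exactly this:

    ByteBuff data = bbAllocator.allocate(length);  // pooled direct and/or heap, limit set to length
    RpcServer.CallCleanup cleanup = data::release; // a single release returns pooled buffers
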
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index cf1cf9a..f93f3a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -26,10 +26,10 @@ import java.util.Optional;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   protected long startTime;
   protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
 
-  protected final ByteBufferPool reservoir;
+  protected final ByteBuffAllocator bbAllocator;
 
   protected final CellBlockBuilder cellBlockBuilder;
 
@@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-      justification="Can't figure why this complaint is happening... see below")
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+      justification = "Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
-      Message param, CellScanner cellScanner, T connection, long size,
-      InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+      Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
+      long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
       CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
     this.id = id;
     this.service = service;
@@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
     this.remoteAddress = remoteAddress;
     this.timeout = timeout;
     this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
-    this.reservoir = reservoir;
+    this.bbAllocator = byteBuffAllocator;
     this.cellBlockBuilder = cellBlockBuilder;
     this.reqCleanup = reqCleanup;
   }
@@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
       // high when we can avoid a big buffer allocation on each rpc.
       List<ByteBuffer> cellBlock = null;
       int cellBlockSize = 0;
-      if (this.reservoir != null) {
+      if (bbAllocator.isReservoirEnabled()) {
         this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
-          this.connection.compressionCodec, cells, this.reservoir);
+          this.connection.compressionCodec, cells, bbAllocator);
         if (this.cellBlockStream != null) {
           cellBlock = this.cellBlockStream.getByteBuffers();
           cellBlockSize = this.cellBlockStream.size();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 2a8cfbe..f3f7807 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer {
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
       long startTime, int timeout) throws IOException {
     SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
-        null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
+        null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
     return call(fakeCall, status);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index 6084138..311b4c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
       justification = "Can't figure why this complaint is happening... see below")
   SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
       RequestHeader header, Message param, CellScanner cellScanner,
-      SimpleServerRpcConnection connection, long size,
-      final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
-      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) {
-    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
-        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+      SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
+      long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+      SimpleRpcServerResponder responder) {
+    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+        timeout, bbAllocator, cellBlockBuilder, reqCleanup);
     this.responder = responder;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index b4b5f33..01127cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Pair;
 
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
@@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
           // Notify the client about the offending request
           SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
               null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
-              this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
+              this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
           this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
           // Make sure the client recognizes the underlying exception
           // Otherwise, throw a DoNotRetryIOException.
@@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
 
  // It creates the ByteBuff and CallCleanup and assigns them to the Connection instance.
   private void initByteBuffToReadInto(int length) {
-    // We create random on heap buffers are read into those when
-    // 1. ByteBufferPool is not there.
-    // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
-    // waste then. Also if all the reqs are of this size, we will be creating larger sized
-    // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
-    // RegionOpen.
-    // 3. If it is an initial handshake signal or initial connection request. Any way then
-    // condition 2 itself will match
-    // 4. When SASL use is ON.
-    if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
-        useSasl || length < this.rpcServer.minSizeForReservoirUse) {
-      this.data = new SingleByteBuff(ByteBuffer.allocate(length));
-    } else {
-      Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
-        this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
-      this.data = pair.getFirst();
-      this.callCleanup = pair.getSecond();
-    }
+    this.data = rpcServer.bbAllocator.allocate(length);
+    this.callCleanup = data::release;
   }
 
   protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
@@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
       RequestHeader header, Message param, CellScanner cellScanner, long size,
       InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
     return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
-        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
         this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
   }
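
For context, the hunk above collapses the old reservoir bookkeeping (the pool
null-check, the SASL/handshake special cases, minSizeForReservoirUse) into a
single allocate call plus a release-based cleanup. A minimal sketch of the
resulting lifecycle, using only calls visible in this patch
(ByteBuffAllocator#allocate, ByteBuff#release, CallCleanup#run); the
try/finally placement is illustrative, since in the server the cleanup runs
when the call completes:

    // Sketch, not server code: the allocator decides pooled-direct vs on-heap
    // internally, so the call site no longer branches on request size.
    ByteBuff data = rpcServer.bbAllocator.allocate(length);
    CallCleanup cleanup = data::release;  // one release covers both cases
    try {
      channelDataRead(channel, data);     // fill the buffer from the socket
      // ... decode and dispatch the request ...
    } finally {
      cleanup.run();                      // pooled buffers go back to the reservoir
    }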
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 75800ba..8a993b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
 import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
 import static org.junit.Assert.assertEquals;
 
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -94,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded {
   protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
-    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
     TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
       String.valueOf(memoryCompaction));
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
index 267e9e8..abf20dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.NettyRpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
@@ -71,9 +72,8 @@ public class TestServerLoadDurability {
 
   private static Configuration createConfigurationForSimpleRpcServer() {
     Configuration conf = HBaseConfiguration.create();
-    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
-        SimpleRpcServer.class.getName());
-    conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
+    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
+    conf.setInt(BUFFER_SIZE_KEY, 20);
     return conf;
   }
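
The buffer-size knob moves the same way, and the deliberately tiny 20-byte
size is kept so requests still span several pooled buffers, which is what this
test exercises. A minimal before/after sketch under the same caveat:

    // Before: size of each ByteBuffer handed out by the pool.
    conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
    // After: the allocator owns the sizing; 20 bytes forces multi-buffer reads.
    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 20);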
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 48080b2..32160a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -832,7 +832,7 @@ public class TestHFileBlock {
     if (ClassSize.is32BitJVM()) {
       assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
     } else {
-      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
+      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
     }
 
     for (int size : new int[] { 100, 256, 12345 }) {
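
Note on the 72 -> 80 change: the expected MULTI_BYTE_BUFFER_HEAP_SIZE on
64-bit JVMs grows by exactly one 8-byte reference (72 + 8 = 80), which is
consistent with each ByteBuff now carrying reference-count state for the
release() calls used elsewhere in this patch; the 32-bit expectation stays at
64. This accounting is an inference, not stated in the hunk itself.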
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
deleted file mode 100644
index 560190b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RPCTests.class, SmallTests.class })
-public class TestRpcServer {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRpcServer.class);
-
-  @Test
-  public void testAllocateByteBuffToReadInto() throws Exception {
-    int maxBuffersInPool = 10;
-    ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
-    initPoolWithAllBuffers(pool, maxBuffersInPool);
-    ByteBuff buff = null;
-    Pair<ByteBuff, CallCleanup> pair;
-    // When the request size is less than 1/6th of the pool buffer size. We should use on demand
-    // created on heap Buffer
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        200);
-    buff = pair.getFirst();
-    assertTrue(buff.hasArray());
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-    assertNull(pair.getSecond());
-    // When the request size is > 1/6th of the pool buffer size.
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-    // Request size> pool buffer size
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        7 * 1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertTrue(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(1024, bbs[1].limit());
-    assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        6 * 1024 + 200);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertFalse(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(200, bbs[1].limit());
-    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-
-    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
-    for (int i = 0; i < maxBuffersInPool - 1; i++) {
-      buffers[i] = pool.getBuffer();
-    }
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        20 * 1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertFalse(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(14 * 1024, bbs[1].limit());
-    assertEquals(0, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(1, pool.getQueueSize());
-    pool.getBuffer();
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        7 * 1024);
-    buff = pair.getFirst();
-    assertTrue(buff.hasArray());
-    assertTrue(buff instanceof SingleByteBuff);
-    assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
-    assertNull(pair.getSecond());
-  }
-
-  private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
-    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
-    // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
-    // all. Makes pool with max #buffers.
-    for (int i = 0; i < maxBuffersInPool; i++) {
-      buffers[i] = pool.getBuffer();
-    }
-    for (ByteBuffer buf : buffers) {
-      pool.putbackBuffer(buf);
-    }
-  }
-}
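
With TestRpcServer gone, the pool-specific assertions (queue size, direct vs
heap segments, MultiByteBuff splitting) move behind the allocator boundary. A
minimal sketch of the equivalent round trip against the new API; the
create(conf, reservoirEnabled) factory shape is an assumption, as only
allocate() and release() appear in the hunks above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.ByteBuffAllocator;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class ByteBuffAllocatorRoundTripSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 6 * 1024);
        conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 10);
        // Hypothetical factory; the hunks above do not show the constructor.
        ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
        // A 7 KB request is larger than one 6 KB pooled segment, so the
        // allocator may hand back a multi-segment ByteBuff, mirroring what
        // the deleted test asserted against the pool directly.
        ByteBuff buf = alloc.allocate(7 * 1024);
        try {
          // ... read request bytes into buf ...
        } finally {
          buf.release(); // pooled segments return to the reservoir
        }
      }
    }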