You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/03/01 03:25:19 UTC

[hbase] branch HBASE-21879 updated (f0032c9 -> 587788f)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a change to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from f0032c9  HBASE-20587 Replace Jackson with shaded thirdparty gson
     add 9af8d58  HBASE-21667 Move to latest ASF Parent POM
     add d152e94  Revert "HBASE-17094 Add a sitemap for hbase.apache.org"
     add f382237  HBASE-21730 Update HBase-book with the procedure based WAL splitting
     add 07d84eb  HBASE-21929 The checks at the end of TestRpcClientLeaks are not executed
     add 0fc5173  HBASE-21922 BloomContext#sanityCheck may failed when use ROWPREFIX_DELIMITED bloom filter
     add 449ed08  HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client
     add a47c1dd  HBASE-21945 Maintain the original order when sending batch request
     add b73f03e  Revert "HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client"
     add aa7d3ce  HBASE-21944 Validate put for batch operation
     add 18d7114  HBASE-21057 upgrade to latest spotbugs
     add b57c03b  HBASE-21947 TestShell is broken after we remove the jackson dependencies
     add 99362e8  HBASE-21942 [UI] requests per second is incorrect in rsgroup page(rsgroup.jsp)
     add c33ceb2  HBASE-21943 The usage of RegionLocations.mergeRegionLocations is wrong for async client
     add 18f0745  HBASE-21960 Ensure RESTServletContainer used by RESTServer
     add 6e06a0d  HBASE-20724 Sometimes some compacted storefiles are still opened after region failover
     add 8408e26  HBASE-21962 Filters do not work in ThriftTable
     add b1c42f10 HBASE-21961 Infinite loop in AsyncNonMetaRegionLocator if there is only one region and we tried to locate before a non empty row
     add 9370347  HBASE-21820 Implement CLUSTER quota scope
     add e65744a  HBASE-21450 [documentation] Point spark doc at hbase-connectors spark
     add c4f5d3c  HBASE-21967 Split TestServerCrashProcedure and TestServerCrashProcedureWithReplicas
     add c19bc59  HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls
     new 587788f  HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |   7 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 103 +++--
 .../hbase/client/AsyncRegionLocatorHelper.java     |  14 -
 .../hadoop/hbase/client/ConnectionUtils.java       |   3 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   5 +-
 .../apache/hadoop/hbase/ipc/CellBlockBuilder.java  |   9 +-
 .../hadoop/hbase/quotas/QuotaSettingsFactory.java  | 100 ++++-
 .../hadoop/hbase/regionserver/BloomType.java       |   6 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  25 ++
 hbase-common/pom.xml                               |   4 +
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  | 282 ++++++++++++
 .../hbase/io/ByteBufferListOutputStream.java       |  40 +-
 .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 155 -------
 .../hbase/io/encoding/CopyKeyDataBlockEncoder.java |   2 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java     |   2 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java    |   2 +-
 .../hbase/io/encoding/PrefixKeyDeltaEncoder.java   |   2 +-
 .../hadoop/hbase/io/encoding/RowIndexSeekerV1.java |   2 +-
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 146 +++----
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java |  98 ++++-
 .../java/org/apache/hadoop/hbase/nio/RefCnt.java   |  39 +-
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    |  92 +++-
 .../apache/hadoop/hbase/util/ByteBufferArray.java  |  10 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  |  31 +-
 .../hadoop/hbase/io/TestByteBuffAllocator.java     | 309 +++++++++++++
 .../hbase/io/TestByteBufferListOutputStream.java   |  18 +-
 .../apache/hadoop/hbase/io/TestByteBufferPool.java |  67 ---
 .../apache/hadoop/hbase/nio/TestMultiByteBuff.java |   4 +-
 .../chaos/actions/ChangeBloomFilterAction.java     |   2 -
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |   4 -
 .../org/apache/hadoop/hbase/util/LoadTestTool.java |   8 -
 .../procedure2/RemoteProcedureDispatcher.java      |  29 ++
 .../src/main/protobuf/HFile.proto                  |   4 +
 hbase-rest/pom.xml                                 |  26 ++
 .../org/apache/hadoop/hbase/rest/RESTServer.java   | 158 ++++---
 .../hadoop/hbase/rest/RESTServletContainer.java    |  13 +-
 .../apache/hadoop/hbase/rest/SchemaResource.java   |   6 +
 .../hadoop/hbase/rest/filter/AuthFilter.java       |  10 +-
 .../hadoop/hbase/rest/HBaseRESTTestingUtility.java |  71 +--
 .../hadoop/hbase/rest/TestMultiRowResource.java    |   3 +
 .../hadoop/hbase/rest/TestSchemaResource.java      |   3 +
 .../hadoop/hbase/rest/TestSecureRESTServer.java    | 428 ++++++++++++++++++
 hbase-rest/src/test/resources/log4j.properties     |   1 +
 .../apache/hadoop/hbase/io/hfile/Cacheable.java    |   7 +-
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |   2 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |   2 +-
 .../apache/hadoop/hbase/ipc/NettyServerCall.java   |  12 +-
 .../hadoop/hbase/ipc/NettyServerRpcConnection.java |   9 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  96 +----
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  18 +-
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   |   2 +-
 .../apache/hadoop/hbase/ipc/SimpleServerCall.java  |  15 +-
 .../hbase/ipc/SimpleServerRpcConnection.java       |  26 +-
 .../assignment/RegionRemoteProcedureBase.java      |   5 +
 .../assignment/RegionTransitionProcedure.java      |   1 +
 .../master/procedure/ServerRemoteProcedure.java    | 131 ++++++
 .../master/procedure/SplitWALRemoteProcedure.java  |  84 +---
 .../SwitchRpcThrottleRemoteProcedure.java          |  61 +--
 .../master/replication/RefreshPeerProcedure.java   |  71 +--
 .../SyncReplicationReplayWALRemoteProcedure.java   |  70 +--
 .../org/apache/hadoop/hbase/quotas/QuotaCache.java |  57 ++-
 .../org/apache/hadoop/hbase/quotas/QuotaUtil.java  |  84 +++-
 .../regionserver/AbstractMultiFileWriter.java      |  13 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  64 ++-
 .../hadoop/hbase/regionserver/HStoreFile.java      |  59 ++-
 .../hadoop/hbase/regionserver/StoreFileReader.java |  76 +---
 .../hadoop/hbase/regionserver/StoreFileWriter.java | 123 ++++--
 .../compactions/DateTieredCompactor.java           |   2 +-
 .../regionserver/compactions/DefaultCompactor.java |   2 +-
 .../regionserver/compactions/StripeCompactor.java  |   2 +-
 .../apache/hadoop/hbase/util/BloomFilterUtil.java  |  54 +--
 .../hbase/util/RowPrefixDelimiterBloomContext.java |  62 ---
 .../resources/hbase-webapps/master/rsgroup.jsp     |   6 +-
 .../client/TestAsyncNonMetaRegionLocator.java      |  11 +
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 185 ++++----
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  53 ++-
 .../client/TestAsyncTableGetMultiThreaded.java     |   4 +-
 .../TestAsyncTableLocateRegionForDeletedTable.java | 105 +++++
 .../hbase/client/TestServerLoadDurability.java     |   8 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |   2 +-
 .../io/hfile/TestSeekBeforeWithInlineBlocks.java   |   2 -
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java       |  41 +-
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 144 -------
 ...{TestServerCrashProcedure.java => TestSCP.java} |  43 +-
 ...eWithReplicas.java => TestSCPWithReplicas.java} |   8 +-
 .../TestSCPWithReplicasWithoutZKCoordinated.java}  |  13 +-
 .../TestSCPWithoutZKCoordinated.java}              |  15 +-
 .../procedure/TestServerRemoteProcedure.java       | 282 ++++++++++++
 .../quotas/TestClusterScopeQuotaThrottle.java      | 236 ++++++++++
 .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java |  63 ++-
 .../hadoop/hbase/quotas/TestQuotaThrottle.java     | 478 ++++++++-------------
 .../quotas/TestSnapshotQuotaObserverChore.java     |  10 +-
 .../hbase/quotas/TestSpaceQuotasWithSnapshots.java |   3 +-
 .../hadoop/hbase/quotas/ThrottleQuotaTestUtil.java | 207 +++++++++
 .../hbase/regionserver/CreateRandomStoreFile.java  |   9 -
 .../TestCleanupCompactedFileAfterFailover.java     | 197 +++++++++
 .../TestCleanupCompactedFileOnRegionClose.java     |  60 ---
 .../hbase/regionserver/TestMultiColumnScanner.java |   1 -
 .../regionserver/TestRowPrefixBloomFilter.java     | 377 ++++++++--------
 .../hbase/regionserver/TestScanWithBloomError.java |   1 -
 .../hbase/regionserver/TestSeekOptimizations.java  |   2 -
 .../hbase/regionserver/TestSwitchToStreamRead.java |  49 ---
 .../regionserver/compactions/TestCompactor.java    |   8 +
 hbase-shell/src/main/ruby/hbase/quotas.rb          |  26 +-
 hbase-shell/src/main/ruby/hbase/taskmonitor.rb     |  32 +-
 .../src/main/ruby/shell/commands/set_quota.rb      |  27 +-
 hbase-shell/src/test/ruby/hbase/quotas_test.rb     |  17 +
 .../hbase/thrift/generated/AlreadyExists.java      |   2 +-
 .../hbase/thrift/generated/BatchMutation.java      |   2 +-
 .../hbase/thrift/generated/ColumnDescriptor.java   |   2 +-
 .../hadoop/hbase/thrift/generated/Hbase.java       |   2 +-
 .../hadoop/hbase/thrift/generated/IOError.java     |   2 +-
 .../hbase/thrift/generated/IllegalArgument.java    |   2 +-
 .../hadoop/hbase/thrift/generated/Mutation.java    |   2 +-
 .../hadoop/hbase/thrift/generated/TAppend.java     |   2 +-
 .../hadoop/hbase/thrift/generated/TCell.java       |   2 +-
 .../hadoop/hbase/thrift/generated/TColumn.java     |   2 +-
 .../hadoop/hbase/thrift/generated/TIncrement.java  |   2 +-
 .../hadoop/hbase/thrift/generated/TRegionInfo.java |   2 +-
 .../hadoop/hbase/thrift/generated/TRowResult.java  |   2 +-
 .../hadoop/hbase/thrift/generated/TScan.java       |   2 +-
 .../hadoop/hbase/thrift2/ThriftUtilities.java      |  35 +-
 .../hadoop/hbase/thrift2/generated/TAppend.java    |   2 +-
 .../hbase/thrift2/generated/TAuthorization.java    |   2 +-
 .../hbase/thrift2/generated/TBloomFilterType.java  |  10 +-
 .../hbase/thrift2/generated/TCellVisibility.java   |   2 +-
 .../hadoop/hbase/thrift2/generated/TColumn.java    |   2 +-
 .../thrift2/generated/TColumnFamilyDescriptor.java |   2 +-
 .../hbase/thrift2/generated/TColumnIncrement.java  |   2 +-
 .../hbase/thrift2/generated/TColumnValue.java      |   2 +-
 .../hadoop/hbase/thrift2/generated/TCompareOp.java |   2 +-
 .../thrift2/generated/TCompressionAlgorithm.java   |   2 +-
 .../hbase/thrift2/generated/TConsistency.java      |   2 +-
 .../thrift2/generated/TDataBlockEncoding.java      |   2 +-
 .../hadoop/hbase/thrift2/generated/TDelete.java    |   2 +-
 .../hbase/thrift2/generated/TDeleteType.java       |   2 +-
 .../hbase/thrift2/generated/TDurability.java       |   2 +-
 .../hadoop/hbase/thrift2/generated/TGet.java       |   2 +-
 .../hbase/thrift2/generated/THBaseService.java     |   2 +-
 .../hbase/thrift2/generated/THRegionInfo.java      |   2 +-
 .../hbase/thrift2/generated/THRegionLocation.java  |   2 +-
 .../hadoop/hbase/thrift2/generated/TIOError.java   |   2 +-
 .../hbase/thrift2/generated/TIllegalArgument.java  |   2 +-
 .../hadoop/hbase/thrift2/generated/TIncrement.java |   2 +-
 .../hbase/thrift2/generated/TKeepDeletedCells.java |   2 +-
 .../hadoop/hbase/thrift2/generated/TMutation.java  |   2 +-
 .../thrift2/generated/TNamespaceDescriptor.java    |   2 +-
 .../hadoop/hbase/thrift2/generated/TPut.java       |   2 +-
 .../hadoop/hbase/thrift2/generated/TReadType.java  |   2 +-
 .../hadoop/hbase/thrift2/generated/TResult.java    |   2 +-
 .../hbase/thrift2/generated/TRowMutations.java     |   2 +-
 .../hadoop/hbase/thrift2/generated/TScan.java      |   2 +-
 .../hbase/thrift2/generated/TServerName.java       |   2 +-
 .../hbase/thrift2/generated/TTableDescriptor.java  |   2 +-
 .../hadoop/hbase/thrift2/generated/TTableName.java |   2 +-
 .../hadoop/hbase/thrift2/generated/TTimeRange.java |   2 +-
 .../org/apache/hadoop/hbase/thrift2/hbase.thrift   |   4 -
 .../hadoop/hbase/thrift2/TestThriftConnection.java |   5 +-
 pom.xml                                            | 110 +----
 src/main/asciidoc/_chapters/architecture.adoc      | 155 ++-----
 src/main/asciidoc/_chapters/spark.adoc             |  57 +--
 src/main/asciidoc/book.adoc                        |   2 +
 src/site/custom/project-info-report.properties     |  24 --
 src/site/resources/images/WAL_splitting.png        | Bin 0 -> 38049 bytes
 src/site/site.xml                                  |  20 +-
 165 files changed, 4102 insertions(+), 2692 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
 delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
 copy hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCachesIterator.java => hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java (56%)
 create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
 delete mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
 create mode 100644 hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocateRegionForDeletedTable.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/{TestServerCrashProcedure.java => TestSCP.java} (86%)
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/{TestServerCrashProcedureWithReplicas.java => TestSCPWithReplicas.java} (92%)
 copy hbase-server/src/test/java/org/apache/hadoop/hbase/master/{TestDLSAsyncFSWAL.java => procedure/TestSCPWithReplicasWithoutZKCoordinated.java} (72%)
 copy hbase-server/src/test/java/org/apache/hadoop/hbase/master/{TestDLSAsyncFSWAL.java => procedure/TestSCPWithoutZKCoordinated.java} (71%)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestClusterScopeQuotaThrottle.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
 create mode 100644 src/site/resources/images/WAL_splitting.png


[hbase] 01/01: HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool

Posted by op...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 587788f55f58b314ae2847eef129a57980f7427c
Author: huzheng <op...@gmail.com>
AuthorDate: Sat Feb 16 17:16:09 2019 +0800

    HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool
---
 .../apache/hadoop/hbase/ipc/CellBlockBuilder.java  |   9 +-
 hbase-common/pom.xml                               |   4 +
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  | 282 +++++++++++++++++++
 .../hbase/io/ByteBufferListOutputStream.java       |  40 ++-
 .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 155 -----------
 .../hbase/io/encoding/CopyKeyDataBlockEncoder.java |   2 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java     |   2 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java    |   2 +-
 .../hbase/io/encoding/PrefixKeyDeltaEncoder.java   |   2 +-
 .../hadoop/hbase/io/encoding/RowIndexSeekerV1.java |   2 +-
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 146 +++++-----
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java |  98 +++++--
 .../java/org/apache/hadoop/hbase/nio/RefCnt.java   |  49 ++++
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    |  92 ++++--
 .../apache/hadoop/hbase/util/ByteBufferArray.java  |  10 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  |  31 ++-
 .../hadoop/hbase/io/TestByteBuffAllocator.java     | 309 +++++++++++++++++++++
 .../hbase/io/TestByteBufferListOutputStream.java   |  18 +-
 .../apache/hadoop/hbase/io/TestByteBufferPool.java |  67 -----
 .../apache/hadoop/hbase/nio/TestMultiByteBuff.java |   4 +-
 .../apache/hadoop/hbase/io/hfile/Cacheable.java    |   7 +-
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |   2 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |   2 +-
 .../apache/hadoop/hbase/ipc/NettyServerCall.java   |  12 +-
 .../hadoop/hbase/ipc/NettyServerRpcConnection.java |   9 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  96 +------
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  18 +-
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   |   2 +-
 .../apache/hadoop/hbase/ipc/SimpleServerCall.java  |  15 +-
 .../hbase/ipc/SimpleServerRpcConnection.java       |  26 +-
 .../client/TestAsyncTableGetMultiThreaded.java     |   4 +-
 .../hbase/client/TestServerLoadDurability.java     |   8 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |   2 +-
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 144 ----------
 34 files changed, 974 insertions(+), 697 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index 8d68e87..111f768 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -208,7 +208,7 @@ class CellBlockBuilder {
    * @param codec to use for encoding
    * @param compressor to use for encoding
    * @param cellScanner to encode
-   * @param pool Pool of ByteBuffers to make use of.
+   * @param allocator to allocate the {@link ByteBuff}.
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
    *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
    *         been flipped and is ready for reading. Use limit to find total size. If
@@ -217,15 +217,14 @@ class CellBlockBuilder {
    * @throws IOException if encoding the cells fail
    */
   public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
-      CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+      CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
     if (cellScanner == null) {
       return null;
     }
     if (codec == null) {
       throw new CellScannerButNoCodecException();
     }
-    assert pool != null;
-    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
     encodeCellsTo(bbos, cellScanner, codec, compressor);
     if (bbos.size() == 0) {
       bbos.releaseResources();
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index cdce4a9..670dab2 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -193,6 +193,10 @@
       <artifactId>hbase-shaded-miscellaneous</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-netty</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
new file mode 100644
index 0000000..1833462
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * ByteBuffAllocator is used for allocating/freeing ByteBuffers from/to the NIO ByteBuffer pool, and
+ * it provides high-level interfaces for upstream. When allocating the desired memory size, it will
+ * return a {@link ByteBuff}. Once we are sure that those ByteBuffers have reached the end of their
+ * life cycle, we must call {@link ByteBuff#release()} to return the buffers to the pool,
+ * otherwise a ByteBuffer leak will happen and the NIO ByteBuffer pool may be exhausted. It is
+ * possible that the desired memory size is larger than the ByteBufferPool has, so we fall back to
+ * allocating ByteBuffers from the heap, which means the GC pressure may increase again. Of course,
+ * a better way is to increase the ByteBufferPool size if we detect this case. <br/>
+ * <br/>
+ * On the other hand, for better memory utilization, we have set a lower bound named
+ * minSizeForReservoirUse in this allocator. If the desired size is less than
+ * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
+ * free its memory, because it's too wasteful to allocate a single fixed-size ByteBuffer for some
+ * small objects. <br/>
+ * <br/>
+ * We recommend using this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
+ * read/write path, because it hides the details of memory management and its APIs are friendlier
+ * to the upper layer.
+ */
+@InterfaceAudience.Private
+public class ByteBuffAllocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
+
+  public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
+
+  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
+  // 64 KB. Making it same as the chunk size what we will write/read to/from the socket channel.
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+  public static final String MIN_ALLOCATE_SIZE_KEY =
+      "hbase.ipc.server.reservoir.minimal.allocating.size";
+
+  public static final Recycler NONE = () -> {
+  };
+
+  public interface Recycler {
+    void free();
+  }
+
+  private final boolean reservoirEnabled;
+  private final int bufSize;
+  private final int maxBufCount;
+  private final AtomicInteger usedBufCount = new AtomicInteger(0);
+
+  private boolean maxPoolSizeInfoLevelLogged = false;
+
+  // If the desired size is at least this size, it's allocated from ByteBufferPool, otherwise it's
+  // allocated from heap for better utilization. By default this is 1/6th of the pool buffer size.
+  private final int minSizeForReservoirUse;
+
+  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
+
+  /**
+   * Initialize a {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
+   * the reservoir is enabled and has enough buffers; otherwise the allocator will just allocate
+   * the missing buffers from on-heap to meet the requirement.
+   * @param conf the configuration from which the allocator's arguments are read.
+   * @param reservoirEnabled indicates whether the reservoir is enabled or disabled.
+   * @return ByteBuffAllocator to manage the byte buffers.
+   */
+  public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
+    int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
+    if (reservoirEnabled) {
+      // The max number of buffers to be pooled in the ByteBufferPool. The default value is
+      // selected based on the #handlers configured. For a read request, 2 MB is the max size
+      // that we will send back for one RPC request, meaning we need at most 2 MB for creating the
+      // response cell block. (It might be much less than this, because the 2 MB size calc also
+      // includes the heap size overhead of each cell.) Considering 2 MB, we will need
+      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
+      // is by default 64 KB.
+      // In case of a read request, at the end of the handler process, we will make the response
+      // cellblock and add the Call to the connection's response Q; a single Responder thread takes
+      // connections and responses from that one by one and does the socket write. So there is a
+      // chance that by the time a handler-originated response is actually written to the socket
+      // (and so has released the BBs it used), the handler might have processed one more read
+      // req. On average we assume 2x, and take that into account for the max buffers to pool.
+      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
+      int maxBuffCount =
+          conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
+      int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
+      return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
+    } else {
+      return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Initialize a {@link ByteBuffAllocator} which only allocates ByteBuffers from on-heap. It's
+   * designed for testing purposes or for the case where the reservoir is disabled.
+   * @return allocator to allocate on-heap ByteBuffer.
+   */
+  public static ByteBuffAllocator createOnHeap() {
+    return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
+  }
+
+  @VisibleForTesting
+  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
+      int minSizeForReservoirUse) {
+    this.reservoirEnabled = reservoirEnabled;
+    this.maxBufCount = maxBufCount;
+    this.bufSize = bufSize;
+    this.minSizeForReservoirUse = minSizeForReservoirUse;
+  }
+
+  public boolean isReservoirEnabled() {
+    return reservoirEnabled;
+  }
+
+  @VisibleForTesting
+  public int getQueueSize() {
+    return this.buffers.size();
+  }
+
+  /**
+   * Allocate a buffer of the configured buffer size from the ByteBuffAllocator. Remember to call
+   * {@link ByteBuff#release()} when it is no longer needed, otherwise a memory leak will happen in
+   * the NIO ByteBuffer pool.
+   * @return a ByteBuff with the buffer size.
+   */
+  public SingleByteBuff allocateOneBuffer() {
+    if (isReservoirEnabled()) {
+      ByteBuffer bb = getBuffer();
+      if (bb != null) {
+        return new SingleByteBuff(() -> putbackBuffer(bb), bb);
+      }
+    }
+    // Allocated from heap, let the JVM free its memory.
+    return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
+  }
+
+  /**
+   * Allocate size bytes from the ByteBuffAllocator. Remember to call {@link ByteBuff#release()}
+   * when it is no longer needed, otherwise a memory leak will happen in the NIO ByteBuffer pool.
+   * @param size to allocate
+   * @return a ByteBuff with the desired size.
+   */
+  public ByteBuff allocate(int size) {
+    if (size < 0) {
+      throw new IllegalArgumentException("size to allocate should >=0");
+    }
+    // If disabled the reservoir, just allocate it from on-heap.
+    if (!isReservoirEnabled() || size == 0) {
+      return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+    }
+    int reminder = size % bufSize;
+    int len = size / bufSize + (reminder > 0 ? 1 : 0);
+    List<ByteBuffer> bbs = new ArrayList<>(len);
+    // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
+    // reservoir is exhausted.
+    int remain = size;
+    while (remain >= minSizeForReservoirUse) {
+      ByteBuffer bb = this.getBuffer();
+      if (bb == null) {
+        break;
+      }
+      bbs.add(bb);
+      remain -= bufSize;
+    }
+    int lenFromReservoir = bbs.size();
+    if (remain > 0) {
+      // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
+      // just allocate the ByteBuffer from on-heap.
+      bbs.add(ByteBuffer.allocate(remain));
+    }
+    ByteBuff bb = wrap(bbs, () -> {
+      for (int i = 0; i < lenFromReservoir; i++) {
+        this.putbackBuffer(bbs.get(i));
+      }
+    });
+    bb.limit(size);
+    return bb;
+  }
+
+  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
+    if (buffers == null || buffers.length == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
+        : new MultiByteBuff(recycler, buffers);
+  }
+
+  public static ByteBuff wrap(ByteBuffer[] buffers) {
+    return wrap(buffers, NONE);
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
+    if (buffers == null || buffers.size() == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
+        : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers) {
+    return wrap(buffers, NONE);
+  }
+
+  /**
+   * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
+   *         the maximum pool size, it will create a new one and return. In case of max pool size
+   *         also reached, will return null. When pool returned a ByteBuffer, make sure to return it
+   *         back to pool after use.
+   */
+  private ByteBuffer getBuffer() {
+    ByteBuffer bb = buffers.poll();
+    if (bb != null) {
+      // To reset the limit to capacity and position to 0, must clear here.
+      bb.clear();
+      return bb;
+    }
+    while (true) {
+      int c = this.usedBufCount.intValue();
+      if (c >= this.maxBufCount) {
+        if (!maxPoolSizeInfoLevelLogged) {
+          LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
+              + "increasing the value for '{}' ?",
+            maxBufCount, MAX_BUFFER_COUNT_KEY);
+          maxPoolSizeInfoLevelLogged = true;
+        }
+        return null;
+      }
+      if (!this.usedBufCount.compareAndSet(c, c + 1)) {
+        continue;
+      }
+      return ByteBuffer.allocateDirect(bufSize);
+    }
+  }
+
+  /**
+   * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
+   * @param buf ByteBuffer to return.
+   */
+  private void putbackBuffer(ByteBuffer buf) {
+    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
+      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+      return;
+    }
+    buffers.offer(buf);
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index 0b97abb..e8bd322 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory;
 public class ByteBufferListOutputStream extends ByteBufferOutputStream {
   private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
 
-  private ByteBufferPool pool;
+  private ByteBuffAllocator allocator;
   // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
   // it is not available will make a new one our own and keep writing to that. We keep track of all
   // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
   // to return back all of them to pool
-  protected List<ByteBuffer> allBufs = new ArrayList<>();
-  protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
+  protected List<SingleByteBuff> allBufs = new ArrayList<>();
 
   private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
 
-  public ByteBufferListOutputStream(ByteBufferPool pool) {
-    this.pool = pool;
+  public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
+    this.allocator = allocator;
     allocateNewBuffer();
   }
 
@@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
     if (this.curBuf != null) {
       this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
     }
-    // Get an initial BB to work with from the pool
-    this.curBuf = this.pool.getBuffer();
-    if (this.curBuf == null) {
-      // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off
-      // heap BB on demand. It is difficult to account for all such and so proper sizing of Max
-      // direct heap size. See HBASE-15525 also for more details.
-      // Make BB with same size of pool's buffer size.
-      this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
-    } else {
-      this.bufsFromPool.add(this.curBuf);
-    }
-    this.allBufs.add(this.curBuf);
+    // Get an initial ByteBuffer from the allocator.
+    SingleByteBuff sbb = allocator.allocateOneBuffer();
+    this.curBuf = sbb.nioByteBuffers()[0];
+    this.allBufs.add(sbb);
   }
 
   @Override
@@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
       LOG.debug(e.toString(), e);
     }
     // Return back all the BBs to pool
-    if (this.bufsFromPool != null) {
-      for (int i = 0; i < this.bufsFromPool.size(); i++) {
-        this.pool.putbackBuffer(this.bufsFromPool.get(i));
-      }
-      this.bufsFromPool = null;
+    for (ByteBuff buf : this.allBufs) {
+      buf.release();
     }
     this.allBufs = null;
     this.curBuf = null;
@@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
       // All the other BBs are already flipped while moving to the new BB.
       curBuf.flip();
     }
-    return this.allBufs;
+    List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
+    for (SingleByteBuff bb : this.allBufs) {
+      bbs.add(bb.nioByteBuffers()[0]);
+    }
+    return bbs;
   }
 
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
deleted file mode 100644
index caca20b..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
- * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
- * that it will create. When requested, if a free ByteBuffer is already present, it will return
- * that. And when no free ByteBuffer available and we are below the max count, it will create a new
- * one and return that.
- *
- * <p>
- * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
- * pass 'directByteBuffer' as false while construction of the pool.
- * <p>
- * This class is thread safe.
- *
- * @see ByteBufferListOutputStream
- */
-@InterfaceAudience.Private
-public class ByteBufferPool {
-  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
-  // TODO better config names?
-  // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
-  // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
-  public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
-  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
-  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
-                                                          // what we will write/read to/from the
-                                                          // socket channel.
-  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
-
-  private final int bufferSize;
-  private final int maxPoolSize;
-  private AtomicInteger count; // Count of the BBs created already for this pool.
-  private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
-  private boolean maxPoolSizeInfoLevelLogged = false;
-
-  /**
-   * @param bufferSize Size of each buffer created by this pool.
-   * @param maxPoolSize Max number of buffers to keep in this pool.
-   */
-  public ByteBufferPool(int bufferSize, int maxPoolSize) {
-    this(bufferSize, maxPoolSize, true);
-  }
-
-  /**
-   * @param bufferSize Size of each buffer created by this pool.
-   * @param maxPoolSize Max number of buffers to keep in this pool.
-   * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
-   */
-  public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
-    this.bufferSize = bufferSize;
-    this.maxPoolSize = maxPoolSize;
-    this.directByteBuffer = directByteBuffer;
-    // TODO can add initialPoolSize config also and make those many BBs ready for use.
-    LOG.info("Created with bufferSize={} and maxPoolSize={}",
-        org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
-        org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
-    this.count = new AtomicInteger(0);
-  }
-
-  /**
-   * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
-   *         maximum pool size, it will create a new one and return. In case of max pool size also
-   *         reached, will return null. When pool returned a ByteBuffer, make sure to return it back
-   *         to pool after use.
-   * @see #putbackBuffer(ByteBuffer)
-   */
-  public ByteBuffer getBuffer() {
-    ByteBuffer bb = buffers.poll();
-    if (bb != null) {
-      // Clear sets limit == capacity. Position == 0.
-      bb.clear();
-      return bb;
-    }
-    while (true) {
-      int c = this.count.intValue();
-      if (c >= this.maxPoolSize) {
-        if (maxPoolSizeInfoLevelLogged) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
-                + " and no free buffers now. Consider increasing the value for '"
-                + MAX_POOL_SIZE_KEY + "' ?");
-          }
-        } else {
-          LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
-              + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
-              + "' ?");
-          maxPoolSizeInfoLevelLogged = true;
-        }
-        return null;
-      }
-      if (!this.count.compareAndSet(c, c + 1)) {
-        continue;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
-      }
-      return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
-          : ByteBuffer.allocate(this.bufferSize);
-    }
-  }
-
-  /**
-   * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
-   * obtained from this pool.
-   * @param buf ByteBuffer to return.
-   */
-  public void putbackBuffer(ByteBuffer buf) {
-    if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
-      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
-      return;
-    }
-    buffers.offer(buf);
-  }
-
-  public int getBufferSize() {
-    return this.bufferSize;
-  }
-
-  /**
-   * @return Number of free buffers
-   */
-  @VisibleForTesting
-  public int getQueueSize() {
-    return buffers.size();
-  }
-}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 8bc7974..d7ab009 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
           currentBuffer.skip(current.tagsLength);
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index 01f0a9d..ab93d19 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index baa1856..aa9a436 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 63da7e7..176bea3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
index 14d847c..9c0532e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
@@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
       decodeTags();
     }
     if (includesMvcc()) {
-      current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
     } else {
       current.memstoreTS = 0;
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 68cf56e..1ee3607 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
+
+
 /**
- * An abstract class that abstracts out as to how the byte buffers are used,
- * either single or multiple. We have this interface because the java's ByteBuffers
- * cannot be sub-classed. This class provides APIs similar to the ones provided
- * in java's nio ByteBuffers and allows you to do positional reads/writes and relative
- * reads and writes on the underlying BB. In addition to it, we have some additional APIs which
- * helps us in the read path.
+ * An abstract class that abstracts out as to how the byte buffers are used, either single or
+ * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
+ * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
+ * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
+ * have some additional APIs which help us in the read path. <br/>
+ * ByteBuff implements the {@link ReferenceCounted} interface, so it maintains a {@link RefCnt}
+ * inside. Once you are sure that the ByteBuff won't be used any more, you must call
+ * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. Regarding
+ * {@link ByteBuff#duplicate()} and {@link ByteBuff#slice()}: releasing either the duplicated one
+ * or the original one will free the memory, because they share the same NIO ByteBuffers. If you
+ * want to keep the NIO ByteBuffers alive even after the original one has called
+ * {@link ByteBuff#release()}, you can do it like this:
+ *
+ * <pre>
+ *   ByteBuff original = ...;
+ *   ByteBuff dup = original.duplicate();
+ *   dup.retain();
+ *   original.release();
+ *   // The NIO buffers can still be accessed unless you release the duplicated one
+ *   dup.get(...);
+ *   dup.release();
+ *   // Both the original and dup can not access the NIO buffers any more.
+ * </pre>
  */
 @InterfaceAudience.Private
-// TODO to have another name. This can easily get confused with netty's ByteBuf
-public abstract class ByteBuff {
+public abstract class ByteBuff implements ReferenceCounted {
+  private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
   private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
 
+  protected RefCnt refCnt;
+
+  /*************************** Methods for reference count **********************************/
+
+  protected void checkRefCount() {
+    ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
+  }
+
+  public int refCnt() {
+    return refCnt.refCnt();
+  }
+
+  @Override
+  public boolean release() {
+    return refCnt.release();
+  }
+
+  @Override
+  public final ByteBuff retain(int increment) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final boolean release(int increment) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ByteBuff touch() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final ByteBuff touch(Object hint) {
+    throw new UnsupportedOperationException();
+  }
+
+  /******************************* Methods for ByteBuff **************************************/
+
   /**
    * @return this ByteBuff's current position
    */
@@ -491,78 +550,11 @@ public abstract class ByteBuff {
     return tmpLength;
   }
 
-  /**
-   * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
-   * {@link ByteBuff}.
-   */
-  public static long readVLong(ByteBuff in) {
-    byte firstByte = in.get();
-    int len = WritableUtils.decodeVIntSize(firstByte);
-    if (len == 1) {
-      return firstByte;
-    }
-    long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
-      byte b = in.get();
-      i = i << 8;
-      i = i | (b & 0xFF);
-    }
-    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-  }
-
-  /**
-   * Search sorted array "a" for byte "key".
-   * 
-   * @param a Array to search. Entries must be sorted and unique.
-   * @param fromIndex First index inclusive of "a" to include in the search.
-   * @param toIndex Last index exclusive of "a" to include in the search.
-   * @param key The byte to search for.
-   * @return The index of key if found. If not found, return -(index + 1), where
-   *         negative indicates "not found" and the "index + 1" handles the "-0"
-   *         case.
-   */
-  public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) {
-    int unsignedKey = key & 0xff;
-    int low = fromIndex;
-    int high = toIndex - 1;
-
-    while (low <= high) {
-      int mid = low + ((high - low) >> 1);
-      int midVal = a.get(mid) & 0xff;
-
-      if (midVal < unsignedKey) {
-        low = mid + 1;
-      } else if (midVal > unsignedKey) {
-        high = mid - 1;
-      } else {
-        return mid; // key found
-      }
-    }
-    return -(low + 1); // key not found.
-  }
+  public abstract ByteBuffer[] nioByteBuffers();
 
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
         ", cap= " + capacity() + "]";
   }
-
-  public static String toStringBinary(final ByteBuff b, int off, int len) {
-    StringBuilder result = new StringBuilder();
-    // Just in case we are passed a 'len' that is > buffer length...
-    if (off >= b.capacity())
-      return result.toString();
-    if (off + len > b.capacity())
-      len = b.capacity() - off;
-    for (int i = off; i < off + len; ++i) {
-      int ch = b.get(i) & 0xFF;
-      if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
-          || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
-        result.append((char) ch);
-      } else {
-        result.append(String.format("\\x%02X", ch));
-      }
-    }
-    return result.toString();
-  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 97f5141..e9eadc7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
@@ -24,13 +26,12 @@ import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
 import java.nio.channels.ReadableByteChannel;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
  * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff {
   private final int[] itemBeginPos;
 
   public MultiByteBuff(ByteBuffer... items) {
+    this(NONE, items);
+  }
+
+  public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
+    this(new RefCnt(recycler), items);
+  }
+
+  private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
+    this.refCnt = refCnt;
     assert items != null;
     assert items.length > 0;
     this.items = items;
@@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff {
     this.limitedItemIndex = this.items.length - 1;
   }
 
-  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
-      int curItemIndex, int markedIndex) {
+  private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
+      int limitedIndex, int curItemIndex, int markedIndex) {
+    this.refCnt = refCnt;
     this.items = items;
     this.curItemIndex = curItemIndex;
     this.curItem = this.items[this.curItemIndex];
@@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int capacity() {
+    checkRefCount();
     int c = 0;
     for (ByteBuffer item : this.items) {
       c += item.capacity();
@@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte get(int index) {
+    checkRefCount();
     int itemIndex = getItemIndex(index);
     return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
   }
 
   @Override
   public byte getByteAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex = getItemIndexFromCurItemIndex(index);
@@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int getInt(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public int getIntAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public short getShort(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public short getShortAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public long getLong(int index) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int itemIndex;
     if (this.itemBeginPos[this.curItemIndex] <= index
@@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public long getLongAfterPosition(int offset) {
+    checkRefCount();
     // Mostly the index specified will land within this current item. Short circuit for that
     int index = offset + this.position();
     int itemIndex;
@@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int position() {
+    checkRefCount();
     return itemBeginPos[this.curItemIndex] + this.curItem.position();
   }
 
@@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff position(int position) {
+    checkRefCount();
     // Short circuit for positioning within the cur item. Mostly that is the case.
     if (this.itemBeginPos[this.curItemIndex] <= position
         && this.itemBeginPos[this.curItemIndex + 1] > position) {
@@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff rewind() {
+    checkRefCount();
     for (int i = 0; i < this.items.length; i++) {
       this.items[i].rewind();
     }
@@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff mark() {
+    checkRefCount();
     this.markedItemIndex = this.curItemIndex;
     this.curItem.mark();
     return this;
@@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff reset() {
+    checkRefCount();
     // when the buffer is moved to the next one.. the reset should happen on the previous marked
     // item and the new one should be taken as the base
     if (this.markedItemIndex < 0) throw new InvalidMarkException();
@@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int remaining() {
+    checkRefCount();
     int remain = 0;
     for (int i = curItemIndex; i < items.length; i++) {
       remain += items[i].remaining();
@@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public final boolean hasRemaining() {
+    checkRefCount();
     return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
         && this.items[this.curItemIndex + 1].hasRemaining());
   }
@@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte get() {
+    checkRefCount();
     if (this.curItem.remaining() == 0) {
       if (items.length - 1 == this.curItemIndex) {
         // means cur item is the last one and we wont be able to read a long. Throw exception
@@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public short getShort() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_SHORT) {
       return this.curItem.getShort();
@@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public int getInt() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_INT) {
       return this.curItem.getInt();
@@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public long getLong() {
+    checkRefCount();
     int remaining = this.curItem.remaining();
     if (remaining >= Bytes.SIZEOF_LONG) {
       return this.curItem.getLong();
@@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public void get(byte[] dst, int offset, int length) {
+    checkRefCount();
     while (length > 0) {
       int toRead = Math.min(length, this.curItem.remaining());
       ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
@@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public void get(int sourceOffset, byte[] dst, int offset, int length) {
+    checkRefCount();
     int itemIndex = getItemIndex(sourceOffset);
     ByteBuffer item = this.items[itemIndex];
     sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
@@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff limit(int limit) {
+    checkRefCount();
     this.limit = limit;
     // Normally the limit will try to limit within the last BB item
     int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
@@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff slice() {
+    checkRefCount();
     ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
     for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
       copy[j] = this.items[i].slice();
     }
-    return new MultiByteBuff(copy);
+    return new MultiByteBuff(refCnt, copy);
   }
 
   /**
-   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
-   * of the new MBB will be independent than that of the original MBB.
-   * The content of the new MBB will start at this MBB's current position
-   * The position, limit and mark of the new MBB would be identical to this MBB in terms of
-   * values.
-   * @return a sliced MBB
+   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
+   * new MBB will be independent of those of the original MBB. The content of the new MBB will
+   * start at this MBB's current position. The position, limit and mark of the new MBB would be
+   * identical to this MBB in terms of values.
+   * @return a duplicated MBB
    */
   @Override
   public MultiByteBuff duplicate() {
+    checkRefCount();
     ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
     for (int i = 0; i < this.items.length; i++) {
       itemsCopy[i] = items[i].duplicate();
     }
-    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
-        this.curItemIndex, this.markedItemIndex);
+    return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
+        this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
   }
 
   /**
@@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(byte b) {
+    checkRefCount();
     if (this.curItem.remaining() == 0) {
       if (this.curItemIndex == this.items.length - 1) {
         throw new BufferOverflowException();
@@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(int index, byte b) {
+    checkRefCount();
     int itemIndex = getItemIndex(limit);
     ByteBuffer item = items[itemIndex];
     item.put(index - itemBeginPos[itemIndex], b);
@@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    checkRefCount();
     int destItemIndex = getItemIndex(offset);
     int srcItemIndex = getItemIndex(srcOffset);
     ByteBuffer destItem = this.items[destItemIndex];
@@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff {
   }
 
   private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
-    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+    return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
         : ((MultiByteBuff) buf).items[index];
   }
 
@@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff putInt(int val) {
+    checkRefCount();
     if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
       this.curItem.putInt(val);
       return this;
@@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff put(byte[] src, int offset, int length) {
+    checkRefCount();
     if (this.curItem.remaining() >= length) {
       ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
       return this;
@@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff putLong(long val) {
+    checkRefCount();
     if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
       this.curItem.putLong(val);
       return this;
@@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff skip(int length) {
+    checkRefCount();
     // Get available bytes from this item and remaining from next
     int jump = 0;
     while (true) {
@@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public MultiByteBuff moveBack(int length) {
+    checkRefCount();
     while (length != 0) {
       if (length > curItem.position()) {
         length -= curItem.position();
@@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public ByteBuffer asSubByteBuffer(int length) {
+    checkRefCount();
     if (this.curItem.remaining() >= length) {
       return this.curItem;
     }
@@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff {
     ByteBuffer locCurItem = curItem;
     while (length > 0) {
       int toRead = Math.min(length, locCurItem.remaining());
-      ByteBufferUtils
-          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
+        toRead);
       length -= toRead;
       if (length == 0) break;
       locCurItemIndex++;
@@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+    checkRefCount();
     if (this.itemBeginPos[this.curItemIndex] <= offset) {
       int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
       if (this.curItem.limit() - relOffsetInCurItem >= length) {
@@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff {
   @Override
   public void get(ByteBuffer out, int sourceOffset,
       int length) {
+    checkRefCount();
       // Not used from real read path actually. So not going with
       // optimization
     for (int i = 0; i < length; ++i) {
@@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff {
    */
   @Override
   public byte[] toBytes(int offset, int length) {
+    checkRefCount();
     byte[] output = new byte[length];
     this.get(offset, output, 0, length);
     return output;
@@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff {
 
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    checkRefCount();
     int total = 0;
     while (true) {
       // Read max possible into the current BB
@@ -1034,13 +1082,19 @@ public class MultiByteBuff extends ByteBuff {
   }
 
   @Override
+  public ByteBuffer[] nioByteBuffers() {
+    checkRefCount();
+    return this.items;
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof MultiByteBuff)) return false;
     if (this == obj) return true;
     MultiByteBuff that = (MultiByteBuff) obj;
     if (this.capacity() != that.capacity()) return false;
     if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
-        that.limit()) == 0) {
+      that.limit()) == 0) {
       return true;
     }
     return false;
@@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff {
     return hash;
   }
 
-  /**
-   * @return the ByteBuffers which this wraps.
-   */
-  @VisibleForTesting
-  public ByteBuffer[] getEnclosingByteBuffers() {
-    return this.items;
+  @Override
+  public MultiByteBuff retain() {
+    refCnt.retain();
+    return this;
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
new file mode 100644
index 0000000..80172b2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+
+/**
+ * Maintains a reference count integer to track the life cycle of a {@link ByteBuff}; when the
+ * reference count becomes 0, it will call {@link Recycler#free()} once.
+ */
+@InterfaceAudience.Private
+class RefCnt extends AbstractReferenceCounted {
+
+  private Recycler recycler = ByteBuffAllocator.NONE;
+
+  RefCnt(Recycler recycler) {
+    this.recycler = recycler;
+  }
+
+  @Override
+  protected final void deallocate() {
+    this.recycler.free();
+  }
+
+  @Override
+  public final ReferenceCounted touch(Object hint) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 6d64d7b..7205251 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,22 +17,24 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 import org.apache.yetus.audience.InterfaceAudience;
-import sun.nio.ch.DirectBuffer;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import sun.nio.ch.DirectBuffer;
 
 /**
- * An implementation of ByteBuff where a single BB backs the BBI. This just acts
- * as a wrapper over a normal BB - offheap or onheap
+ * An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a
+ * normal BB - offheap or onheap
  */
 @InterfaceAudience.Private
 public class SingleByteBuff extends ByteBuff {
@@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff {
   private Object unsafeRef = null;
 
   public SingleByteBuff(ByteBuffer buf) {
+    this(NONE, buf);
+  }
+
+  public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
+    this(new RefCnt(recycler), buf);
+  }
+
+  private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
+    this.refCnt = refCnt;
     this.buf = buf;
     if (buf.hasArray()) {
       this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset();
@@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int position() {
+    checkRefCount();
     return this.buf.position();
   }
 
   @Override
   public SingleByteBuff position(int position) {
+    checkRefCount();
     this.buf.position(position);
     return this;
   }
 
   @Override
   public SingleByteBuff skip(int len) {
+    checkRefCount();
     this.buf.position(this.buf.position() + len);
     return this;
   }
 
   @Override
   public SingleByteBuff moveBack(int len) {
+    checkRefCount();
     this.buf.position(this.buf.position() - len);
     return this;
   }
 
   @Override
   public int capacity() {
+    checkRefCount();
     return this.buf.capacity();
   }
 
   @Override
   public int limit() {
+    checkRefCount();
     return this.buf.limit();
   }
 
   @Override
   public SingleByteBuff limit(int limit) {
+    checkRefCount();
     this.buf.limit(limit);
     return this;
   }
 
   @Override
   public SingleByteBuff rewind() {
+    checkRefCount();
     this.buf.rewind();
     return this;
   }
 
   @Override
   public SingleByteBuff mark() {
+    checkRefCount();
     this.buf.mark();
     return this;
   }
 
   @Override
   public ByteBuffer asSubByteBuffer(int length) {
+    checkRefCount();
     // Just return the single BB that is available
     return this.buf;
   }
 
   @Override
   public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+    checkRefCount();
     // Just return the single BB that is available
     pair.setFirst(this.buf);
     pair.setSecond(offset);
@@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int remaining() {
+    checkRefCount();
     return this.buf.remaining();
   }
 
   @Override
   public boolean hasRemaining() {
+    checkRefCount();
     return buf.hasRemaining();
   }
 
   @Override
   public SingleByteBuff reset() {
+    checkRefCount();
     this.buf.reset();
     return this;
   }
 
   @Override
   public SingleByteBuff slice() {
-    return new SingleByteBuff(this.buf.slice());
+    checkRefCount();
+    return new SingleByteBuff(this.refCnt, this.buf.slice());
   }
 
   @Override
   public SingleByteBuff duplicate() {
-    return new SingleByteBuff(this.buf.duplicate());
+    checkRefCount();
+    return new SingleByteBuff(this.refCnt, this.buf.duplicate());
   }
 
   @Override
   public byte get() {
+    checkRefCount();
     return buf.get();
   }
 
   @Override
   public byte get(int index) {
+    checkRefCount();
     if (UNSAFE_AVAIL) {
       return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
     }
@@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public byte getByteAfterPosition(int offset) {
+    checkRefCount();
     return get(this.buf.position() + offset);
   }
 
   @Override
   public SingleByteBuff put(byte b) {
+    checkRefCount();
     this.buf.put(b);
     return this;
   }
 
   @Override
   public SingleByteBuff put(int index, byte b) {
+    checkRefCount();
     buf.put(index, b);
     return this;
   }
 
   @Override
   public void get(byte[] dst, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
     buf.position(buf.position() + length);
   }
 
   @Override
   public void get(int sourceOffset, byte[] dst, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length);
   }
 
@@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    checkRefCount();
     if (src instanceof SingleByteBuff) {
       ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
-          offset, length);
+        offset, length);
     } else {
       // TODO we can do some optimization here? Call to asSubByteBuffer might
       // create a copy.
@@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff {
       src.asSubByteBuffer(srcOffset, length, pair);
       if (pair.getFirst() != null) {
         ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
-            length);
+          length);
       }
     }
     return this;
@@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public SingleByteBuff put(byte[] src, int offset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
     return this;
   }
 
   @Override
   public SingleByteBuff put(byte[] src) {
+    checkRefCount();
     return put(src, 0, src.length);
   }
 
   @Override
   public boolean hasArray() {
+    checkRefCount();
     return this.buf.hasArray();
   }
 
   @Override
   public byte[] array() {
+    checkRefCount();
     return this.buf.array();
   }
 
   @Override
   public int arrayOffset() {
+    checkRefCount();
     return this.buf.arrayOffset();
   }
 
   @Override
   public short getShort() {
+    checkRefCount();
     return this.buf.getShort();
   }
 
   @Override
   public short getShort(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
     }
@@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public short getShortAfterPosition(int offset) {
+    checkRefCount();
     return getShort(this.buf.position() + offset);
   }
 
   @Override
   public int getInt() {
+    checkRefCount();
     return this.buf.getInt();
   }
 
   @Override
   public SingleByteBuff putInt(int value) {
+    checkRefCount();
     ByteBufferUtils.putInt(this.buf, value);
     return this;
   }
 
   @Override
   public int getInt(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
     }
@@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public int getIntAfterPosition(int offset) {
+    checkRefCount();
     return getInt(this.buf.position() + offset);
   }
 
   @Override
   public long getLong() {
+    checkRefCount();
     return this.buf.getLong();
   }
 
   @Override
   public SingleByteBuff putLong(long value) {
+    checkRefCount();
     ByteBufferUtils.putLong(this.buf, value);
     return this;
   }
 
   @Override
   public long getLong(int index) {
+    checkRefCount();
     if (UNSAFE_UNALIGNED) {
       return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
     }
@@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public long getLongAfterPosition(int offset) {
+    checkRefCount();
     return getLong(this.buf.position() + offset);
   }
 
   @Override
   public byte[] toBytes(int offset, int length) {
+    checkRefCount();
     byte[] output = new byte[length];
     ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
     return output;
@@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff {
 
   @Override
   public void get(ByteBuffer out, int sourceOffset, int length) {
+    checkRefCount();
     ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
   }
 
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
+    checkRefCount();
     return channelRead(channel, buf);
   }
 
   @Override
+  public ByteBuffer[] nioByteBuffers() {
+    checkRefCount();
+    return new ByteBuffer[] { this.buf };
+  }
+
+  @Override
   public boolean equals(Object obj) {
-    if(!(obj instanceof SingleByteBuff)) return false;
-    return this.buf.equals(((SingleByteBuff)obj).buf);
+    if (!(obj instanceof SingleByteBuff)) {
+      return false;
+    }
+    return this.buf.equals(((SingleByteBuff) obj).buf);
   }
 
   @Override
@@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff {
     return this.buf.hashCode();
   }
 
-  /**
-   * @return the ByteBuffer which this wraps.
-   */
-  @VisibleForTesting
-  public ByteBuffer getEnclosingByteBuffer() {
-    return this.buf;
+  @Override
+  public SingleByteBuff retain() {
+    refCnt.retain();
+    return this;
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2e14b13..d023339 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -27,9 +27,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -311,10 +311,6 @@ public class ByteBufferArray {
       srcIndex += cnt;
     }
     assert srcIndex == len;
-    if (mbb.length > 1) {
-      return new MultiByteBuff(mbb);
-    } else {
-      return new SingleByteBuff(mbb[0]);
-    }
+    return ByteBuffAllocator.wrap(mbb);
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index deb667b..ac7bd30 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -348,25 +349,39 @@ public final class ByteBufferUtils {
     }
   }
 
-  /**
-   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
-   * {@link ByteBuffer}.
-   */
-  public static long readVLong(ByteBuffer in) {
-    byte firstByte = in.get();
+  private interface ByteVisitor {
+    byte get();
+  }
+
+  private static long readVLong(ByteVisitor visitor) {
+    byte firstByte = visitor.get();
     int len = WritableUtils.decodeVIntSize(firstByte);
     if (len == 1) {
       return firstByte;
     }
     long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
-      byte b = in.get();
+    for (int idx = 0; idx < len - 1; idx++) {
+      byte b = visitor.get();
       i = i << 8;
       i = i | (b & 0xFF);
     }
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
+  /**
+   * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}.
+   */
+  public static long readVLong(ByteBuffer in) {
+    return readVLong(in::get);
+  }
+
+  /**
+   * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
+   * {@link ByteBuff}.
+   */
+  public static long readVLong(ByteBuff in) {
+    return readVLong(in::get);
+  }
 
   /**
    * Put in buffer integer using 7 bit encoding. For each written byte:
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
new file mode 100644
index 0000000..0976c11
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
+
+  @Test
+  public void testAllocateByteBuffToReadInto() {
+    int maxBuffersInPool = 10;
+    int bufSize = 6 * 1024;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6);
+    ByteBuff buff = alloc.allocate(10 * bufSize);
+    buff.release();
+    // When the request size is less than 1/6th of the pool buffer size, we should use an
+    // on-demand created on-heap buffer.
+    buff = alloc.allocate(200);
+    assertTrue(buff.hasArray());
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+    buff.release();
+    // When the request size is > 1/6th of the pool buffer size.
+    buff = alloc.allocate(1024);
+    assertFalse(buff.hasArray());
+    assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+    buff.release();// ByteBuffDeallocaor#free should put back the BB to pool.
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+    // Request size> pool buffer size
+    buff = alloc.allocate(7 * 1024);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    ByteBuffer[] bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertTrue(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(1024, bbs[1].limit());
+    assertEquals(maxBuffersInPool - 2, alloc.getQueueSize());
+    buff.release();
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+    buff = alloc.allocate(6 * 1024 + 200);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertFalse(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(200, bbs[1].limit());
+    assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+    buff.release();
+    assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+    alloc.allocate(bufSize * (maxBuffersInPool - 1));
+    buff = alloc.allocate(20 * 1024);
+    assertFalse(buff.hasArray());
+    assertTrue(buff instanceof MultiByteBuff);
+    bbs = buff.nioByteBuffers();
+    assertEquals(2, bbs.length);
+    assertTrue(bbs[0].isDirect());
+    assertFalse(bbs[1].isDirect());
+    assertEquals(6 * 1024, bbs[0].limit());
+    assertEquals(14 * 1024, bbs[1].limit());
+    assertEquals(0, alloc.getQueueSize());
+    buff.release();
+    assertEquals(1, alloc.getQueueSize());
+    alloc.allocateOneBuffer();
+
+    buff = alloc.allocate(7 * 1024);
+    assertTrue(buff.hasArray());
+    assertTrue(buff instanceof SingleByteBuff);
+    assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit());
+    buff.release();
+  }
+
+  @Test
+  public void testNegativeAllocatedSize() {
+    int maxBuffersInPool = 10;
+    ByteBuffAllocator allocator =
+        new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024);
+    try {
+      allocator.allocate(-1);
+      fail("Should throw exception when size < 0");
+    } catch (IllegalArgumentException e) {
+      // expected exception
+    }
+    ByteBuff bb = allocator.allocate(0);
+    bb.release();
+  }
+
+  @Test
+  public void testAllocateOneBuffer() {
+    // Allocate from on-heap
+    ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
+    ByteBuff buf = allocator.allocateOneBuffer();
+    assertTrue(buf.hasArray());
+    assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
+    buf.release();
+
+    // Allocate from off-heap
+    int bufSize = 10;
+    allocator = new ByteBuffAllocator(true, 1, 10, 3);
+    buf = allocator.allocateOneBuffer();
+    assertFalse(buf.hasArray());
+    assertEquals(buf.remaining(), bufSize);
+    // Another one will be allocated on-heap because the pool has only one ByteBuffer,
+    // and it has not been released yet.
+    ByteBuff buf2 = allocator.allocateOneBuffer();
+    assertTrue(buf2.hasArray());
+    assertEquals(buf2.remaining(), bufSize);
+    // free the first one
+    buf.release();
+    // The next one will be off-heap again.
+    buf = allocator.allocateOneBuffer();
+    assertFalse(buf.hasArray());
+    assertEquals(buf.remaining(), bufSize);
+    buf.release();
+  }
+
+  @Test
+  public void testReferenceCount() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3);
+    ByteBuff buf1 = alloc.allocate(bufSize * 2);
+    assertFalse(buf1.hasArray());
+    // The next one will be allocated from heap
+    ByteBuff buf2 = alloc.allocateOneBuffer();
+    assertTrue(buf2.hasArray());
+
+    // duplicate the buf2, if the dup released, buf2 will also be released (SingleByteBuffer)
+    ByteBuff dup2 = buf2.duplicate();
+    dup2.release();
+    assertEquals(0, buf2.refCnt());
+    assertEquals(0, dup2.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    assertException(dup2::position);
+    assertException(buf2::position);
+
+    // duplicate the buf1, if the dup1 released, buf1 will also be released (MultipleByteBuffer)
+    ByteBuff dup1 = buf1.duplicate();
+    dup1.release();
+    assertEquals(0, buf1.refCnt());
+    assertEquals(0, dup1.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+    assertException(dup1::position);
+    assertException(buf1::position);
+
+    // slice the buf3, if the slice3 released, buf3 will also be released (SingleByteBuffer)
+    ByteBuff buf3 = alloc.allocateOneBuffer();
+    assertFalse(buf3.hasArray());
+    ByteBuff slice3 = buf3.slice();
+    slice3.release();
+    assertEquals(0, buf3.refCnt());
+    assertEquals(0, slice3.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // slice the buf4, if the slice4 released, buf4 will also be released (MultipleByteBuffer)
+    ByteBuff buf4 = alloc.allocate(bufSize * 2);
+    assertFalse(buf4.hasArray());
+    ByteBuff slice4 = buf4.slice();
+    slice4.release();
+    assertEquals(0, buf4.refCnt());
+    assertEquals(0, slice4.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test multiple reference for the same ByteBuff (SingleByteBuff)
+    ByteBuff buf5 = alloc.allocateOneBuffer();
+    ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice();
+    slice5.release();
+    assertEquals(0, buf5.refCnt());
+    assertEquals(0, slice5.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+    assertException(slice5::position);
+    assertException(buf5::position);
+
+    // Test multiple reference for the same ByteBuff (SingleByteBuff)
+    ByteBuff buf6 = alloc.allocate(bufSize >> 2);
+    ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice();
+    slice6.release();
+    assertEquals(0, buf6.refCnt());
+    assertEquals(0, slice6.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain the parent SingleByteBuff (duplicate)
+    ByteBuff parent = alloc.allocateOneBuffer();
+    ByteBuff child = parent.duplicate();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain parent MultiByteBuff (duplicate)
+    parent = alloc.allocate(bufSize << 1);
+    child = parent.duplicate();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain the parent SingleByteBuff (slice)
+    parent = alloc.allocateOneBuffer();
+    child = parent.slice();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+
+    // Test retain parent MultiByteBuff (slice)
+    parent = alloc.allocate(bufSize << 1);
+    child = parent.slice();
+    child.retain();
+    parent.release();
+    assertEquals(1, child.refCnt());
+    assertEquals(1, parent.refCnt());
+    assertEquals(0, alloc.getQueueSize());
+    parent.release();
+    assertEquals(0, child.refCnt());
+    assertEquals(0, parent.refCnt());
+    assertEquals(2, alloc.getQueueSize());
+  }
+
+  @Test
+  public void testReverseRef() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+    ByteBuff buf1 = alloc.allocate(bufSize);
+    ByteBuff dup1 = buf1.duplicate();
+    assertEquals(1, buf1.refCnt());
+    assertEquals(1, dup1.refCnt());
+    buf1.release();
+    assertEquals(0, buf1.refCnt());
+    assertEquals(0, dup1.refCnt());
+    assertEquals(1, alloc.getQueueSize());
+    assertException(buf1::position);
+    assertException(dup1::position);
+  }
+
+  @Test
+  public void testByteBuffUnsupportedMethods() {
+    int bufSize = 64;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+    ByteBuff buf = alloc.allocate(bufSize);
+    assertException(() -> buf.retain(2));
+    assertException(() -> buf.release(2));
+    assertException(() -> buf.touch());
+    assertException(() -> buf.touch(new Object()));
+  }
+
+  private void assertException(Runnable r) {
+    try {
+      r.run();
+      fail();
+    } catch (Exception e) {
+      // expected exception.
+    }
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
index 2f7a869..3ac7a75 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream {
 
   @Test
   public void testWrites() throws Exception {
-    ByteBufferPool pool = new ByteBufferPool(10, 3);
-    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6);
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc);
     bbos.write(2);// Write a byte
     bbos.writeInt(100);// Write an int
     byte[] b = Bytes.toBytes("row123");// 6 bytes
     bbos.write(b);
+    assertEquals(2, bbos.allBufs.size());
     // Just use the 3rd BB from pool so that pabos, on request, wont get one
-    ByteBuffer bb1 = pool.getBuffer();
+    ByteBuff bb1 = alloc.allocateOneBuffer();
     ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
     bbos.write(bb, 0, bb.capacity());
-    pool.putbackBuffer(bb1);
+    bb1.release();
     bbos.writeInt(123);
     bbos.writeInt(124);
-    assertEquals(0, pool.getQueueSize());
+    assertEquals(0, alloc.getQueueSize());
     List<ByteBuffer> allBufs = bbos.getByteBuffers();
     assertEquals(4, allBufs.size());
-    assertEquals(3, bbos.bufsFromPool.size());
+    assertEquals(4, bbos.allBufs.size());
     ByteBuffer b1 = allBufs.get(0);
     assertEquals(10, b1.remaining());
     assertEquals(2, b1.get());
     assertEquals(100, b1.getInt());
     byte[] bActual = new byte[b.length];
-    b1.get(bActual, 0, 5);//5 bytes in 1st BB
+    b1.get(bActual, 0, 5);// 5 bytes in 1st BB
     ByteBuffer b2 = allBufs.get(1);
     assertEquals(10, b2.remaining());
     b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
@@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream {
     assertEquals(4, b4.remaining());
     assertEquals(124, b4.getInt());
     bbos.releaseResources();
-    assertEquals(3, pool.getQueueSize());
+    assertEquals(3, alloc.getQueueSize());
   }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
deleted file mode 100644
index 44d2f45..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ IOTests.class, SmallTests.class })
-public class TestByteBufferPool {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestByteBufferPool.class);
-
-  @Test
-  public void testOffheapBBPool() throws Exception {
-    boolean directByteBuffer = true;
-    testBBPool(10, 100, directByteBuffer);
-  }
-
-  @Test
-  public void testOnheapBBPool() throws Exception {
-    boolean directByteBuffer = false;
-    testBBPool(10, 100, directByteBuffer);
-  }
-
-  private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
-    ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
-    for (int i = 0; i < maxPoolSize; i++) {
-      ByteBuffer buffer = pool.getBuffer();
-      assertEquals(0, buffer.position());
-      assertEquals(bufferSize, buffer.limit());
-      assertEquals(directByteBuffer, buffer.isDirect());
-    }
-    assertEquals(0, pool.getQueueSize());
-    ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
-        : ByteBuffer.allocateDirect(bufferSize);
-    pool.putbackBuffer(bb);
-    assertEquals(0, pool.getQueueSize());
-    bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
-        : ByteBuffer.allocate(bufferSize + 1);
-    pool.putbackBuffer(bb);
-    assertEquals(0, pool.getQueueSize());
-  }
-}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
index 84cf7a4..fcfb77a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -286,12 +286,12 @@ public class TestMultiByteBuff {
     multi.putInt(45);
     multi.position(1);
     multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
-    MultiByteBuff sliced = multi.slice();
+    ByteBuff sliced = multi.slice();
     assertEquals(0, sliced.position());
     assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
     assertEquals(l1, sliced.getLong());
     assertEquals(l2, sliced.getLong());
-    MultiByteBuff dup = multi.duplicate();
+    ByteBuff dup = multi.duplicate();
     assertEquals(1, dup.position());
     assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
     assertEquals(l1, dup.getLong());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
index 0224dea..a842967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
@@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize {
 
   /**
    * SHARED means when this Cacheable is read back from cache it refers to the same memory area as
-   * used by the cache for caching it.
-   * EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an
-   * exclusive memory area of this Cacheable.
+   * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
+   * the data was copied to an exclusive memory area of this Cacheable.
    */
-  public static enum MemoryType {
+  enum MemoryType {
     SHARED, EXCLUSIVE
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 80b1288..5ed3d2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
     NettyServerCall reqTooBig =
       new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
         connection, 0, connection.addr, System.currentTimeMillis(), 0,
-        connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null);
+        connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
 
     connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 742a728..bba1bed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer {
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
       long startTime, int timeout) throws IOException {
     NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
-        -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
+        -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
     return call(fakeCall, status);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index 2fae311..8dc08c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
 
   NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
       Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
-      InetAddress remoteAddress, long receiveTime, int timeout,
-      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
-    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
-        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+      InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+        timeout, bbAllocator, cellBlockBuilder, reqCleanup);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index ffa16bf..2f97f53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
 
   void process(final ByteBuf buf) throws IOException, InterruptedException {
     if (connectionHeaderRead) {
-      this.callCleanup = new RpcServer.CallCleanup() {
-        @Override
-        public void run() {
-          buf.release();
-        }
-      };
+      this.callCleanup = buf::release;
       process(new SingleByteBuff(buf.nioBuffer()));
     } else {
       ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
@@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
       long size, final InetAddress remoteAddress, int timeout,
       CallCleanup reqCleanup) {
     return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
-        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
         this.rpcServer.cellBlockBuilder, reqCleanup);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3ab63dd..ac8c26c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface,
 
   protected UserProvider userProvider;
 
-  protected final ByteBufferPool reservoir;
-  // The requests and response will use buffers from ByteBufferPool, when the size of the
-  // request/response is at least this size.
-  // We make this to be 1/6th of the pool buffer size.
-  protected final int minSizeForReservoirUse;
+  protected final ByteBuffAllocator bbAllocator;
 
   protected volatile boolean allowFallbackToSimpleAuth;
 
@@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface,
   private RSRpcServices rsRpcServices;
 
   @FunctionalInterface
-  protected static interface CallCleanup {
+  protected interface CallCleanup {
     void run();
   }
 
@@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface,
       final List<BlockingServiceAndInterface> services,
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
-    if (reservoirEnabled) {
-      int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
-          ByteBufferPool.DEFAULT_BUFFER_SIZE);
-      // The max number of buffers to be pooled in the ByteBufferPool. The default value been
-      // selected based on the #handlers configured. When it is read request, 2 MB is the max size
-      // at which we will send back one RPC request. Means max we need 2 MB for creating the
-      // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
-      // include the heap size overhead of each cells also.) Considering 2 MB, we will need
-      // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
-      // is by default 64 KB.
-      // In case of read request, at the end of the handler process, we will make the response
-      // cellblock and add the Call to connection's response Q and a single Responder thread takes
-      // connections and responses from that one by one and do the socket write. So there is chances
-      // that by the time a handler originated response is actually done writing to socket and so
-      // released the BBs it used, the handler might have processed one more read req. On an avg 2x
-      // we consider and consider that also for the max buffers to pool
-      int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
-      int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
-          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
-      this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
-      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
-    } else {
-      reservoir = null;
-      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
-    }
+    this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
     this.server = server;
     this.services = services;
     this.bindAddress = bindAddress;
@@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface,
     this.scheduler = scheduler;
   }
 
-  @VisibleForTesting
-  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
-    return pool.getBufferSize() / 6;
-  }
-
   @Override
   public void onConfigurationChange(Configuration newConf) {
     initReconfigurable(newConf);
@@ -652,55 +613,6 @@ public abstract class RpcServer implements RpcServerInterface,
   }
 
   /**
-   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
-   * as much as possible.
-   *
-   * @param pool The ByteBufferPool to use
-   * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
-   *           need of size below this, create on heap ByteBuffer.
-   * @param reqLen Bytes count in request
-   */
-  @VisibleForTesting
-  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
-      int minSizeForPoolUse, int reqLen) {
-    ByteBuff resultBuf;
-    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
-    int remain = reqLen;
-    ByteBuffer buf = null;
-    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
-      bbs.add(buf);
-      remain -= pool.getBufferSize();
-    }
-    ByteBuffer[] bufsFromPool = null;
-    if (bbs.size() > 0) {
-      bufsFromPool = new ByteBuffer[bbs.size()];
-      bbs.toArray(bufsFromPool);
-    }
-    if (remain > 0) {
-      bbs.add(ByteBuffer.allocate(remain));
-    }
-    if (bbs.size() > 1) {
-      ByteBuffer[] items = new ByteBuffer[bbs.size()];
-      bbs.toArray(items);
-      resultBuf = new MultiByteBuff(items);
-    } else {
-      // We are backed by single BB
-      resultBuf = new SingleByteBuff(bbs.get(0));
-    }
-    resultBuf.limit(reqLen);
-    if (bufsFromPool != null) {
-      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
-      return new Pair<>(resultBuf, () -> {
-        // Return back all the BBs to pool
-        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
-          pool.putbackBuffer(bufsFromPoolFinal[i]);
-        }
-      });
-    }
-    return new Pair<>(resultBuf, null);
-  }
-
-  /**
    * Needed for features such as delayed calls.  We need to be able to store the current call
    * so that we can complete it later or ask questions of what is supported by the current ongoing
    * call.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index cf1cf9a..f93f3a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -26,10 +26,10 @@ import java.util.Optional;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   protected long startTime;
   protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
 
-  protected final ByteBufferPool reservoir;
+  protected final ByteBuffAllocator bbAllocator;
 
   protected final CellBlockBuilder cellBlockBuilder;
 
@@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-      justification="Can't figure why this complaint is happening... see below")
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+      justification = "Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
-      Message param, CellScanner cellScanner, T connection, long size,
-      InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+      Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
+      long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
       CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
     this.id = id;
     this.service = service;
@@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
     this.remoteAddress = remoteAddress;
     this.timeout = timeout;
     this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
-    this.reservoir = reservoir;
+    this.bbAllocator = byteBuffAllocator;
     this.cellBlockBuilder = cellBlockBuilder;
     this.reqCleanup = reqCleanup;
   }
@@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
       // high when we can avoid a big buffer allocation on each rpc.
       List<ByteBuffer> cellBlock = null;
       int cellBlockSize = 0;
-      if (this.reservoir != null) {
+      if (bbAllocator.isReservoirEnabled()) {
         this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
-          this.connection.compressionCodec, cells, this.reservoir);
+          this.connection.compressionCodec, cells, bbAllocator);
         if (this.cellBlockStream != null) {
           cellBlock = this.cellBlockStream.getByteBuffers();
           cellBlockSize = this.cellBlockStream.size();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 2a8cfbe..f3f7807 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer {
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
       long startTime, int timeout) throws IOException {
     SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
-        null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
+        null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
     return call(fakeCall, status);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index 6084138..311b4c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
       justification = "Can't figure why this complaint is happening... see below")
   SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
       RequestHeader header, Message param, CellScanner cellScanner,
-      SimpleServerRpcConnection connection, long size,
-      final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
-      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) {
-    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
-        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+      SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
+      long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+      SimpleRpcServerResponder responder) {
+    super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+        timeout, bbAllocator, cellBlockBuilder, reqCleanup);
     this.responder = responder;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index b4b5f33..01127cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Pair;
 
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
@@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
           // Notify the client about the offending request
           SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
               null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
-              this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
+              this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
           this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
           // Make sure the client recognizes the underlying exception
           // Otherwise, throw a DoNotRetryIOException.
@@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
 
   // It creates the ByteBuff and CallCleanup and assign to Connection instance.
   private void initByteBuffToReadInto(int length) {
-    // We create random on heap buffers are read into those when
-    // 1. ByteBufferPool is not there.
-    // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
-    // waste then. Also if all the reqs are of this size, we will be creating larger sized
-    // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
-    // RegionOpen.
-    // 3. If it is an initial handshake signal or initial connection request. Any way then
-    // condition 2 itself will match
-    // 4. When SASL use is ON.
-    if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
-        useSasl || length < this.rpcServer.minSizeForReservoirUse) {
-      this.data = new SingleByteBuff(ByteBuffer.allocate(length));
-    } else {
-      Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
-        this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
-      this.data = pair.getFirst();
-      this.callCleanup = pair.getSecond();
-    }
+    this.data = rpcServer.bbAllocator.allocate(length);
+    this.callCleanup = data::release;
   }
 
   protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
@@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
       RequestHeader header, Message param, CellScanner cellScanner, long size,
       InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
     return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
-        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
         this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 8a2dfcc..fe57054 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
 import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
 import static org.junit.Assert.assertEquals;
 
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -90,7 +90,7 @@ public class TestAsyncTableGetMultiThreaded {
   protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
-    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
     TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
       String.valueOf(memoryCompaction));
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
index 267e9e8..abf20dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.NettyRpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
@@ -71,9 +72,8 @@ public class TestServerLoadDurability {
 
   private static Configuration createConfigurationForSimpleRpcServer() {
     Configuration conf = HBaseConfiguration.create();
-    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
-        SimpleRpcServer.class.getName());
-    conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
+    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
+    conf.setInt(BUFFER_SIZE_KEY, 20);
     return conf;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 48080b2..32160a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -832,7 +832,7 @@ public class TestHFileBlock {
     if (ClassSize.is32BitJVM()) {
       assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
     } else {
-      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
+      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
     }
 
     for (int size : new int[] { 100, 256, 12345 }) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
deleted file mode 100644
index 560190b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RPCTests.class, SmallTests.class })
-public class TestRpcServer {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRpcServer.class);
-
-  @Test
-  public void testAllocateByteBuffToReadInto() throws Exception {
-    int maxBuffersInPool = 10;
-    ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
-    initPoolWithAllBuffers(pool, maxBuffersInPool);
-    ByteBuff buff = null;
-    Pair<ByteBuff, CallCleanup> pair;
-    // When the request size is less than 1/6th of the pool buffer size. We should use on demand
-    // created on heap Buffer
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        200);
-    buff = pair.getFirst();
-    assertTrue(buff.hasArray());
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-    assertNull(pair.getSecond());
-    // When the request size is > 1/6th of the pool buffer size.
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-    // Request size> pool buffer size
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        7 * 1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertTrue(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(1024, bbs[1].limit());
-    assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        6 * 1024 + 200);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertFalse(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(200, bbs[1].limit());
-    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(maxBuffersInPool, pool.getQueueSize());
-
-    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
-    for (int i = 0; i < maxBuffersInPool - 1; i++) {
-      buffers[i] = pool.getBuffer();
-    }
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        20 * 1024);
-    buff = pair.getFirst();
-    assertFalse(buff.hasArray());
-    assertTrue(buff instanceof MultiByteBuff);
-    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
-    assertEquals(2, bbs.length);
-    assertTrue(bbs[0].isDirect());
-    assertFalse(bbs[1].isDirect());
-    assertEquals(6 * 1024, bbs[0].limit());
-    assertEquals(14 * 1024, bbs[1].limit());
-    assertEquals(0, pool.getQueueSize());
-    assertNotNull(pair.getSecond());
-    pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
-    assertEquals(1, pool.getQueueSize());
-    pool.getBuffer();
-    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
-        7 * 1024);
-    buff = pair.getFirst();
-    assertTrue(buff.hasArray());
-    assertTrue(buff instanceof SingleByteBuff);
-    assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
-    assertNull(pair.getSecond());
-  }
-
-  private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
-    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
-    // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
-    // all. Makes pool with max #buffers.
-    for (int i = 0; i < maxBuffersInPool; i++) {
-      buffers[i] = pool.getBuffer();
-    }
-    for (ByteBuffer buf : buffers) {
-      pool.putbackBuffer(buf);
-    }
-  }
-}