You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2020/10/15 15:13:23 UTC

[cassandra] branch trunk updated: CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks

This is an automated email from the ASF dual-hosted git repository.

jasonstack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 699a1f7  CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks
699a1f7 is described below

commit 699a1f74fcc1da1952da6b2b0309c9e2474c67f4
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Thu Oct 15 22:53:44 2020 +0800

    CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks
    
    * initiate multiple buffer pool for different lifespan and usages
      - Chunk Cache Buffer Pool - conf.file_cache_size_in_mb=512mb
      - Networking Buffer Pool - conf.temporary_cache_size_in_mb=128mb
    
    * Add overflowSize and usedSize to buffer pool metrics
    
    * re-circulate buffer pool Chunk for ChunkCache whenever it has free space, even thoughput it may not be able to allocate due to fragmentation
    
    patch by Zhao Yang; reviewed by Caleb Rackliffe and Aleksey Yeschenko for CASSANDRA-15229
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  13 +-
 .../org/apache/cassandra/cache/ChunkCache.java     |  14 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  14 +
 .../db/streaming/CassandraStreamWriter.java        |   6 +-
 .../cassandra/hints/ChecksummedDataInput.java      |   6 +-
 .../hints/CompressedChecksummedDataInput.java      |  13 +-
 .../io/util/BufferManagingRebufferer.java          |   6 +-
 .../cassandra/metrics/BufferPoolMetrics.java       |  45 +-
 .../cassandra/net/AsyncStreamingOutputPlus.java    |  13 +-
 .../apache/cassandra/net/BufferPoolAllocator.java  |  13 +-
 .../cassandra/net/FrameDecoderLegacyLZ4.java       |  11 +-
 .../org/apache/cassandra/net/FrameEncoder.java     |   9 +-
 .../org/apache/cassandra/net/FrameEncoderCrc.java  |   2 +-
 .../org/apache/cassandra/net/FrameEncoderLZ4.java  |   9 +-
 .../cassandra/net/FrameEncoderLegacyLZ4.java       |   8 +-
 .../cassandra/net/FrameEncoderUnprotected.java     |   2 +-
 .../apache/cassandra/net/HandshakeProtocol.java    |   6 +-
 .../cassandra/net/InboundConnectionInitiator.java  |   6 +-
 .../cassandra/net/LocalBufferPoolAllocator.java    |   3 +-
 .../cassandra/net/OutboundConnectionInitiator.java |   4 +-
 .../org/apache/cassandra/net/ShareableBytes.java   |   6 +-
 .../apache/cassandra/utils/memory/BufferPool.java  | 466 ++++++++++++++++-----
 .../apache/cassandra/utils/memory/BufferPools.java |  79 ++++
 .../apache/cassandra/net/ConnectionBurnTest.java   |   4 +-
 .../cassandra/utils/memory/LongBufferPoolTest.java | 111 ++---
 test/data/jmxdump/cassandra-4.0-jmx.yaml           |  75 +++-
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../cassandra/metrics/BufferPoolMetricsTest.java   | 125 ++++--
 .../unit/org/apache/cassandra/net/FramingTest.java |   6 +-
 .../cassandra/utils/memory/BufferPoolTest.java     | 361 +++++++++++-----
 32 files changed, 1067 insertions(+), 376 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fe3fef8..543a1cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229)
  * Fail truncation requests when they fail on a replica (CASSANDRA-16208)
  * Move compact storage validation earlier in startup process (CASSANDRA-16063)
  * Fix ByteBufferAccessor cast exceptions are thrown when trying to query a virtual table (CASSANDRA-16155)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ff414ed..37b18f9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -469,13 +469,22 @@ concurrent_counter_writes: 32
 # be limited by the less of concurrent reads or concurrent writes.
 concurrent_materialized_view_writes: 32
 
+# Maximum memory to use for inter-node and client-server networking buffers.
+#
+# Defaults to the smaller of 1/16 of heap or 128MB. This pool is allocated off-heap,
+# so is in addition to the memory allocated for heap. The cache also has on-heap
+# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
+# if the default 64k chunk size is used).
+# Memory is only allocated when needed.
+# networking_cache_size_in_mb: 128
+
 # Enable the sstable chunk cache.  The chunk cache will store recently accessed
 # sections of the sstable in-memory as uncompressed buffers.
 # file_cache_enabled: false
 
 # Maximum memory to use for sstable chunk cache and buffer pooling.
-# 32MB of this are reserved for pooling buffers, the rest is used as an
-# cache that holds uncompressed sstable chunks.
+# 32MB of this are reserved for pooling buffers, the rest is used for chunk cache
+# that holds uncompressed sstable chunks.
 # Defaults to the smaller of 1/4 of heap or 512MB. This pool is allocated off-heap,
 # so is in addition to the memory allocated for heap. The cache also has on-heap
 # overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index ae38015..c53810a 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.ChunkCacheMetrics;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 public class ChunkCache
         implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize
@@ -43,7 +44,9 @@ public class ChunkCache
     public static final boolean roundUp = DatabaseDescriptor.getFileCacheRoundUp();
 
     private static boolean enabled = DatabaseDescriptor.getFileCacheEnabled() && cacheSize > 0;
-    public static final ChunkCache instance = enabled ? new ChunkCache() : null;
+    public static final ChunkCache instance = enabled ? new ChunkCache(BufferPools.forChunkCache()) : null;
+
+    private final BufferPool bufferPool;
 
     private final LoadingCache<Key, Buffer> cache;
     public final ChunkCacheMetrics metrics;
@@ -86,7 +89,7 @@ public class ChunkCache
         }
     }
 
-    static class Buffer implements Rebufferer.BufferHolder
+    class Buffer implements Rebufferer.BufferHolder
     {
         private final ByteBuffer buffer;
         private final long offset;
@@ -130,12 +133,13 @@ public class ChunkCache
         public void release()
         {
             if (references.decrementAndGet() == 0)
-                BufferPool.put(buffer);
+                bufferPool.put(buffer);
         }
     }
 
-    private ChunkCache()
+    private ChunkCache(BufferPool pool)
     {
+        bufferPool = pool;
         metrics = new ChunkCacheMetrics(this);
         cache = Caffeine.newBuilder()
                         .maximumWeight(cacheSize)
@@ -149,7 +153,7 @@ public class ChunkCache
     @Override
     public Buffer load(Key key)
     {
-        ByteBuffer buffer = BufferPool.get(key.file.chunkSize(), key.file.preferredBufferType());
+        ByteBuffer buffer = bufferPool.get(key.file.chunkSize(), key.file.preferredBufferType());
         assert buffer != null;
         key.file.readChunk(key.position, buffer);
         return new Buffer(buffer, key.position);
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index da410155..26854da 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -302,6 +302,8 @@ public class Config
     private static boolean isClientMode = false;
     private static Supplier<Config> overrideLoadConfig = null;
 
+    public Integer networking_cache_size_in_mb;
+
     public Integer file_cache_size_in_mb;
 
     public boolean file_cache_enabled = Boolean.getBoolean("cassandra.file_cache_enabled");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e8e66fa..0387105 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -453,6 +453,9 @@ public class DatabaseDescriptor
         if (conf.concurrent_replicates != null)
             logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml");
 
+        if (conf.networking_cache_size_in_mb == null)
+            conf.networking_cache_size_in_mb = Math.min(128, (int) (Runtime.getRuntime().maxMemory() / (16 * 1048576)));
+
         if (conf.file_cache_size_in_mb == null)
             conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
 
@@ -2449,6 +2452,17 @@ public class DatabaseDescriptor
         return conf.file_cache_size_in_mb;
     }
 
+    public static int getNetworkingCacheSizeInMB()
+    {
+        if (conf.networking_cache_size_in_mb == null)
+        {
+            // In client mode the value is not set.
+            assert DatabaseDescriptor.isClientInitialized();
+            return 0;
+        }
+        return conf.networking_cache_size_in_mb;
+    }
+
     public static boolean getFileCacheRoundUp()
     {
         if (conf.file_cache_round_up == null)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index 10296fb..6481f4b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static org.apache.cassandra.net.MessagingService.current_version;
 
@@ -152,7 +152,7 @@ public class CassandraStreamWriter
 
         // this buffer will hold the data from disk. as it will be compressed on the fly by
         // AsyncChannelCompressedStreamWriter.write(ByteBuffer), we can release this buffer as soon as we can.
-        ByteBuffer buffer = BufferPool.get(minReadable, BufferType.OFF_HEAP);
+        ByteBuffer buffer = BufferPools.forNetworking().get(minReadable, BufferType.OFF_HEAP);
         try
         {
             int readCount = proxy.read(buffer, start);
@@ -171,7 +171,7 @@ public class CassandraStreamWriter
         }
         finally
         {
-            BufferPool.put(buffer);
+            BufferPools.forNetworking().put(buffer);
         }
 
         return toTransfer;
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 30d18fa..e6e8b38 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,9 +26,9 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.NativeLibrary;
-import org.apache.cassandra.utils.memory.BufferPool;
 
 /**
  * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
@@ -55,7 +55,7 @@ public class ChecksummedDataInput extends RebufferingInputStream
 
     ChecksummedDataInput(ChannelProxy channel, BufferType bufferType)
     {
-        super(BufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, bufferType));
+        super(bufferType.allocate(RandomAccessReader.DEFAULT_BUFFER_SIZE));
 
         crc = new CRC32();
         crcPosition = 0;
@@ -244,7 +244,7 @@ public class ChecksummedDataInput extends RebufferingInputStream
     @Override
     public void close()
     {
-        BufferPool.put(buffer);
+        FileUtils.clean(buffer);
         channel.close();
     }
 
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index 0381b00..2f442be 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -28,9 +28,12 @@ import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.utils.memory.BufferPool;
 import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 public final class CompressedChecksummedDataInput extends ChecksummedDataInput
 {
+    private static final BufferPool bufferPool = BufferPools.forChunkCache();
+
     private final ICompressor compressor;
     private volatile long filePosition = 0;     // Current position in file, advanced when reading chunk.
     private volatile long sourcePosition = 0;   // Current position in file to report, advanced after consuming chunk.
@@ -117,9 +120,9 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
             int bufferSize = compressedSize + (compressedSize / 20);  // allocate +5% to cover variability in compressed size
             if (compressedBuffer != null)
             {
-                BufferPool.put(compressedBuffer);
+                bufferPool.put(compressedBuffer);
             }
-            compressedBuffer = BufferPool.get(bufferSize, compressor.preferredBufferType());
+            compressedBuffer = bufferPool.get(bufferSize, compressor.preferredBufferType());
         }
 
         compressedBuffer.clear();
@@ -131,8 +134,8 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
         if (buffer.capacity() < uncompressedSize)
         {
             int bufferSize = uncompressedSize + (uncompressedSize / 20);
-            BufferPool.put(buffer);
-            buffer = BufferPool.get(bufferSize, compressor.preferredBufferType());
+            bufferPool.put(buffer);
+            buffer = bufferPool.get(bufferSize, compressor.preferredBufferType());
         }
 
         buffer.clear();
@@ -151,7 +154,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
     @Override
     public void close()
     {
-        BufferPool.put(compressedBuffer);
+        bufferPool.put(compressedBuffer);
         super.close();
     }
 
diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index f3b9a88..3a297ee 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -23,7 +23,7 @@ package org.apache.cassandra.io.util;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 /**
  * Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are
@@ -42,14 +42,14 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer
     protected BufferManagingRebufferer(ChunkReader wrapped)
     {
         this.source = wrapped;
-        buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
+        buffer = BufferPools.forChunkCache().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
         buffer.limit(0);
     }
 
     @Override
     public void closeReader()
     {
-        BufferPool.put(buffer);
+        BufferPools.forChunkCache().put(buffer);
         offset = -1;
     }
 
diff --git a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
index c9c859a..78e7265 100644
--- a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,24 +25,47 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 public class BufferPoolMetrics
 {
-    private static final MetricNameFactory factory = new DefaultNameFactory("BufferPool");
+    /** Total number of hits */
+    public final Meter hits;
 
     /** Total number of misses */
     public final Meter misses;
 
-    /** Total size of buffer pools, in bytes */
+    /** Total size of buffer pools, in bytes, including overflow allocation */
     public final Gauge<Long> size;
 
-    public BufferPoolMetrics()
+    /** Total size, in bytes, of active buffered being used from the pool currently + overflow */
+    public final Gauge<Long> usedSize;
+
+    /**
+     * Total size, in bytes, of direct or heap buffers allocated by the pool but not part of the pool
+     * either because they are too large to fit or because the pool has exceeded its maximum limit or because it's
+     * on-heap allocation.
+     */
+    public final Gauge<Long> overflowSize;
+
+    public BufferPoolMetrics(String scope, BufferPool bufferPool)
     {
+        MetricNameFactory factory = new DefaultNameFactory("BufferPool", scope);
+
+        hits = Metrics.meter(factory.createMetricName("Hits"));
+
         misses = Metrics.meter(factory.createMetricName("Misses"));
 
-        size = Metrics.register(factory.createMetricName("Size"), new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return BufferPool.sizeInBytes();
-            }
-        });
+        overflowSize = Metrics.register(factory.createMetricName("OverflowSize"), bufferPool::overflowMemoryInBytes);
+
+        usedSize = Metrics.register(factory.createMetricName("UsedSize"), bufferPool::usedSizeInBytes);
+
+        size = Metrics.register(factory.createMetricName("Size"), bufferPool::sizeInBytes);
+    }
+
+    /**
+     * used to register alias for 3.0/3.11 compatibility
+     */
+    public void register3xAlias()
+    {
+        MetricNameFactory legacyFactory = new DefaultNameFactory("BufferPool");
+        Metrics.registerMBean(misses, legacyFactory.createMetricName("Misses").getMBeanName());
+        Metrics.registerMBean(size, legacyFactory.createMetricName("Size").getMBeanName());
     }
 }
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index 680a9d3..3a9c075 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel;
 import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.lang.Math.min;
 
@@ -53,6 +54,8 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
 {
     private static final Logger logger = LoggerFactory.getLogger(AsyncStreamingOutputPlus.class);
 
+    private final BufferPool bufferPool = BufferPools.forNetworking();
+
     final int defaultLowWaterMark;
     final int defaultHighWaterMark;
 
@@ -68,7 +71,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
     private void allocateBuffer()
     {
         // this buffer is only used for small quantities of data
-        buffer = BufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP);
+        buffer = bufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP);
     }
 
     @Override
@@ -140,7 +143,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
                     throw new IllegalStateException("Can only allocate one ByteBuffer");
                 limiter.acquire(size);
                 holder.promise = beginFlush(size, defaultLowWaterMark, defaultHighWaterMark);
-                holder.buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+                holder.buffer = bufferPool.get(size, BufferType.OFF_HEAP);
                 return holder.buffer;
             });
         }
@@ -148,14 +151,14 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
         {
             // we don't currently support cancelling the flush, but at this point we are recoverable if we want
             if (holder.buffer != null)
-                BufferPool.put(holder.buffer);
+                bufferPool.put(holder.buffer);
             if (holder.promise != null)
                 holder.promise.tryFailure(t);
             throw t;
         }
 
         ByteBuffer buffer = holder.buffer;
-        BufferPool.putUnusedPortion(buffer);
+        bufferPool.putUnusedPortion(buffer);
 
         int length = buffer.limit();
         channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(buffer), holder.promise);
@@ -260,7 +263,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
     {
         if (buffer != null)
         {
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
             buffer = null;
         }
     }
diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
index 224f690..11c0641 100644
--- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
@@ -25,6 +25,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 /**
  * A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour
@@ -32,6 +33,8 @@ import org.apache.cassandra.utils.memory.BufferPool;
  */
 public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
 {
+    private static final BufferPool bufferPool = BufferPools.forNetworking();
+
     BufferPoolAllocator()
     {
         super(true);
@@ -60,22 +63,22 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
 
     ByteBuffer get(int size)
     {
-        return BufferPool.get(size, BufferType.OFF_HEAP);
+        return bufferPool.get(size, BufferType.OFF_HEAP);
     }
 
     ByteBuffer getAtLeast(int size)
     {
-        return BufferPool.getAtLeast(size, BufferType.OFF_HEAP);
+        return bufferPool.getAtLeast(size, BufferType.OFF_HEAP);
     }
 
     void put(ByteBuffer buffer)
     {
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     void putUnusedPortion(ByteBuffer buffer)
     {
-        BufferPool.putUnusedPortion(buffer);
+        bufferPool.putUnusedPortion(buffer);
     }
 
     void release()
@@ -100,7 +103,7 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
         public void deallocate()
         {
             if (wrapped != null)
-                BufferPool.put(wrapped);
+                bufferPool.put(wrapped);
         }
 
         public ByteBuffer adopt()
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
index bf6bc17..4c620c7 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
@@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
 import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.lang.Integer.reverseBytes;
 import static java.lang.String.format;
@@ -46,6 +47,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.copyBytes;
  */
 class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
 {
+    private static final BufferPool bufferPool = BufferPools.forNetworking();
+
     FrameDecoderLegacyLZ4(BufferPoolAllocator allocator, int messagingVersion)
     {
         super(allocator, messagingVersion);
@@ -122,7 +125,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
             assert msg instanceof BufferPoolAllocator.Wrapped;
             ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt();
             // netty will probably have mis-predicted the space needed
-            BufferPool.putUnusedPortion(buf);
+            bufferPool.putUnusedPortion(buf);
 
             CorruptLZ4Frame error = null;
             try
@@ -252,7 +255,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
             }
             catch (Throwable t)
             {
-                BufferPool.put(out);
+                bufferPool.put(out);
                 throw t;
             }
         }
@@ -269,7 +272,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
         {
             if (null != stash)
             {
-                BufferPool.put(stash);
+                bufferPool.put(stash);
                 stash = null;
             }
 
@@ -348,7 +351,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
             ByteBuffer out = allocator.getAtLeast(capacity);
             in.flip();
             out.put(in);
-            BufferPool.put(in);
+            bufferPool.put(in);
             return out;
         }
 
diff --git a/src/java/org/apache/cassandra/net/FrameEncoder.java b/src/java/org/apache/cassandra/net/FrameEncoder.java
index d9df166..5f2dc37 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoder.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoder.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
 {
+    protected static final BufferPool bufferPool = BufferPools.forNetworking();
+
     /**
      * An abstraction useful for transparently allocating buffers that can be written to upstream
      * of the {@code FrameEncoder} without knowledge of the encoder's frame layout, while ensuring
@@ -57,7 +60,7 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
             this.headerLength = headerLength;
             this.trailerLength = trailerLength;
 
-            buffer = BufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP);
+            buffer = bufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP);
             assert buffer.capacity() >= payloadCapacity + headerLength + trailerLength;
             buffer.position(headerLength);
             buffer.limit(buffer.capacity() - trailerLength);
@@ -103,12 +106,12 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
             isFinished = true;
             buffer.limit(buffer.position() + trailerLength);
             buffer.position(0);
-            BufferPool.putUnusedPortion(buffer);
+            bufferPool.putUnusedPortion(buffer);
         }
 
         void release()
         {
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
index 2d07d6d..5049f29 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
@@ -91,7 +91,7 @@ class FrameEncoderCrc extends FrameEncoder
         }
         catch (Throwable t)
         {
-            BufferPool.put(frame);
+            bufferPool.put(frame);
             throw t;
         }
     }
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
index 12351ce..2d76170 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
@@ -27,7 +27,6 @@ import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.BufferPool;
 
 import static org.apache.cassandra.net.Crc.*;
 
@@ -74,7 +73,7 @@ class FrameEncoderLZ4 extends FrameEncoder
                 throw new IllegalArgumentException("Maximum uncompressed payload size is 128KiB");
 
             int maxOutputLength = compressor.maxCompressedLength(uncompressedLength);
-            frame = BufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP);
+            frame = bufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP);
 
             int compressedLength = compressor.compress(in, in.position(), uncompressedLength, frame, HEADER_LENGTH, maxOutputLength);
 
@@ -101,18 +100,18 @@ class FrameEncoderLZ4 extends FrameEncoder
             frame.putInt(frameCrc);
             frame.position(0);
 
-            BufferPool.putUnusedPortion(frame);
+            bufferPool.putUnusedPortion(frame);
             return GlobalBufferPoolAllocator.wrap(frame);
         }
         catch (Throwable t)
         {
             if (frame != null)
-                BufferPool.put(frame);
+                bufferPool.put(frame);
             throw t;
         }
         finally
         {
-            BufferPool.put(in);
+            bufferPool.put(in);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
index 3b29ecb..000fab7 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
@@ -68,7 +68,7 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder
         ByteBuffer frame = null;
         try
         {
-            frame = BufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP);
+            frame = bufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP);
 
             int   frameOffset = 0;
             int payloadOffset = 0;
@@ -82,19 +82,19 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder
             }
 
             frame.limit(frameOffset);
-            BufferPool.putUnusedPortion(frame);
+            bufferPool.putUnusedPortion(frame);
 
             return GlobalBufferPoolAllocator.wrap(frame);
         }
         catch (Throwable t)
         {
             if (null != frame)
-                BufferPool.put(frame);
+                bufferPool.put(frame);
             throw t;
         }
         finally
         {
-            BufferPool.put(payload);
+            bufferPool.put(payload);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
index 3bca41c..6158713 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
@@ -59,7 +59,7 @@ class FrameEncoderUnprotected extends FrameEncoder
         }
         catch (Throwable t)
         {
-            BufferPool.put(frame);
+            bufferPool.put(frame);
             throw t;
         }
     }
diff --git a/src/java/org/apache/cassandra/net/HandshakeProtocol.java b/src/java/org/apache/cassandra/net/HandshakeProtocol.java
index 47d0ec6..bfdcc2c 100644
--- a/src/java/org/apache/cassandra/net/HandshakeProtocol.java
+++ b/src/java/org/apache/cassandra/net/HandshakeProtocol.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
@@ -135,7 +135,7 @@ class HandshakeProtocol
 
         ByteBuf encode()
         {
-            ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP);
+            ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP);
             try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer))
             {
                 out.writeInt(Message.PROTOCOL_MAGIC);
@@ -347,7 +347,7 @@ class HandshakeProtocol
 
         ByteBuf encode()
         {
-            ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP);
+            ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP);
             try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer))
             {
                 out.writeInt(maxMessagingVersion);
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index e02512b..f2339eb 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -52,7 +52,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.OutboundConnectionSettings.Framing;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.streaming.async.StreamingInboundHandler;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.lang.Math.*;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -389,7 +389,7 @@ public class InboundConnectionInitiator
                 from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
             }
 
-            BufferPool.setRecycleWhenFreeForCurrentThread(false);
+            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
             pipeline.replace(this, "streamInbound", new StreamingInboundHandler(from, current_version, null));
 
             logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}",
@@ -411,7 +411,7 @@ public class InboundConnectionInitiator
             // record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over
             instance().versions.set(from, maxMessagingVersion);
 
-            BufferPool.setRecycleWhenFreeForCurrentThread(false);
+            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
             BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
             if (initiate.type == ConnectionType.LARGE_MESSAGES)
             {
diff --git a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
index b2d487f..8017854 100644
--- a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 
 import io.netty.channel.EventLoop;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 /**
  * Equivalent to {@link GlobalBufferPoolAllocator}, except explicitly using a specified
@@ -36,7 +37,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator
 
     LocalBufferPoolAllocator(EventLoop eventLoop)
     {
-        this.pool = new BufferPool.LocalPool().recycleWhenFree(false);
+        this.pool = BufferPools.forNetworking().create().recycleWhenFree(false);
         this.eventLoop = eventLoop;
     }
 
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index 5f3eced..4a5585a 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSucc
 import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.cassandra.net.MessagingService.VERSION_40;
@@ -338,7 +338,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
                 ChannelPipeline pipeline = ctx.pipeline();
                 if (result.isSuccess())
                 {
-                    BufferPool.setRecycleWhenFreeForCurrentThread(false);
+                    BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
                     if (type.isMessaging())
                     {
                         assert frameEncoder != null;
diff --git a/src/java/org/apache/cassandra/net/ShareableBytes.java b/src/java/org/apache/cassandra/net/ShareableBytes.java
index e4f2460..71c272a 100644
--- a/src/java/org/apache/cassandra/net/ShareableBytes.java
+++ b/src/java/org/apache/cassandra/net/ShareableBytes.java
@@ -20,10 +20,10 @@ package org.apache.cassandra.net;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 /**
- * A wrapper for possibly sharing portions of a single, {@link BufferPool} managed, {@link ByteBuffer};
+ * A wrapper for possibly sharing portions of a single, {@link BufferPools#forNetworking()} managed, {@link ByteBuffer};
  * optimised for the case where no sharing is necessary.
  *
  * When sharing is necessary, {@link #share()} method must be invoked by the owning thread
@@ -136,7 +136,7 @@ class ShareableBytes
             throw new IllegalStateException("Already released");
 
         if (count == RELEASED)
-            BufferPool.put(bytes);
+            BufferPools.forNetworking().put(bytes);
     }
 
     boolean isReleased()
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index b18f689..531b492 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -29,6 +29,8 @@ import java.util.Set;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -41,7 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.config.DatabaseDescriptor;
+
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.BufferPoolMetrics;
@@ -54,10 +56,52 @@ import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
 import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
 
 /**
- * A pool of ByteBuffers that can be recycled.
+ * A pool of ByteBuffers that can be recycled to reduce system direct memory fragmentation and improve buffer allocation
+ * performance.
+ * <p/>
+ *
+ * Each {@link BufferPool} instance has one {@link GlobalPool} which allocates two kinds of chunks:
+ * <ul>
+ *     <li>Macro Chunk
+ *       <ul>
+ *         <li>A memory slab that has size of MACRO_CHUNK_SIZE which is 64 * NORMAL_CHUNK_SIZE</li>
+ *         <li>Used to allocate normal chunk with size of NORMAL_CHUNK_SIZE</li>
+ *       </ul>
+ *     </li>
+ *     <li>Normal Chunk
+ *       <ul>
+ *         <li>Used by {@link LocalPool} to serve buffer allocation</li>
+ *         <li>Minimum allocation unit is NORMAL_CHUNK_SIZE / 64</li>
+ *       </ul>
+ *     </li>
+ * </ul>
+ *
+ * {@link GlobalPool} maintains two kinds of freed chunks, fully freed chunks where all buffers are released, and
+ * partially freed chunks where some buffers are not released, eg. held by {@link org.apache.cassandra.cache.ChunkCache}.
+ * Partially freed chunks are used to improve cache utilization and have lower priority compared to fully freed chunks.
+ *
+ * <p/>
+ *
+ * {@link LocalPool} is a thread local pool to serve buffer allocation requests. There are two kinds of local pool:
+ * <ul>
+ *     <li>Normal Pool:
+ *       <ul>
+ *         <li>used to serve allocation size that is larger than half of NORMAL_ALLOCATION_UNIT but less than NORMAL_CHUNK_SIZE</li>
+ *         <li>when there is insufficient space in the local queue, it will request global pool for more normal chunks</li>
+ *         <li>when normal chunk is recycled either fully or partially, it will be passed to global pool to be used by other pools</li>
+ *       </ul>
+ *     </li>
+ *     <li>Tiny Pool:
+ *       <ul>
+ *         <li>used to serve allocation size that is less than NORMAL_ALLOCATION_UNIT</li>
+ *         <li>when there is insufficient space in the local queue, it will request parent normal pool for more tiny chunks</li>
+ *         <li>when tiny chunk is fully freed, it will be passed to paretn normal pool and corresponding buffer in the parent normal chunk is freed</li>
+ *       </ul>
+ *     </li>
+ * </ul>
  *
- * TODO: document the semantics of this class carefully
- * Notably: we do not automatically release from the local pool any chunk that has been incompletely allocated from
+ * Note: even though partially freed chunks improves cache utilization when chunk cache holds outstanding buffer for
+ * arbitrary period, there is still fragmentation in the partially freed chunk because of non-uniform allocation size.
  */
 public class BufferPool
 {
@@ -68,23 +112,40 @@ public class BufferPool
     public static final int TINY_ALLOCATION_UNIT = TINY_CHUNK_SIZE / 64;
     public static final int TINY_ALLOCATION_LIMIT = TINY_CHUNK_SIZE / 2;
 
-    private final static BufferPoolMetrics metrics = new BufferPoolMetrics();
-
-    // TODO: this should not be using FileCacheSizeInMB
-    private static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
-    private static String READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD);
-
-    private static Debug debug;
-
     private static final Logger logger = LoggerFactory.getLogger(BufferPool.class);
     private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
 
+    private volatile Debug debug = Debug.NO_OP;
+
+    protected final String name;
+    protected final BufferPoolMetrics metrics;
+    private final long memoryUsageThreshold;
+    private final String readableMemoryUsageThreshold;
+
+    /**
+     * Size of unpooled buffer being allocated outside of buffer pool in bytes.
+     */
+    private final LongAdder overflowMemoryUsage = new LongAdder();
+
+    /**
+     * Size of buffer being used in bytes, including pooled buffer and unpooled buffer.
+     */
+    private final LongAdder memoryInUse = new LongAdder();
+
+    /**
+     * Size of allocated buffer pool slabs in bytes
+     */
+    private final AtomicLong memoryAllocated = new AtomicLong();
+
     /** A global pool of chunks (page aligned buffers) */
-    private static final GlobalPool globalPool = new GlobalPool();
+    private final GlobalPool globalPool;
+
+    /** Allow partially freed chunk to be recycled for allocation*/
+    private final boolean recyclePartially;
 
     /** A thread local pool of chunks, where chunks come from the global pool */
-    private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>()
+    private final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>()
     {
         @Override
         protected LocalPool initialValue()
@@ -98,7 +159,31 @@ public class BufferPool
         }
     };
 
-    public static ByteBuffer get(int size, BufferType bufferType)
+    private final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
+    private final InfiniteLoopExecutor localPoolCleaner;
+
+    public BufferPool(String name, long memoryUsageThreshold, boolean recyclePartially)
+    {
+        this.name = name;
+        this.memoryUsageThreshold = memoryUsageThreshold;
+        this.readableMemoryUsageThreshold = prettyPrintMemory(memoryUsageThreshold);
+        this.globalPool = new GlobalPool();
+        this.metrics = new BufferPoolMetrics(name, this);
+        this.recyclePartially = recyclePartially;
+        this.localPoolCleaner = new InfiniteLoopExecutor("LocalPool-Cleaner-" + name, this::cleanupOneReference).start();
+    }
+
+    /**
+     * @return a local pool instance and caller is responsible to release the pool
+     */
+    public LocalPool create()
+    {
+        return new LocalPool();
+    }
+
+    public ByteBuffer get(int size, BufferType bufferType)
     {
         if (bufferType == BufferType.ON_HEAP)
             return allocate(size, bufferType);
@@ -106,7 +191,7 @@ public class BufferPool
             return localPool.get().get(size);
     }
 
-    public static ByteBuffer getAtLeast(int size, BufferType bufferType)
+    public ByteBuffer getAtLeast(int size, BufferType bufferType)
     {
         if (bufferType == BufferType.ON_HEAP)
             return allocate(size, bufferType);
@@ -115,30 +200,33 @@ public class BufferPool
     }
 
     /** Unlike the get methods, this will return null if the pool is exhausted */
-    public static ByteBuffer tryGet(int size)
+    public ByteBuffer tryGet(int size)
     {
         return localPool.get().tryGet(size, false);
     }
 
-    public static ByteBuffer tryGetAtLeast(int size)
+    public ByteBuffer tryGetAtLeast(int size)
     {
         return localPool.get().tryGet(size, true);
     }
 
-    private static ByteBuffer allocate(int size, BufferType bufferType)
+    private ByteBuffer allocate(int size, BufferType bufferType)
     {
+        updateOverflowMemoryUsage(size);
         return bufferType == BufferType.ON_HEAP
                ? ByteBuffer.allocate(size)
                : ByteBuffer.allocateDirect(size);
     }
 
-    public static void put(ByteBuffer buffer)
+    public void put(ByteBuffer buffer)
     {
         if (isExactlyDirect(buffer))
             localPool.get().put(buffer);
+        else
+            updateOverflowMemoryUsage(-buffer.capacity());
     }
 
-    public static void putUnusedPortion(ByteBuffer buffer)
+    public void putUnusedPortion(ByteBuffer buffer)
     {
         if (isExactlyDirect(buffer))
         {
@@ -150,43 +238,93 @@ public class BufferPool
         }
     }
 
-    public static void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree)
+    private void updateOverflowMemoryUsage(int size)
+    {
+        overflowMemoryUsage.add(size);
+    }
+
+    public void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree)
     {
         localPool.get().recycleWhenFree(recycleWhenFree);
     }
 
-    public static long sizeInBytes()
+    /**
+     * @return buffer size being allocated, including pooled buffers and unpooled buffers
+     */
+    public long sizeInBytes()
     {
-        return globalPool.sizeInBytes();
+        return memoryAllocated.get() + overflowMemoryUsage.longValue();
     }
 
-    @VisibleForTesting
-    public static void setMemoryUsageThreshold(long threshold)
+    /**
+     * @return buffer size being used, including used pooled buffers and unpooled buffers
+     */
+    public long usedSizeInBytes()
     {
-        MEMORY_USAGE_THRESHOLD = threshold;
-        READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD);
+        return memoryInUse.longValue() + overflowMemoryUsage.longValue();
+    }
+
+    /**
+     * @return unpooled buffer size being allocated outside of buffer pool.
+     */
+    public long overflowMemoryInBytes()
+    {
+        return overflowMemoryUsage.longValue();
+    }
+
+    /**
+     * @return maximum pooled buffer size in bytes
+     */
+    public long memoryUsageThreshold()
+    {
+        return memoryUsageThreshold;
     }
 
     @VisibleForTesting
-    public static long getMemoryUsageThreshold()
+    public GlobalPool globalPool()
     {
-        return MEMORY_USAGE_THRESHOLD;
+        return globalPool;
     }
 
     interface Debug
     {
+        public static Debug NO_OP = new Debug()
+        {
+            @Override
+            public void registerNormal(Chunk chunk) {}
+            @Override
+            public void recycleNormal(Chunk oldVersion, Chunk newVersion) {}
+            @Override
+            public void recyclePartial(Chunk chunk) { }
+        };
+
         void registerNormal(Chunk chunk);
         void recycleNormal(Chunk oldVersion, Chunk newVersion);
+        void recyclePartial(Chunk chunk);
     }
 
-    public static void debug(Debug setDebug)
+    public void debug(Debug setDebug)
     {
-        debug = setDebug;
+        assert setDebug != null;
+        this.debug = setDebug;
     }
 
     interface Recycler
     {
+        /**
+         * Recycle a fully freed chunk
+         */
         void recycle(Chunk chunk);
+
+        /**
+         * @return true if chunk can be reused before fully freed.
+         */
+        boolean canRecyclePartially();
+
+        /**
+         * Recycle a partially freed chunk
+         */
+        void recyclePartially(Chunk chunk);
     }
 
     /**
@@ -196,30 +334,32 @@ public class BufferPool
      *
      * This class is shared by multiple thread local pools and must be thread-safe.
      */
-    static final class GlobalPool implements Supplier<Chunk>, Recycler
+    final class GlobalPool implements Supplier<Chunk>, Recycler
     {
         /** The size of a bigger chunk, 1 MiB, must be a multiple of NORMAL_CHUNK_SIZE */
         static final int MACRO_CHUNK_SIZE = 64 * NORMAL_CHUNK_SIZE;
-        private static final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE);
-
-        static
-        {
-            assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2
-            assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
-            assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple
-
-            logger.info("Global buffer pool limit is {}", prettyPrintMemory(MEMORY_USAGE_THRESHOLD));
-        }
+        private final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE);
 
         private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
         // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage
+        // It contains fully free chunks and when it runs out, partially freed chunks will be used.
         private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
-        private final AtomicLong memoryUsage = new AtomicLong();
+        // Partially freed chunk which is recirculated whenever chunk has free spaces to
+        // improve buffer utilization when chunk cache is holding a piece of buffer for a long period.
+        // Note: fragmentation still exists, as holes are with different sizes.
+        private final Queue<Chunk> partiallyFreedChunks = new ConcurrentLinkedQueue<>();
 
         /** Used in logging statements to lazily build a human-readable current memory usage. */
-        private final Object readableMemoryUsage = 
+        private final Object readableMemoryUsage =
             new Object() { @Override public String toString() { return prettyPrintMemory(sizeInBytes()); } };
 
+        public GlobalPool()
+        {
+            assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2
+            assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
+            assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple
+        }
+
         /** Return a chunk, the caller will take owership of the parent chunk. */
         public Chunk get()
         {
@@ -232,7 +372,10 @@ public class BufferPool
                 return chunk;
 
             // another thread may have just allocated last macro chunk, so make one final attempt before returning null
-            return chunks.poll();
+            chunk = chunks.poll();
+
+            // try to use partially freed chunk if there is no more fully freed chunk.
+            return chunk == null ? partiallyFreedChunks.poll() : chunk;
         }
 
         /**
@@ -243,17 +386,17 @@ public class BufferPool
         {
             while (true)
             {
-                long cur = memoryUsage.get();
-                if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD)
+                long cur = memoryAllocated.get();
+                if (cur + MACRO_CHUNK_SIZE > memoryUsageThreshold)
                 {
-                    if (MEMORY_USAGE_THRESHOLD > 0)
+                    if (memoryUsageThreshold > 0)
                     {
                         noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}",
-                                          READABLE_MEMORY_USAGE_THRESHOLD, READABLE_MACRO_CHUNK_SIZE);
+                                          readableMemoryUsageThreshold, READABLE_MACRO_CHUNK_SIZE);
                     }
                     return null;
                 }
-                if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
+                if (memoryAllocated.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
                     break;
             }
 
@@ -265,10 +408,10 @@ public class BufferPool
             }
             catch (OutOfMemoryError oom)
             {
-                noSpamLogger.error("Buffer pool failed to allocate chunk of {}, current size {} ({}). " +
+                noSpamLogger.error("{} buffer pool failed to allocate chunk of {}, current size {} ({}). " +
                                    "Attempting to continue; buffers will be allocated in on-heap memory which can degrade performance. " +
                                    "Make sure direct memory size (-XX:MaxDirectMemorySize) is large enough to accommodate off-heap memtables and caches.",
-                                   READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName());
+                                   name, READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName());
                 return null;
             }
 
@@ -276,29 +419,35 @@ public class BufferPool
             macroChunks.add(chunk);
 
             final Chunk callerChunk = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE));
-            if (debug != null)
-                debug.registerNormal(callerChunk);
+            debug.registerNormal(callerChunk);
             for (int i = NORMAL_CHUNK_SIZE; i < MACRO_CHUNK_SIZE; i += NORMAL_CHUNK_SIZE)
             {
                 Chunk add = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE));
                 chunks.add(add);
-                if (debug != null)
-                    debug.registerNormal(add);
+                debug.registerNormal(add);
             }
             return callerChunk;
         }
 
+        @Override
         public void recycle(Chunk chunk)
         {
             Chunk recycleAs = new Chunk(chunk);
-            if (debug != null)
-                debug.recycleNormal(chunk, recycleAs);
+            debug.recycleNormal(chunk, recycleAs);
             chunks.add(recycleAs);
         }
 
-        public long sizeInBytes()
+        @Override
+        public void recyclePartially(Chunk chunk)
         {
-            return memoryUsage.get();
+            debug.recyclePartial(chunk);
+            partiallyFreedChunks.add(chunk);
+        }
+
+        @Override
+        public boolean canRecyclePartially()
+        {
+            return recyclePartially;
         }
 
         /** This is not thread safe and should only be used for unit testing. */
@@ -308,10 +457,23 @@ public class BufferPool
             while (!chunks.isEmpty())
                 chunks.poll().unsafeFree();
 
+            while (!partiallyFreedChunks.isEmpty())
+                partiallyFreedChunks.poll().unsafeFree();
+
             while (!macroChunks.isEmpty())
                 macroChunks.poll().unsafeFree();
+        }
+
+        @VisibleForTesting
+        boolean isPartiallyFreed(Chunk chunk)
+        {
+            return partiallyFreedChunks.contains(chunk);
+        }
 
-            memoryUsage.set(0);
+        @VisibleForTesting
+        boolean isFullyFreed(Chunk chunk)
+        {
+            return chunks.contains(chunk);
         }
     }
 
@@ -545,7 +707,7 @@ public class BufferPool
      * A thread local class that grabs chunks from the global pool for this thread allocations.
      * Only one thread can do the allocations but multiple threads can release the allocations.
      */
-    public static final class LocalPool implements Recycler
+    public final class LocalPool implements Recycler
     {
         private final Queue<ByteBuffer> reuseObjects;
         private final Supplier<Chunk> parent;
@@ -575,9 +737,7 @@ public class BufferPool
         {
             this.parent = () -> {
                 ByteBuffer buffer = parent.tryGetInternal(TINY_CHUNK_SIZE, false);
-                if (buffer == null)
-                    return null;
-                return new Chunk(parent, buffer);
+                return buffer == null ? null : new Chunk(parent, buffer);
             };
             this.tinyLimit = 0; // we only currently permit one layer of nesting (which brings us down to 32 byte allocations, so is plenty)
             this.reuseObjects = parent.reuseObjects; // we share the same ByteBuffer object reuse pool, as we both have the same exclusive access to it
@@ -594,13 +754,21 @@ public class BufferPool
         public void put(ByteBuffer buffer)
         {
             Chunk chunk = Chunk.getParentChunk(buffer);
+            int size = buffer.capacity();
+
             if (chunk == null)
+            {
                 FileUtils.clean(buffer);
+                updateOverflowMemoryUsage(-size);
+            }
             else
+            {
                 put(buffer, chunk);
+                memoryInUse.add(-size);
+            }
         }
 
-        public void put(ByteBuffer buffer, Chunk chunk)
+        private void put(ByteBuffer buffer, Chunk chunk)
         {
             LocalPool owner = chunk.owner;
             if (owner != null && owner == tinyPool)
@@ -609,23 +777,39 @@ public class BufferPool
                 return;
             }
 
-            // ask the free method to take exclusive ownership of the act of recycling
-            // if we are either: already not owned by anyone, or owned by ourselves
-            long free = chunk.free(buffer, owner == null || (owner == this && recycleWhenFree));
+            // ask the free method to take exclusive ownership of the act of recycling if chunk is owned by ourselves
+            long free = chunk.free(buffer, owner == this && recycleWhenFree);
+            // free:
+            // *     0L: current pool must be the owner. we can fully recyle the chunk.
+            // *    -1L:
+            //          * for normal chunk:
+            //              a) if it has owner, do nothing.
+            //              b) if it not owner, try to recyle it either fully or partially if not already recyled.
+            //          * for tiny chunk:
+            //              a) if it has owner, do nothing.
+            //              b) if it has not owner, recycle the tiny chunk back to parent chunk
+            // * others:
+            //          * for normal chunk:  partial recycle the chunk if it can be partially recycled but not yet recycled.
+            //          * for tiny chunk: do nothing.
             if (free == 0L)
             {
+                assert owner == this;
                 // 0L => we own recycling responsibility, so must recycle;
-                // if we are the owner, we must remove the Chunk from our local queue
-                if (owner == this)
-                    remove(chunk);
+                // We must remove the Chunk from our local queue
+                remove(chunk);
                 chunk.recycle();
             }
-            else if (((free == -1L) && owner != this) && chunk.owner == null)
+            else if (free == -1L && owner != this && chunk.owner == null && !chunk.recycler.canRecyclePartially())
             {
                 // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset.
                 // we must also check after completely freeing if the owner has since been unset, and try to recycle
                 chunk.tryRecycle();
             }
+            else if (chunk.owner == null && chunk.recycler.canRecyclePartially() && chunk.setInUse(Chunk.Status.EVICTED))
+            {
+                // re-cirlate partially freed normal chunk to global list
+                chunk.partiallyRecycle();
+            }
 
             if (owner == this)
             {
@@ -638,10 +822,16 @@ public class BufferPool
         public void putUnusedPortion(ByteBuffer buffer)
         {
             Chunk chunk = Chunk.getParentChunk(buffer);
+            int size = buffer.capacity() - buffer.limit();
+
             if (chunk == null)
+            {
+                updateOverflowMemoryUsage(-size);
                 return;
+            }
 
             chunk.freeUnusedPortion(buffer);
+            memoryInUse.add(-size);
         }
 
         public ByteBuffer get(int size)
@@ -658,7 +848,11 @@ public class BufferPool
         {
             ByteBuffer ret = tryGet(size, sizeIsLowerBound);
             if (ret != null)
+            {
+                metrics.hits.mark();
+                memoryInUse.add(ret.capacity());
                 return ret;
+            }
 
             if (size > NORMAL_CHUNK_SIZE)
             {
@@ -677,16 +871,6 @@ public class BufferPool
             return allocate(size, BufferType.OFF_HEAP);
         }
 
-        public ByteBuffer tryGet(int size)
-        {
-            return tryGet(size, false);
-        }
-
-        public ByteBuffer tryGetAtLeast(int size)
-        {
-            return tryGet(size, true);
-        }
-
         private ByteBuffer tryGet(int size, boolean sizeIsLowerBound)
         {
             LocalPool pool = this;
@@ -715,7 +899,9 @@ public class BufferPool
             ByteBuffer reuse = this.reuseObjects.poll();
             ByteBuffer buffer = chunks.get(size, sizeIsLowerBound, reuse);
             if (buffer != null)
+            {
                 return buffer;
+            }
 
             // else ask the global pool
             Chunk chunk = addChunkFromParent();
@@ -731,7 +917,8 @@ public class BufferPool
             return null;
         }
 
-        // recycle
+        // recycle entire tiny chunk from tiny pool back to local pool
+        @Override
         public void recycle(Chunk chunk)
         {
             ByteBuffer buffer = chunk.slab;
@@ -739,6 +926,20 @@ public class BufferPool
             put(buffer, parentChunk);
         }
 
+        @Override
+        public void recyclePartially(Chunk chunk)
+        {
+            throw new UnsupportedOperationException("Tiny chunk doesn't support partial recycle.");
+        }
+
+        @Override
+        public boolean canRecyclePartially()
+        {
+            // tiny pool doesn't support partial recycle, as we want to have tiny chunk fully freed and put back to
+            // parent normal chunk.
+            return false;
+        }
+
         private void remove(Chunk chunk)
         {
             chunks.remove(chunk);
@@ -763,8 +964,11 @@ public class BufferPool
             if (evict != null)
             {
                 if (tinyPool != null)
+                    // releasing tiny chunks may result in releasing current evicted chunk
                     tinyPool.chunks.removeIf((child, parent) -> Chunk.getParentChunk(child.slab) == parent, evict);
                 evict.release();
+                // Mark it as evicted and will be eligible for partial recyle if recycler allows
+                evict.setEvicted(Chunk.Status.IN_USE);
             }
         }
 
@@ -784,6 +988,12 @@ public class BufferPool
             chunks.unsafeRecycle();
         }
 
+        @VisibleForTesting
+        public boolean isTinyPool()
+        {
+            return !(parent instanceof GlobalPool);
+        }
+
         public LocalPool recycleWhenFree(boolean recycleWhenFree)
         {
             this.recycleWhenFree = recycleWhenFree;
@@ -808,12 +1018,7 @@ public class BufferPool
         }
     }
 
-    private static final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-    private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
-    private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
-
-    private static void cleanupOneReference() throws InterruptedException
+    private void cleanupOneReference() throws InterruptedException
     {
         Object obj = localPoolRefQueue.remove(100);
         if (obj instanceof LocalPoolRef)
@@ -865,10 +1070,19 @@ public class BufferPool
      */
     final static class Chunk
     {
+        enum Status
+        {
+            /** The slab is serving or ready to serve requests */
+            IN_USE,
+            /** The slab is not serving requests and ready for partial recycle*/
+            EVICTED;
+        }
+
         private final ByteBuffer slab;
-        private final long baseAddress;
+        final long baseAddress;
         private final int shift;
 
+        // it may be 0L when all slots are allocated after "get" or when all slots are freed after "free"
         private volatile long freeSlots;
         private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
 
@@ -878,6 +1092,10 @@ public class BufferPool
         private volatile LocalPool owner;
         private final Recycler recycler;
 
+        private static final AtomicReferenceFieldUpdater<Chunk, Status> statusUpdater =
+                AtomicReferenceFieldUpdater.newUpdater(Chunk.class, Status.class, "status");
+        private volatile Status status = Status.IN_USE;
+
         @VisibleForTesting
         Object debugAttachment;
 
@@ -939,6 +1157,12 @@ public class BufferPool
             recycler.recycle(this);
         }
 
+        public void partiallyRecycle()
+        {
+            assert owner == null;
+            recycler.recyclePartially(this);
+        }
+
         /**
          * We stash the chunk in the attachment of a buffer
          * that was returned by get(), this method simply
@@ -1182,6 +1406,12 @@ public class BufferPool
         }
 
         @VisibleForTesting
+        public LocalPool owner()
+        {
+            return this.owner;
+        }
+
+        @VisibleForTesting
         void unsafeFree()
         {
             Chunk parent = getParentChunk(slab);
@@ -1200,6 +1430,26 @@ public class BufferPool
                 chunk.recycle();
             }
         }
+
+        Status status()
+        {
+            return status;
+        }
+
+        private boolean setStatus(Status current, Status update)
+        {
+            return statusUpdater.compareAndSet(this, current, update);
+        }
+
+        boolean setInUse(Status prev)
+        {
+            return setStatus(prev, Status.IN_USE);
+        }
+
+        boolean setEvicted(Status prev)
+        {
+            return setStatus(prev, Status.EVICTED);
+        }
     }
 
     @VisibleForTesting
@@ -1218,49 +1468,41 @@ public class BufferPool
     }
 
     @VisibleForTesting
-    public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    public void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
     {
-        shutdownNow(of(EXEC));
-        awaitTermination(timeout, unit, of(EXEC));
+        shutdownNow(of(localPoolCleaner));
+        awaitTermination(timeout, unit, of(localPoolCleaner));
     }
 
-    public static long unsafeGetBytesInUse()
+    @VisibleForTesting
+    public BufferPoolMetrics metrics()
     {
-        long totalMemory = globalPool.memoryUsage.get();
-        class L { long v; }
-        final L availableMemory = new L();
-        for (Chunk chunk : globalPool.chunks)
-        {
-            availableMemory.v += chunk.capacity();
-        }
-        for (LocalPoolRef ref : localPoolReferences)
-        {
-            ref.chunks.forEach(chunk -> availableMemory.v += chunk.free());
-        }
-        return totalMemory - availableMemory.v;
+        return metrics;
     }
 
     /** This is not thread safe and should only be used for unit testing. */
     @VisibleForTesting
-    static void unsafeReset()
+    public void unsafeReset()
     {
+        overflowMemoryUsage.reset();
+        memoryInUse.reset();
+        memoryAllocated.set(0);
         localPool.get().unsafeRecycle();
         globalPool.unsafeFree();
     }
 
     @VisibleForTesting
-    static Chunk unsafeCurrentChunk()
+    Chunk unsafeCurrentChunk()
     {
         return localPool.get().chunks.chunk0;
     }
 
     @VisibleForTesting
-    static int unsafeNumChunks()
+    int unsafeNumChunks()
     {
         LocalPool pool = localPool.get();
         return   (pool.chunks.chunk0 != null ? 1 : 0)
                  + (pool.chunks.chunk1 != null ? 1 : 0)
                  + (pool.chunks.chunk2 != null ? 1 : 0);
     }
-
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPools.java b/src/java/org/apache/cassandra/utils/memory/BufferPools.java
new file mode 100644
index 0000000..736e1cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPools.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BufferPools
+{
+    private static final Logger logger = LoggerFactory.getLogger(BufferPools.class);
+
+    /**
+     * Used by chunk cache to store decompressed data and buffers may be held by chunk cache for arbitrary period.
+     */
+    private static final long FILE_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
+    private static final BufferPool CHUNK_CACHE_POOL = new BufferPool("chunk-cache", FILE_MEMORY_USAGE_THRESHOLD, true);
+
+    /**
+     * Used by client-server or inter-node requests, buffers should be released immediately after use.
+     */
+    private static final long NETWORKING_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getNetworkingCacheSizeInMB() * 1024L * 1024L;
+    private static final BufferPool NETWORKING_POOL = new BufferPool("networking", NETWORKING_MEMORY_USAGE_THRESHOLD, false);
+
+    static
+    {
+        logger.info("Global buffer pool limit is {} for {} and {} for {}",
+                    prettyPrintMemory(FILE_MEMORY_USAGE_THRESHOLD),
+                    CHUNK_CACHE_POOL.name,
+                    prettyPrintMemory(NETWORKING_MEMORY_USAGE_THRESHOLD),
+                    NETWORKING_POOL.name);
+
+        CHUNK_CACHE_POOL.metrics().register3xAlias();
+    }
+    /**
+     * Long-lived buffers used for chunk cache and other disk access
+     */
+    public static BufferPool forChunkCache()
+    {
+        return CHUNK_CACHE_POOL;
+    }
+
+    /**
+     * Short-lived buffers used for internode messaging or client-server connections.
+     */
+    public static BufferPool forNetworking()
+    {
+        return NETWORKING_POOL;
+    }
+
+    public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException
+    {
+        CHUNK_CACHE_POOL.shutdownLocalCleaner(timeout, unit);
+        NETWORKING_POOL.shutdownLocalCleaner(timeout, unit);
+    }
+
+}
diff --git a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
index a421f3e..dea552c 100644
--- a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
+++ b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
@@ -59,7 +59,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageGenerator.UniformPayloadGenerator;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.MonotonicClock;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.lang.Math.min;
 import static org.apache.cassandra.net.MessagingService.current_version;
@@ -435,7 +435,7 @@ public class ConnectionBurnTest
                                         checkStoppedTo  .accept(endpoint, getConnections(endpoint, true ));
                                         checkStoppedFrom.accept(endpoint, getConnections(endpoint, false));
                                     }
-                                    long inUse = BufferPool.unsafeGetBytesInUse();
+                                    long inUse = BufferPools.forNetworking().usedSizeInBytes();
                                     if (inUse > 0)
                                     {
 //                                        try
diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index 0af4199..fc603c9 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -84,7 +85,7 @@ public class LongBufferPoolTest
         }
         long recycleRound = 1;
         final List<BufferPool.Chunk> normalChunks = new ArrayList<>();
-        final List<BufferPool.Chunk> tinyChunks = new ArrayList<>();
+
         public synchronized void registerNormal(BufferPool.Chunk chunk)
         {
             chunk.debugAttachment = new DebugChunk();
@@ -95,13 +96,14 @@ public class LongBufferPoolTest
             newVersion.debugAttachment = oldVersion.debugAttachment;
             DebugChunk.get(oldVersion).lastRecycled = recycleRound;
         }
+        public void recyclePartial(BufferPool.Chunk chunk)
+        {
+            DebugChunk.get(chunk).lastRecycled = recycleRound;
+        }
         public synchronized void check()
         {
-//            for (BufferPool.Chunk chunk : tinyChunks)
-//                assert DebugChunk.get(chunk).lastRecycled == recycleRound;
             for (BufferPool.Chunk chunk : normalChunks)
                 assert DebugChunk.get(chunk).lastRecycled == recycleRound;
-            tinyChunks.clear(); // they don't survive a recycleRound
             recycleRound++;
         }
     }
@@ -112,10 +114,29 @@ public class LongBufferPoolTest
         DatabaseDescriptor.daemonInitialization();
     }
 
+    @AfterClass
+    public static void teardown()
+    {
+        BufferPools.forChunkCache().unsafeReset();
+        BufferPools.forNetworking().unsafeReset();
+    }
+
     @Test
-    public void testAllocate() throws InterruptedException, ExecutionException
+    public void testPoolAllocateWithRecyclePartially() throws InterruptedException, ExecutionException
+    {
+        testPoolAllocate(true);
+    }
+
+    @Test
+    public void testPoolAllocateWithoutRecyclePartially() throws InterruptedException, ExecutionException
+    {
+        testPoolAllocate(false);
+    }
+
+    private void testPoolAllocate(boolean recyclePartially) throws InterruptedException, ExecutionException
     {
-        testAllocate(Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L), 16 << 20);
+        BufferPool pool = new BufferPool("test_pool", 16 << 20, recyclePartially);
+        testAllocate(pool, Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L));
     }
 
     private static final class BufferCheck
@@ -160,11 +181,11 @@ public class LongBufferPoolTest
         final List<Future<Boolean>> threadResultFuture;
         final int targetSizeQuanta;
 
-        TestEnvironment(int threadCount, long duration, int poolSize)
+        TestEnvironment(int threadCount, long duration, long poolSize)
         {
             this.threadCount = threadCount;
             this.duration = duration;
-            this.poolSize = poolSize;
+            this.poolSize = Math.toIntExact(poolSize);
             until = System.nanoTime() + duration;
             latch = new CountDownLatch(threadCount);
             sharedRecycle = new SPSCQueue[threadCount];
@@ -188,7 +209,7 @@ public class LongBufferPoolTest
             //
             // This should divide double the poolSize across the working threads,
             // plus NORMAL_CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair.
-            targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1);
+            targetSizeQuanta = 2 * this.poolSize / sum1toN(threadCount - 1);
         }
 
         void addCheckedFuture(Future<Boolean> future)
@@ -238,24 +259,19 @@ public class LongBufferPoolTest
         }
     }
 
-    public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException
+    public void testAllocate(BufferPool bufferPool, int threadCount, long duration) throws InterruptedException, ExecutionException
     {
-        System.out.println(String.format("%s - testing %d threads for %dm",
-                                         DATE_FORMAT.format(new Date()),
-                                         threadCount,
-                                         TimeUnit.NANOSECONDS.toMinutes(duration)));
-        long prevPoolSize = BufferPool.getMemoryUsageThreshold();
-        logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize);
-        BufferPool.setMemoryUsageThreshold(poolSize);
+        logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration));
+        logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold());
         Debug debug = new Debug();
-        BufferPool.debug(debug);
+        bufferPool.debug(debug);
 
-        TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize);
+        TestEnvironment testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold());
 
-        startBurnerThreads(testEnv);
+        startBurnerThreads(bufferPool, testEnv);
 
         for (int threadIdx = 0; threadIdx < threadCount; threadIdx++)
-            testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx));
+            testEnv.addCheckedFuture(startWorkerThread(bufferPool, testEnv, threadIdx));
 
         while (!testEnv.latch.await(10L, TimeUnit.SECONDS))
         {
@@ -281,25 +297,23 @@ public class LongBufferPoolTest
             while ( null != (check = queue.poll()) )
             {
                 check.validate();
-                BufferPool.put(check.buffer);
+                bufferPool.put(check.buffer);
             }
         }
 
         assertEquals(0, testEnv.executorService.shutdownNow().size());
 
-        logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize);
-        BufferPool.setMemoryUsageThreshold(prevPoolSize);
-        BufferPool.debug(null);
+        logger.info("Reverting BufferPool DEBUG config");
+        bufferPool.debug(BufferPool.Debug.NO_OP);
 
         testEnv.assertCheckedThreadsSucceeded();
 
-        System.out.println(String.format("%s - finished.",
-                                         DATE_FORMAT.format(new Date())));
+        logger.info("{} - finished.", DATE_FORMAT.format(new Date()));
     }
 
-    private Future<Boolean> startWorkerThread(TestEnvironment testEnv, final int threadIdx)
+    private Future<Boolean> startWorkerThread(BufferPool bufferPool, TestEnvironment testEnv, final int threadIdx)
     {
-        return testEnv.executorService.submit(new TestUntil(testEnv.until)
+        return testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
         {
             final int targetSize = threadIdx == 0 ? BufferPool.NORMAL_CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx;
             final SPSCQueue<BufferCheck> shareFrom = testEnv.sharedRecycle[threadIdx];
@@ -362,7 +376,7 @@ public class LongBufferPoolTest
                     else
                     {
                         check.validate();
-                        BufferPool.put(check.buffer);
+                        bufferPool.put(check.buffer);
                         totalSize -= size;
                     }
                 }
@@ -407,7 +421,7 @@ public class LongBufferPoolTest
                 while (checks.size() > 0)
                 {
                     BufferCheck check = checks.get(0);
-                    BufferPool.put(check.buffer);
+                    bufferPool.put(check.buffer);
                     checks.remove(check.listnode);
                 }
                 testEnv.latch.countDown();
@@ -419,13 +433,13 @@ public class LongBufferPoolTest
                 if (check == null)
                     return false;
                 check.validate();
-                BufferPool.put(check.buffer);
+                bufferPool.put(check.buffer);
                 return true;
             }
 
             BufferCheck allocate(int size)
             {
-                ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+                ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
                 assertNotNull(buffer);
                 BufferCheck check = new BufferCheck(buffer, rand.nextLong());
                 assertEquals(size, buffer.capacity());
@@ -470,12 +484,12 @@ public class LongBufferPoolTest
         });
     }
 
-    private void startBurnerThreads(TestEnvironment testEnv)
+    private void startBurnerThreads(BufferPool bufferPool, TestEnvironment testEnv)
     {
         // setup some high churn allocate/deallocate, without any checking
         final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
         final CountDownLatch doneAdd = new CountDownLatch(1);
-        testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(testEnv.until)
+        testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
         {
             int count = 0;
             final ThreadLocalRandom rand = ThreadLocalRandom.current();
@@ -494,8 +508,9 @@ public class LongBufferPoolTest
                     return;
                 }
 
-                ByteBuffer buffer = rand.nextInt(4) < 1 ? BufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE)
-                                                        : BufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT);
+                ByteBuffer buffer = rand.nextInt(4) < 1
+                        ? bufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE)
+                        : bufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT);
                 if (buffer == null)
                 {
                     Thread.yield();
@@ -505,7 +520,7 @@ public class LongBufferPoolTest
                 // 50/50 chance of returning the buffer from the producer thread, or
                 // pass it on to the consumer.
                 if (rand.nextBoolean())
-                    BufferPool.put(buffer);
+                    bufferPool.put(buffer);
                 else
                     burn.add(buffer);
 
@@ -516,7 +531,7 @@ public class LongBufferPoolTest
                 doneAdd.countDown();
             }
         }));
-        testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(testEnv.until)
+        testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
         {
             void testOne() throws Exception
             {
@@ -526,7 +541,7 @@ public class LongBufferPoolTest
                     Thread.yield();
                     return;
                 }
-                BufferPool.put(buffer);
+                bufferPool.put(buffer);
             }
             void cleanup()
             {
@@ -537,9 +552,11 @@ public class LongBufferPoolTest
 
     static abstract class TestUntil implements Callable<Boolean>
     {
+        final BufferPool bufferPool;
         final long until;
-        protected TestUntil(long until)
+        protected TestUntil(BufferPool bufferPool, long until)
         {
+            this.bufferPool = bufferPool;
             this.until = until;
         }
 
@@ -562,7 +579,7 @@ public class LongBufferPoolTest
             {
                 logger.error("Got exception {}, current chunk {}",
                              ex.getMessage(),
-                             BufferPool.unsafeCurrentChunk());
+                             bufferPool.unsafeCurrentChunk());
                 ex.printStackTrace();
                 return false;
             }
@@ -570,7 +587,7 @@ public class LongBufferPoolTest
             {
                 logger.error("Got throwable {}, current chunk {}",
                              tr.getMessage(),
-                             BufferPool.unsafeCurrentChunk());
+                             bufferPool.unsafeCurrentChunk());
                 tr.printStackTrace();
                 return false;
             }
@@ -587,14 +604,14 @@ public class LongBufferPoolTest
         try
         {
             LongBufferPoolTest.setup();
-            new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(),
-                                                  TimeUnit.HOURS.toNanos(2L), 16 << 20);
+            new LongBufferPoolTest().testAllocate(new BufferPool("test_pool", 16 << 20, true),
+                                                  Runtime.getRuntime().availableProcessors(),
+                                                  TimeUnit.HOURS.toNanos(2L));
             System.exit(0);
         }
         catch (Throwable tr)
         {
-            System.out.println(String.format("Test failed - %s", tr.getMessage()));
-            tr.printStackTrace();
+            logger.error("Test failed - {}", tr.getMessage(), tr);
             System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure
         }
     }
diff --git a/test/data/jmxdump/cassandra-4.0-jmx.yaml b/test/data/jmxdump/cassandra-4.0-jmx.yaml
index a013ebc..05f4773 100644
--- a/test/data/jmxdump/cassandra-4.0-jmx.yaml
+++ b/test/data/jmxdump/cassandra-4.0-jmx.yaml
@@ -7155,7 +7155,7 @@ org.apache.cassandra.internal:type=ViewBuildExecutor:
   - {access: read/write, name: MaximumPoolSize, type: int}
   - {access: read/write, name: MaximumThreads, type: int}
   operations: []
-org.apache.cassandra.metrics:type=BufferPool,name=Misses:
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Misses:
   attributes:
   - {access: read-only, name: Count, type: long}
   - {access: read-only, name: FifteenMinuteRate, type: double}
@@ -7167,7 +7167,78 @@ org.apache.cassandra.metrics:type=BufferPool,name=Misses:
   - name: objectName
     parameters: []
     returnType: javax.management.ObjectName
-org.apache.cassandra.metrics:type=BufferPool,name=Size:
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Hits:
+  attributes:
+  - {access: read-only, name: Count, type: long}
+  - {access: read-only, name: FifteenMinuteRate, type: double}
+  - {access: read-only, name: FiveMinuteRate, type: double}
+  - {access: read-only, name: MeanRate, type: double}
+  - {access: read-only, name: OneMinuteRate, type: double}
+  - {access: read-only, name: RateUnit, type: java.lang.String}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Size:
+  attributes:
+  - {access: read-only, name: Value, type: java.lang.Object}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=UsedSize:
+  attributes:
+  - {access: read-only, name: Value, type: java.lang.Object}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=OverflowSize:
+  attributes:
+  - {access: read-only, name: Value, type: java.lang.Object}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Misses:
+  attributes:
+  - {access: read-only, name: Count, type: long}
+  - {access: read-only, name: FifteenMinuteRate, type: double}
+  - {access: read-only, name: FiveMinuteRate, type: double}
+  - {access: read-only, name: MeanRate, type: double}
+  - {access: read-only, name: OneMinuteRate, type: double}
+  - {access: read-only, name: RateUnit, type: java.lang.String}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Hits:
+  attributes:
+  - {access: read-only, name: Count, type: long}
+  - {access: read-only, name: FifteenMinuteRate, type: double}
+  - {access: read-only, name: FiveMinuteRate, type: double}
+  - {access: read-only, name: MeanRate, type: double}
+  - {access: read-only, name: OneMinuteRate, type: double}
+  - {access: read-only, name: RateUnit, type: java.lang.String}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Size:
+  attributes:
+  - {access: read-only, name: Value, type: java.lang.Object}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=UsedSize:
+  attributes:
+  - {access: read-only, name: Value, type: java.lang.Object}
+  operations:
+  - name: objectName
+    parameters: []
+    returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=OverflowSize:
   attributes:
   - {access: read-only, name: Value, type: java.lang.Object}
   operations:
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 51c58d4..9a3fd08 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -120,8 +120,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.memory.BufferPool;
 import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
+import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -617,7 +617,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
                                 () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
-                                () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
+                                () -> BufferPools.shutdownLocalCleaner(1L, MINUTES),
                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
                                 () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
diff --git a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
index 313aa3f..5e0286f 100644
--- a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.metrics;
 
 import java.util.Random;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -31,7 +30,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.memory.BufferPool;
-import org.apache.cassandra.utils.memory.BufferPoolTest;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -40,7 +38,8 @@ import static org.junit.Assert.assertEquals;
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class BufferPoolMetricsTest
 {
-    private static final BufferPoolMetrics metrics = new BufferPoolMetrics();
+    private BufferPool bufferPool;
+    private BufferPoolMetrics metrics;
 
     @BeforeClass()
     public static void setup() throws ConfigurationException
@@ -51,21 +50,15 @@ public class BufferPoolMetricsTest
     @Before
     public void setUp()
     {
-        BufferPool.setMemoryUsageThreshold(16 * 1024L * 1024L);
-    }
-
-    @After
-    public void cleanUp()
-    {
-        BufferPoolTest.resetBufferPool();
-        metrics.misses.mark(metrics.misses.getCount() * -1);
+        this.bufferPool = new BufferPool("test_" + System.currentTimeMillis(), 16 * 1024L * 1024L, true);
+        this.metrics = bufferPool.metrics();
     }
 
     @Test
     public void testMetricsSize()
     {
         // basically want to test changes in the metric being reported as the buffer pool grows - starts at zero
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isEqualTo(0);
 
         // the idea is to test changes in the sizeOfBufferPool metric which starts at zero. it will bump up
@@ -77,7 +70,7 @@ public class BufferPoolMetricsTest
         final long seed = System.currentTimeMillis();
         final Random rand = new Random(seed);
         final String assertionMessage = String.format("Failed with seed of %s", seed);
-        final long maxIterations = BufferPool.getMemoryUsageThreshold();
+        final long maxIterations = bufferPool.memoryUsageThreshold();
         final int maxBufferSize = BufferPool.NORMAL_CHUNK_SIZE - 1;
         int nextSizeToRequest;
         long totalBytesRequestedFromPool = 0;
@@ -87,15 +80,15 @@ public class BufferPoolMetricsTest
         {
             nextSizeToRequest = rand.nextInt(maxBufferSize) + 1;
             totalBytesRequestedFromPool = totalBytesRequestedFromPool + nextSizeToRequest;
-            BufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP);
+            bufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP);
 
             assertThat(metrics.size.getValue()).as(assertionMessage)
-                                               .isEqualTo(BufferPool.sizeInBytes())
+                                               .isEqualTo(bufferPool.sizeInBytes())
                                                .isGreaterThanOrEqualTo(totalBytesRequestedFromPool);
 
             if (initialSizeInBytesAfterZero == 0)
             {
-                initialSizeInBytesAfterZero = BufferPool.sizeInBytes();
+                initialSizeInBytesAfterZero = bufferPool.sizeInBytes();
             }
             else
             {
@@ -115,6 +108,74 @@ public class BufferPoolMetricsTest
     }
 
     @Test
+    public void testMetricsOverflowSize()
+    {
+        assertEquals(0, metrics.overflowSize.getValue().longValue());
+
+        final int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1;
+
+        int iterations = 16;
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
+            assertEquals(0, metrics.overflowSize.getValue().longValue());
+        }
+
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(bigBufferSizeThatMisses, BufferType.OFF_HEAP);
+            assertEquals(bigBufferSizeThatMisses * (ix + 1), metrics.overflowSize.getValue().longValue());
+        }
+    }
+
+    @Test
+    public void testMetricsUsedSize()
+    {
+        assertEquals(0, metrics.usedSize.getValue().longValue());
+
+        final int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1;
+
+        long usedSize = 0;
+        int iterations = 16;
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
+            assertEquals(usedSize += tinyBufferSizeThatHits, metrics.usedSize.getValue().longValue());
+        }
+
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(bigBufferSizeThatMisses, BufferType.OFF_HEAP);
+            assertEquals(usedSize += bigBufferSizeThatMisses, metrics.usedSize.getValue().longValue());
+        }
+    }
+
+    @Test
+    public void testMetricsHits()
+    {
+        assertEquals(0, metrics.hits.getCount());
+
+        final int tinyBufferSizeThatHits = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int bigBufferSizeThatMisses = BufferPool.NORMAL_CHUNK_SIZE + 1;
+
+        int iterations = 16;
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
+            assertEquals(ix + 1, metrics.hits.getCount());
+        }
+
+        long currentHits = metrics.hits.getCount();
+        for (int ix = 0; ix < iterations; ix++)
+        {
+            bufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP);
+            assertEquals(currentHits, metrics.hits.getCount());
+        }
+    }
+
+    @Test
     public void testMetricsMisses()
     {
         assertEquals(0, metrics.misses.getCount());
@@ -125,13 +186,13 @@ public class BufferPoolMetricsTest
         int iterations = 16;
         for (int ix = 0; ix < iterations; ix++)
         {
-            BufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
+            bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
             assertEquals(0, metrics.misses.getCount());
         }
 
         for (int ix = 0; ix < iterations; ix++)
         {
-            BufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP);
+            bufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP);
             assertEquals(ix + 1, metrics.misses.getCount());
         }
     }
@@ -140,23 +201,23 @@ public class BufferPoolMetricsTest
     public void testZeroSizeRequestsDontChangeMetrics()
     {
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isEqualTo(0);
 
-        BufferPool.get(0, BufferType.OFF_HEAP);
+        bufferPool.get(0, BufferType.OFF_HEAP);
 
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isEqualTo(0);
 
-        BufferPool.get(65536, BufferType.OFF_HEAP);
-        BufferPool.get(0, BufferType.OFF_HEAP);
-        BufferPool.get(0, BufferType.OFF_HEAP);
-        BufferPool.get(0, BufferType.OFF_HEAP);
-        BufferPool.get(0, BufferType.OFF_HEAP);
+        bufferPool.get(65536, BufferType.OFF_HEAP);
+        bufferPool.get(0, BufferType.OFF_HEAP);
+        bufferPool.get(0, BufferType.OFF_HEAP);
+        bufferPool.get(0, BufferType.OFF_HEAP);
+        bufferPool.get(0, BufferType.OFF_HEAP);
 
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isGreaterThanOrEqualTo(65536);
     }
 
@@ -164,29 +225,29 @@ public class BufferPoolMetricsTest
     public void testFailedRequestsDontChangeMetrics()
     {
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isEqualTo(0);
 
         tryRequestNegativeBufferSize();
 
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isEqualTo(0);
 
-        BufferPool.get(65536, BufferType.OFF_HEAP);
+        bufferPool.get(65536, BufferType.OFF_HEAP);
         tryRequestNegativeBufferSize();
         tryRequestNegativeBufferSize();
         tryRequestNegativeBufferSize();
         tryRequestNegativeBufferSize();
 
         assertEquals(0, metrics.misses.getCount());
-        assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+        assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
                                            .isGreaterThanOrEqualTo(65536);
     }
 
     private void tryRequestNegativeBufferSize()
     {
         assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(
-        () -> BufferPool.get(-1, BufferType.OFF_HEAP));
+        () -> bufferPool.get(-1, BufferType.OFF_HEAP));
     }
 }
diff --git a/test/unit/org/apache/cassandra/net/FramingTest.java b/test/unit/org/apache/cassandra/net/FramingTest.java
index 8a7f428..27c8003 100644
--- a/test/unit/org/apache/cassandra/net/FramingTest.java
+++ b/test/unit/org/apache/cassandra/net/FramingTest.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
 import static java.lang.Math.*;
@@ -197,7 +197,7 @@ public class FramingTest
             cumulativeCompressedLength[i] = (i == 0 ? 0 : cumulativeCompressedLength[i - 1]) + buffer.readableBytes();
         }
 
-        ByteBuffer frames = BufferPool.getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP);
+        ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP);
         for (ByteBuf buffer : compressed)
         {
             frames.put(buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes()));
@@ -412,7 +412,7 @@ public class FramingTest
             cumulativeLength[i] = (i == 0 ? 0 : cumulativeLength[i - 1]) + message.length;
         }
 
-        ByteBuffer frames = BufferPool.getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP);
+        ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP);
         for (byte[] buffer : messages)
             frames.put(buffer);
         frames.flip();
diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
index 4bfc54a..eb3cc1b 100644
--- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
@@ -23,13 +23,11 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 
-import org.junit.After;
+import com.google.common.collect.Iterables;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.RandomAccessReader;
 
@@ -37,31 +35,12 @@ import static org.junit.Assert.*;
 
 public class BufferPoolTest
 {
-    @BeforeClass
-    public static void setupDD()
-    {
-        DatabaseDescriptor.daemonInitialization();
-    }
+    private BufferPool bufferPool;
 
     @Before
     public void setUp()
     {
-        BufferPool.setMemoryUsageThreshold(8 * 1024L * 1024L);
-    }
-
-    @After
-    public void cleanUp()
-    {
-        resetBufferPool();
-    }
-
-    /**
-     * Exposes a utility method on this test that other tests might use to access the protected
-     * {@link BufferPool#unsafeReset()} method.
-     */
-    public static void resetBufferPool()
-    {
-        BufferPool.unsafeReset();
+        bufferPool = new BufferPool("test_pool", 8 * 1024 * 1024, true);
     }
 
     @Test
@@ -69,18 +48,20 @@ public class BufferPoolTest
     {
         final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
 
-        ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
         assertNotNull(buffer);
         assertEquals(size, buffer.capacity());
         assertEquals(true, buffer.isDirect());
+        assertEquals(size, bufferPool.usedSizeInBytes());
 
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
-        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
 
-        BufferPool.put(buffer);
-        assertEquals(null, BufferPool.unsafeCurrentChunk());
-        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+        bufferPool.put(buffer);
+        assertEquals(null, bufferPool.unsafeCurrentChunk());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
+        assertEquals(0, bufferPool.usedSizeInBytes());
     }
 
 
@@ -98,7 +79,7 @@ public class BufferPoolTest
 
     private void checkPageAligned(int size)
     {
-        ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
         assertNotNull(buffer);
         assertEquals(size, buffer.capacity());
         assertTrue(buffer.isDirect());
@@ -106,7 +87,7 @@ public class BufferPoolTest
         long address = MemoryUtil.getAddress(buffer);
         assertTrue((address % MemoryUtil.pageSize()) == 0);
 
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test
@@ -115,23 +96,23 @@ public class BufferPoolTest
         final int size1 = 1024;
         final int size2 = 2048;
 
-        ByteBuffer buffer1 = BufferPool.get(size1, BufferType.OFF_HEAP);
+        ByteBuffer buffer1 = bufferPool.get(size1, BufferType.OFF_HEAP);
         assertNotNull(buffer1);
         assertEquals(size1, buffer1.capacity());
 
-        ByteBuffer buffer2 = BufferPool.get(size2, BufferType.OFF_HEAP);
+        ByteBuffer buffer2 = bufferPool.get(size2, BufferType.OFF_HEAP);
         assertNotNull(buffer2);
         assertEquals(size2, buffer2.capacity());
 
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
-        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
 
-        BufferPool.put(buffer1);
-        BufferPool.put(buffer2);
+        bufferPool.put(buffer1);
+        bufferPool.put(buffer2);
 
-        assertEquals(null, BufferPool.unsafeCurrentChunk());
-        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+        assertEquals(null, bufferPool.unsafeCurrentChunk());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
     }
 
     @Test
@@ -149,14 +130,13 @@ public class BufferPoolTest
     @Test
     public void testMaxMemoryExceeded_SameAsChunkSize()
     {
-        BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
         requestDoubleMaxMemory();
     }
 
     @Test
     public void testMaxMemoryExceeded_SmallerThanChunkSize()
     {
-        BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2);
+        bufferPool = new BufferPool("test_pool", BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2, false);
         requestDoubleMaxMemory();
     }
 
@@ -168,7 +148,7 @@ public class BufferPoolTest
 
     private void requestDoubleMaxMemory()
     {
-        requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * BufferPool.getMemoryUsageThreshold()));
+        requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * bufferPool.memoryUsageThreshold()));
     }
 
     private void requestUpToSize(int bufferSize, int totalSize)
@@ -178,7 +158,7 @@ public class BufferPoolTest
         List<ByteBuffer> buffers = new ArrayList<>(numBuffers);
         for (int i = 0; i < numBuffers; i++)
         {
-            ByteBuffer buffer = BufferPool.get(bufferSize, BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(bufferSize, BufferType.OFF_HEAP);
             assertNotNull(buffer);
             assertEquals(bufferSize, buffer.capacity());
             assertTrue(buffer.isDirect());
@@ -186,7 +166,7 @@ public class BufferPoolTest
         }
 
         for (ByteBuffer buffer : buffers)
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
     }
 
     @Test
@@ -194,10 +174,10 @@ public class BufferPoolTest
     {
         final int size = BufferPool.NORMAL_CHUNK_SIZE + 1;
 
-        ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
         assertNotNull(buffer);
         assertEquals(size, buffer.capacity());
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test
@@ -209,25 +189,25 @@ public class BufferPoolTest
         List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers);
         List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers);
         for (int i = 0; i < numBuffers; i++)
-            buffers1.add(BufferPool.get(size, BufferType.OFF_HEAP));
+            buffers1.add(bufferPool.get(size, BufferType.OFF_HEAP));
 
-        BufferPool.Chunk chunk1 = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk1 = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk1);
 
         for (int i = 0; i < numBuffers; i++)
-            buffers2.add(BufferPool.get(size, BufferType.OFF_HEAP));
+            buffers2.add(bufferPool.get(size, BufferType.OFF_HEAP));
 
-        assertEquals(2, BufferPool.unsafeNumChunks());
+        assertEquals(2, bufferPool.unsafeNumChunks());
 
         for (ByteBuffer buffer : buffers1)
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
 
-        assertEquals(1, BufferPool.unsafeNumChunks());
+        assertEquals(1, bufferPool.unsafeNumChunks());
 
         for (ByteBuffer buffer : buffers2)
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
 
-        assertEquals(0, BufferPool.unsafeNumChunks());
+        assertEquals(0, bufferPool.unsafeNumChunks());
 
         buffers2.clear();
     }
@@ -263,16 +243,16 @@ public class BufferPoolTest
     {
         doTestRandomFrees(12345567878L);
 
-        BufferPool.unsafeReset();
+        bufferPool.unsafeReset();
         doTestRandomFrees(20452249587L);
 
-        BufferPool.unsafeReset();
+        bufferPool.unsafeReset();
         doTestRandomFrees(82457252948L);
 
-        BufferPool.unsafeReset();
+        bufferPool.unsafeReset();
         doTestRandomFrees(98759284579L);
 
-        BufferPool.unsafeReset();
+        bufferPool.unsafeReset();
         doTestRandomFrees(19475257244L);
     }
 
@@ -303,10 +283,10 @@ public class BufferPoolTest
         List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots);
         for (int i = 0; i < maxFreeSlots; i++)
         {
-            buffers.add(BufferPool.get(size, BufferType.OFF_HEAP));
+            buffers.add(bufferPool.get(size, BufferType.OFF_HEAP));
         }
 
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertFalse(chunk.isFree());
 
         int freeSize = BufferPool.NORMAL_CHUNK_SIZE - maxFreeSlots * size;
@@ -318,7 +298,7 @@ public class BufferPoolTest
             assertNotNull(buffer);
             assertEquals(size, buffer.capacity());
 
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
 
             freeSize += size;
             if (freeSize == chunk.capacity())
@@ -341,18 +321,18 @@ public class BufferPoolTest
         List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
         for (int i = 0; i < sizes.length; i++)
         {
-            ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
             assertNotNull(buffer);
             assertTrue(buffer.capacity() >= sizes[i]);
             buffers.add(buffer);
 
-            sum += BufferPool.unsafeCurrentChunk().roundUp(buffer.capacity());
+            sum += bufferPool.unsafeCurrentChunk().roundUp(buffer.capacity());
         }
 
         // else the test will fail, adjust sizes as required
         assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
 
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
 
         Random rnd = new Random();
@@ -362,11 +342,11 @@ public class BufferPoolTest
             int index = rnd.nextInt(buffers.size());
             ByteBuffer buffer = buffers.remove(index);
 
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
         }
-        BufferPool.put(buffers.remove(0));
+        bufferPool.put(buffers.remove(0));
 
-        assertEquals(null, BufferPool.unsafeCurrentChunk());
+        assertEquals(null, bufferPool.unsafeCurrentChunk());
         assertEquals(0, chunk.free());
     }
 
@@ -381,7 +361,7 @@ public class BufferPoolTest
         List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
         for (int i = 0; i < sizes.length; i++)
         {
-            ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
             assertNotNull(buffer);
             assertTrue(buffer.capacity() >= sizes[i]);
             buffers.add(buffer);
@@ -392,15 +372,15 @@ public class BufferPoolTest
         // else the test will fail, adjust sizes as required
         assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
 
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
 
         for (int i = 0; i < sizes.length; i++)
         {
-            BufferPool.put(buffers.get(i));
+            bufferPool.put(buffers.get(i));
         }
 
-        assertEquals(null, BufferPool.unsafeCurrentChunk());
+        assertEquals(null, bufferPool.unsafeCurrentChunk());
         assertEquals(0, chunk.free());
     }
 
@@ -415,22 +395,22 @@ public class BufferPoolTest
 
         for (int i = 0; i < numBuffersInChunk; i++)
         {
-            ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
             buffers.add(buffer);
             addresses.add(MemoryUtil.getAddress(buffer));
         }
 
         for (int i = numBuffersInChunk - 1; i >= 0; i--)
-            BufferPool.put(buffers.get(i));
+            bufferPool.put(buffers.get(i));
 
         buffers.clear();
 
         for (int i = 0; i < numBuffersInChunk; i++)
         {
-            ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
             assertNotNull(buffer);
             assertEquals(size, buffer.capacity());
-            addresses.remove(MemoryUtil.getAddress(buffer));
+            assert addresses.remove(MemoryUtil.getAddress(buffer));
 
             buffers.add(buffer);
         }
@@ -438,18 +418,18 @@ public class BufferPoolTest
         assertTrue(addresses.isEmpty()); // all 5 released buffers were used
 
         for (ByteBuffer buffer : buffers)
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
     }
 
     @Test
     public void testHeapBuffer()
     {
-        ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP);
+        ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP);
         assertNotNull(buffer);
         assertEquals(1024, buffer.capacity());
         assertFalse(buffer.isDirect());
         assertNotNull(buffer.array());
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test
@@ -497,17 +477,17 @@ public class BufferPoolTest
 
     private void checkBuffer(int size)
     {
-        ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
         assertEquals(size, buffer.capacity());
 
         if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE)
         {
-            BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+            BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
             assertNotNull(chunk);
             assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size));
         }
 
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test
@@ -525,14 +505,14 @@ public class BufferPoolTest
 
         for (int size : sizes)
         {
-            ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+            ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
             assertEquals(size, buffer.capacity());
 
             buffers.add(buffer);
         }
 
         for (ByteBuffer buffer : buffers)
-            BufferPool.put(buffer);
+            bufferPool.put(buffer);
     }
 
     @Test
@@ -544,36 +524,36 @@ public class BufferPoolTest
     private void checkBufferWithGivenSlots(int size, long freeSlots)
     {
         //first allocate to make sure there is a chunk
-        ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
 
         // now get the current chunk and override the free slots mask
-        BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
         long oldFreeSlots = chunk.setFreeSlots(freeSlots);
 
         // now check we can still get the buffer with the free slots mask changed
-        ByteBuffer buffer2 = BufferPool.get(size, BufferType.OFF_HEAP);
+        ByteBuffer buffer2 = bufferPool.get(size, BufferType.OFF_HEAP);
         assertEquals(size, buffer.capacity());
-        BufferPool.put(buffer2);
+        bufferPool.put(buffer2);
 
         // unsafeReset the free slots
         chunk.setFreeSlots(oldFreeSlots);
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test
     public void testZeroSizeRequest()
     {
-        ByteBuffer buffer = BufferPool.get(0, BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(0, BufferType.OFF_HEAP);
         assertNotNull(buffer);
         assertEquals(0, buffer.capacity());
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNegativeSizeRequest()
     {
-        BufferPool.get(-1, BufferType.OFF_HEAP);
+        bufferPool.get(-1, BufferType.OFF_HEAP);
     }
 
     @Test
@@ -689,7 +669,7 @@ public class BufferPoolTest
 
                         for (int j = 0; j < threadSizes.length; j++)
                         {
-                            ByteBuffer buffer = BufferPool.get(threadSizes[j], BufferType.OFF_HEAP);
+                            ByteBuffer buffer = bufferPool.get(threadSizes[j], BufferType.OFF_HEAP);
                             assertNotNull(buffer);
                             assertEquals(threadSizes[j], buffer.capacity());
 
@@ -704,17 +684,17 @@ public class BufferPoolTest
                                 assertEquals(i, buffer.getInt());
 
                             if (returnImmediately)
-                                BufferPool.put(buffer);
+                                bufferPool.put(buffer);
                             else
                                 toBeReturned.add(buffer);
 
-                            assertTrue(BufferPool.sizeInBytes() > 0);
+                            assertTrue(bufferPool.sizeInBytes() > 0);
                         }
 
                         Thread.sleep(rand.nextInt(3));
 
                         for (ByteBuffer buffer : toBeReturned)
-                            BufferPool.put(buffer);
+                            bufferPool.put(buffer);
                     }
                     catch (Exception ex)
                     {
@@ -758,13 +738,13 @@ public class BufferPoolTest
         int sum = 0;
         for (int i = 0; i < sizes.length; i++)
         {
-            buffers[i] = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+            buffers[i] = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
             assertNotNull(buffers[i]);
             assertEquals(sizes[i], buffers[i].capacity());
-            sum += BufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity());
+            sum += bufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity());
         }
 
-        final BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+        final BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
         assertNotNull(chunk);
         assertFalse(chunk.isFree());
 
@@ -786,8 +766,8 @@ public class BufferPoolTest
                 {
                     try
                     {
-                        assertNotSame(chunk, BufferPool.unsafeCurrentChunk());
-                        BufferPool.put(buffer);
+                        assertNotSame(chunk, bufferPool.unsafeCurrentChunk());
+                        bufferPool.put(buffer);
                     }
                     catch (AssertionError ex)
                     { //this is expected if we release a buffer more than once
@@ -816,12 +796,185 @@ public class BufferPoolTest
         System.gc();
         System.gc();
 
-        assertTrue(BufferPool.unsafeCurrentChunk().isFree());
+        assertTrue(bufferPool.unsafeCurrentChunk().isFree());
 
         //make sure the main thread can still allocate buffers
-        ByteBuffer buffer = BufferPool.get(sizes[0], BufferType.OFF_HEAP);
+        ByteBuffer buffer = bufferPool.get(sizes[0], BufferType.OFF_HEAP);
         assertNotNull(buffer);
         assertEquals(sizes[0], buffer.capacity());
-        BufferPool.put(buffer);
+        bufferPool.put(buffer);
+    }
+
+    @Test
+    public void testOverflowAllocation()
+    {
+        int macroChunkSize = BufferPool.GlobalPool.MACRO_CHUNK_SIZE;
+        int allocationSize = BufferPool.NORMAL_CHUNK_SIZE;
+        int allocations = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / allocationSize;
+
+        // occupy entire buffer pool
+        List<ByteBuffer> buffers = new ArrayList<>();
+        allocate(allocations, allocationSize, buffers);
+
+        assertEquals(macroChunkSize, bufferPool.sizeInBytes());
+        assertEquals(macroChunkSize, bufferPool.usedSizeInBytes());
+        assertEquals(0, bufferPool.overflowMemoryInBytes());
+
+        // allocate overflow due to pool exhaust
+        ByteBuffer overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.OFF_HEAP);
+
+        assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+        assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+        assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+
+        // free all buffer
+        bufferPool.put(overflowBuffer);
+        release(buffers);
+
+        assertEquals(macroChunkSize, bufferPool.sizeInBytes());
+        assertEquals(0, bufferPool.usedSizeInBytes());
+        assertEquals(0, bufferPool.overflowMemoryInBytes());
+
+        // allocate overflow due to on-heap
+        overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.ON_HEAP);
+        assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+        assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+        assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+        bufferPool.put(overflowBuffer);
+
+        // allocate overflow due to over allocation size
+        overflowBuffer = bufferPool.get(2 * BufferPool.NORMAL_CHUNK_SIZE, BufferType.ON_HEAP);
+        assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+        assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+        assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+    }
+
+    @Test
+    public void testRecyclePartialFreeChunk()
+    {
+        // normal chunk size is 128kb
+        int halfNormalChunk = BufferPool.NORMAL_CHUNK_SIZE / 2; // 64kb, half of normal chunk
+        List<ByteBuffer> toRelease = new ArrayList<>();
+
+        // allocate three buffers on different chunks
+        ByteBuffer buffer0 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+        BufferPool.Chunk chunk0 = BufferPool.Chunk.getParentChunk(buffer0);
+        assertFalse(chunk0.isFree());
+        allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+        ByteBuffer buffer1 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+        BufferPool.Chunk chunk1 = BufferPool.Chunk.getParentChunk(buffer1);
+        assertFalse(chunk1.isFree());
+        assertNotEquals(chunk0, chunk1);
+        allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+        ByteBuffer buffer2 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+        BufferPool.Chunk chunk2 = BufferPool.Chunk.getParentChunk(buffer2);
+        assertFalse(chunk2.isFree());
+        assertNotEquals(chunk0, chunk2);
+        assertNotEquals(chunk1, chunk2);
+        allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+        // now all 3 chunks in local pool is full, allocate one more buffer to evict chunk2
+        ByteBuffer buffer3 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+        BufferPool.Chunk chunk3 = BufferPool.Chunk.getParentChunk(buffer3);
+        assertNotEquals(chunk0, chunk3);
+        assertNotEquals(chunk1, chunk3);
+        assertNotEquals(chunk2, chunk3);
+
+        // verify chunk2 got evicted, it doesn't have a owner
+        assertNotNull(chunk0.owner());
+        assertEquals(BufferPool.Chunk.Status.IN_USE, chunk0.status());
+        assertNotNull(chunk1.owner());
+        assertEquals(BufferPool.Chunk.Status.IN_USE, chunk1.status());
+        assertNull(chunk2.owner());
+        assertEquals(BufferPool.Chunk.Status.EVICTED, chunk2.status());
+
+        // release half buffers for chunk0/1/2
+        release(toRelease);
+        BufferPool.Chunk partiallyFreed = chunk2;
+
+        // try to recirculate chunk2 and verify freed space
+        assertFalse(bufferPool.globalPool().isFullyFreed(partiallyFreed));
+        assertTrue(bufferPool.globalPool().isPartiallyFreed(partiallyFreed));
+        assertEquals(BufferPool.Chunk.Status.IN_USE, partiallyFreed.status());
+        assertEquals(halfNormalChunk, partiallyFreed.free());
+        ByteBuffer buffer = partiallyFreed.get(halfNormalChunk, false, null);
+        assertEquals(halfNormalChunk, buffer.capacity());
+
+        // cleanup allocated buffers
+        for (ByteBuffer buf : Arrays.asList(buffer0, buffer1, buffer2, buffer3, buffer))
+            bufferPool.put(buf);
+
+        // verify that fully freed chunk are prioritized over partially freed chunks
+        List<BufferPool.Chunk> remainingChunks = new ArrayList<>();
+        BufferPool.Chunk chunkForAllocation;
+        while ((chunkForAllocation = bufferPool.globalPool().get()) != null)
+            remainingChunks.add(chunkForAllocation);
+
+        int totalNormalChunks = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / BufferPool.NORMAL_CHUNK_SIZE; // 64;
+        assertEquals(totalNormalChunks, remainingChunks.size());
+        assertSame(partiallyFreed, remainingChunks.get(remainingChunks.size() - 1)); // last one is partially freed
+
+        // cleanup polled chunks
+        remainingChunks.forEach(BufferPool.Chunk::release);
+    }
+
+    @Test
+    public void testTinyPool()
+    {
+        int total = 0;
+        final int size = BufferPool.TINY_ALLOCATION_UNIT;
+        final int allocationPerChunk = 64;
+
+        // occupy 3 tiny chunks
+        List<ByteBuffer> buffers0 = new ArrayList<>();
+        BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0);
+        assertTrue(chunk0.owner().isTinyPool());
+        List<ByteBuffer> buffers1 = new ArrayList<>();
+        BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1);
+        assertTrue(chunk1.owner().isTinyPool());
+        List<ByteBuffer> buffers2 = new ArrayList<>();
+        BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2);
+        assertTrue(chunk2.owner().isTinyPool());
+        total += 3 * BufferPool.TINY_CHUNK_SIZE;
+        assertEquals(total, bufferPool.usedSizeInBytes());
+
+        // allocate another tiny chunk.. chunk2 should be evicted
+        List<ByteBuffer> buffers3 = new ArrayList<>();
+        BufferPool.Chunk chunk3 = allocate(allocationPerChunk, size, buffers3);
+        assertTrue(chunk3.owner().isTinyPool());
+        total += BufferPool.TINY_CHUNK_SIZE;
+        assertEquals(total, bufferPool.usedSizeInBytes());
+
+        // verify chunk2 is full and evicted
+        assertEquals(0, chunk2.free());
+        assertNull(chunk2.owner());
+
+        // release chunk2's buffer
+        for (int i = 0; i < buffers2.size(); i++)
+        {
+            bufferPool.put(buffers2.get(i));
+            total -= buffers2.get(i).capacity();
+            assertEquals(total, bufferPool.usedSizeInBytes());
+        }
+
+        // cleanup allocated buffers
+        for (ByteBuffer buffer : Iterables.concat(buffers0, buffers1, buffers3))
+            bufferPool.put(buffer);
+    }
+
+    private BufferPool.Chunk allocate(int num, int bufferSize, List<ByteBuffer> buffers)
+    {
+        for (int i = 0; i < num; i++)
+            buffers.add(bufferPool.get(bufferSize, BufferType.OFF_HEAP));
+
+        return BufferPool.Chunk.getParentChunk(buffers.get(buffers.size() - 1));
+    }
+
+    private void release(List<ByteBuffer> toRelease)
+    {
+        for (ByteBuffer buffer : toRelease)
+            bufferPool.put(buffer);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org