You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2020/10/15 15:13:23 UTC
[cassandra] branch trunk updated: CASSANDRA-15229: Segregate
Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks
This is an automated email from the ASF dual-hosted git repository.
jasonstack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 699a1f7 CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks
699a1f7 is described below
commit 699a1f74fcc1da1952da6b2b0309c9e2474c67f4
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Thu Oct 15 22:53:44 2020 +0800
CASSANDRA-15229: Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks
* initiate multiple buffer pools for different lifespans and usages
- Chunk Cache Buffer Pool - conf.file_cache_size_in_mb=512mb
- Networking Buffer Pool - conf.networking_cache_size_in_mb=128mb
* Add overflowSize and usedSize to buffer pool metrics
* re-circulate buffer pool Chunk for ChunkCache whenever it has free space, even though it may not be able to allocate due to fragmentation
patch by Zhao Yang; reviewed by Caleb Rackliffe and Aleksey Yeschenko for CASSANDRA-15229
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 13 +-
.../org/apache/cassandra/cache/ChunkCache.java | 14 +-
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 14 +
.../db/streaming/CassandraStreamWriter.java | 6 +-
.../cassandra/hints/ChecksummedDataInput.java | 6 +-
.../hints/CompressedChecksummedDataInput.java | 13 +-
.../io/util/BufferManagingRebufferer.java | 6 +-
.../cassandra/metrics/BufferPoolMetrics.java | 45 +-
.../cassandra/net/AsyncStreamingOutputPlus.java | 13 +-
.../apache/cassandra/net/BufferPoolAllocator.java | 13 +-
.../cassandra/net/FrameDecoderLegacyLZ4.java | 11 +-
.../org/apache/cassandra/net/FrameEncoder.java | 9 +-
.../org/apache/cassandra/net/FrameEncoderCrc.java | 2 +-
.../org/apache/cassandra/net/FrameEncoderLZ4.java | 9 +-
.../cassandra/net/FrameEncoderLegacyLZ4.java | 8 +-
.../cassandra/net/FrameEncoderUnprotected.java | 2 +-
.../apache/cassandra/net/HandshakeProtocol.java | 6 +-
.../cassandra/net/InboundConnectionInitiator.java | 6 +-
.../cassandra/net/LocalBufferPoolAllocator.java | 3 +-
.../cassandra/net/OutboundConnectionInitiator.java | 4 +-
.../org/apache/cassandra/net/ShareableBytes.java | 6 +-
.../apache/cassandra/utils/memory/BufferPool.java | 466 ++++++++++++++++-----
.../apache/cassandra/utils/memory/BufferPools.java | 79 ++++
.../apache/cassandra/net/ConnectionBurnTest.java | 4 +-
.../cassandra/utils/memory/LongBufferPoolTest.java | 111 ++---
test/data/jmxdump/cassandra-4.0-jmx.yaml | 75 +++-
.../cassandra/distributed/impl/Instance.java | 4 +-
.../cassandra/metrics/BufferPoolMetricsTest.java | 125 ++++--
.../unit/org/apache/cassandra/net/FramingTest.java | 6 +-
.../cassandra/utils/memory/BufferPoolTest.java | 361 +++++++++++-----
32 files changed, 1067 insertions(+), 376 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fe3fef8..543a1cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta3
+ * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229)
* Fail truncation requests when they fail on a replica (CASSANDRA-16208)
* Move compact storage validation earlier in startup process (CASSANDRA-16063)
* Fix ByteBufferAccessor cast exceptions are thrown when trying to query a virtual table (CASSANDRA-16155)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ff414ed..37b18f9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -469,13 +469,22 @@ concurrent_counter_writes: 32
# be limited by the less of concurrent reads or concurrent writes.
concurrent_materialized_view_writes: 32
+# Maximum memory to use for inter-node and client-server networking buffers.
+#
+# Defaults to the smaller of 1/16 of heap or 128MB. This pool is allocated off-heap,
+# so is in addition to the memory allocated for heap. The cache also has on-heap
+# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
+# if the default 64k chunk size is used).
+# Memory is only allocated when needed.
+# networking_cache_size_in_mb: 128
+
# Enable the sstable chunk cache. The chunk cache will store recently accessed
# sections of the sstable in-memory as uncompressed buffers.
# file_cache_enabled: false
# Maximum memory to use for sstable chunk cache and buffer pooling.
-# 32MB of this are reserved for pooling buffers, the rest is used as an
-# cache that holds uncompressed sstable chunks.
+# 32MB of this are reserved for pooling buffers, the rest is used for chunk cache
+# that holds uncompressed sstable chunks.
# Defaults to the smaller of 1/4 of heap or 512MB. This pool is allocated off-heap,
# so is in addition to the memory allocated for heap. The cache also has on-heap
# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index ae38015..c53810a 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.ChunkCacheMetrics;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
public class ChunkCache
implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize
@@ -43,7 +44,9 @@ public class ChunkCache
public static final boolean roundUp = DatabaseDescriptor.getFileCacheRoundUp();
private static boolean enabled = DatabaseDescriptor.getFileCacheEnabled() && cacheSize > 0;
- public static final ChunkCache instance = enabled ? new ChunkCache() : null;
+ public static final ChunkCache instance = enabled ? new ChunkCache(BufferPools.forChunkCache()) : null;
+
+ private final BufferPool bufferPool;
private final LoadingCache<Key, Buffer> cache;
public final ChunkCacheMetrics metrics;
@@ -86,7 +89,7 @@ public class ChunkCache
}
}
- static class Buffer implements Rebufferer.BufferHolder
+ class Buffer implements Rebufferer.BufferHolder
{
private final ByteBuffer buffer;
private final long offset;
@@ -130,12 +133,13 @@ public class ChunkCache
public void release()
{
if (references.decrementAndGet() == 0)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
}
- private ChunkCache()
+ private ChunkCache(BufferPool pool)
{
+ bufferPool = pool;
metrics = new ChunkCacheMetrics(this);
cache = Caffeine.newBuilder()
.maximumWeight(cacheSize)
@@ -149,7 +153,7 @@ public class ChunkCache
@Override
public Buffer load(Key key)
{
- ByteBuffer buffer = BufferPool.get(key.file.chunkSize(), key.file.preferredBufferType());
+ ByteBuffer buffer = bufferPool.get(key.file.chunkSize(), key.file.preferredBufferType());
assert buffer != null;
key.file.readChunk(key.position, buffer);
return new Buffer(buffer, key.position);
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index da410155..26854da 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -302,6 +302,8 @@ public class Config
private static boolean isClientMode = false;
private static Supplier<Config> overrideLoadConfig = null;
+ public Integer networking_cache_size_in_mb;
+
public Integer file_cache_size_in_mb;
public boolean file_cache_enabled = Boolean.getBoolean("cassandra.file_cache_enabled");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e8e66fa..0387105 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -453,6 +453,9 @@ public class DatabaseDescriptor
if (conf.concurrent_replicates != null)
logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml");
+ if (conf.networking_cache_size_in_mb == null)
+ conf.networking_cache_size_in_mb = Math.min(128, (int) (Runtime.getRuntime().maxMemory() / (16 * 1048576)));
+
if (conf.file_cache_size_in_mb == null)
conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
@@ -2449,6 +2452,17 @@ public class DatabaseDescriptor
return conf.file_cache_size_in_mb;
}
+ public static int getNetworkingCacheSizeInMB()
+ {
+ if (conf.networking_cache_size_in_mb == null)
+ {
+ // In client mode the value is not set.
+ assert DatabaseDescriptor.isClientInitialized();
+ return 0;
+ }
+ return conf.networking_cache_size_in_mb;
+ }
+
public static boolean getFileCacheRoundUp()
{
if (conf.file_cache_round_up == null)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index 10296fb..6481f4b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static org.apache.cassandra.net.MessagingService.current_version;
@@ -152,7 +152,7 @@ public class CassandraStreamWriter
// this buffer will hold the data from disk. as it will be compressed on the fly by
// AsyncChannelCompressedStreamWriter.write(ByteBuffer), we can release this buffer as soon as we can.
- ByteBuffer buffer = BufferPool.get(minReadable, BufferType.OFF_HEAP);
+ ByteBuffer buffer = BufferPools.forNetworking().get(minReadable, BufferType.OFF_HEAP);
try
{
int readCount = proxy.read(buffer, start);
@@ -171,7 +171,7 @@ public class CassandraStreamWriter
}
finally
{
- BufferPool.put(buffer);
+ BufferPools.forNetworking().put(buffer);
}
return toTransfer;
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 30d18fa..e6e8b38 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,9 +26,9 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.NativeLibrary;
-import org.apache.cassandra.utils.memory.BufferPool;
/**
* A {@link RandomAccessReader} wrapper that calculates the CRC in place.
@@ -55,7 +55,7 @@ public class ChecksummedDataInput extends RebufferingInputStream
ChecksummedDataInput(ChannelProxy channel, BufferType bufferType)
{
- super(BufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, bufferType));
+ super(bufferType.allocate(RandomAccessReader.DEFAULT_BUFFER_SIZE));
crc = new CRC32();
crcPosition = 0;
@@ -244,7 +244,7 @@ public class ChecksummedDataInput extends RebufferingInputStream
@Override
public void close()
{
- BufferPool.put(buffer);
+ FileUtils.clean(buffer);
channel.close();
}
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index 0381b00..2f442be 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -28,9 +28,12 @@ import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.memory.BufferPools;
public final class CompressedChecksummedDataInput extends ChecksummedDataInput
{
+ private static final BufferPool bufferPool = BufferPools.forChunkCache();
+
private final ICompressor compressor;
private volatile long filePosition = 0; // Current position in file, advanced when reading chunk.
private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk.
@@ -117,9 +120,9 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
int bufferSize = compressedSize + (compressedSize / 20); // allocate +5% to cover variability in compressed size
if (compressedBuffer != null)
{
- BufferPool.put(compressedBuffer);
+ bufferPool.put(compressedBuffer);
}
- compressedBuffer = BufferPool.get(bufferSize, compressor.preferredBufferType());
+ compressedBuffer = bufferPool.get(bufferSize, compressor.preferredBufferType());
}
compressedBuffer.clear();
@@ -131,8 +134,8 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
if (buffer.capacity() < uncompressedSize)
{
int bufferSize = uncompressedSize + (uncompressedSize / 20);
- BufferPool.put(buffer);
- buffer = BufferPool.get(bufferSize, compressor.preferredBufferType());
+ bufferPool.put(buffer);
+ buffer = bufferPool.get(bufferSize, compressor.preferredBufferType());
}
buffer.clear();
@@ -151,7 +154,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
@Override
public void close()
{
- BufferPool.put(compressedBuffer);
+ bufferPool.put(compressedBuffer);
super.close();
}
diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index f3b9a88..3a297ee 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -23,7 +23,7 @@ package org.apache.cassandra.io.util;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
/**
* Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are
@@ -42,14 +42,14 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer
protected BufferManagingRebufferer(ChunkReader wrapped)
{
this.source = wrapped;
- buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
+ buffer = BufferPools.forChunkCache().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
buffer.limit(0);
}
@Override
public void closeReader()
{
- BufferPool.put(buffer);
+ BufferPools.forChunkCache().put(buffer);
offset = -1;
}
diff --git a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
index c9c859a..78e7265 100644
--- a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,24 +25,47 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
public class BufferPoolMetrics
{
- private static final MetricNameFactory factory = new DefaultNameFactory("BufferPool");
+ /** Total number of hits */
+ public final Meter hits;
/** Total number of misses */
public final Meter misses;
- /** Total size of buffer pools, in bytes */
+ /** Total size of buffer pools, in bytes, including overflow allocation */
public final Gauge<Long> size;
- public BufferPoolMetrics()
+ /** Total size, in bytes, of active buffered being used from the pool currently + overflow */
+ public final Gauge<Long> usedSize;
+
+ /**
+ * Total size, in bytes, of direct or heap buffers allocated by the pool but not part of the pool
+ * either because they are too large to fit or because the pool has exceeded its maximum limit or because it's
+ * on-heap allocation.
+ */
+ public final Gauge<Long> overflowSize;
+
+ public BufferPoolMetrics(String scope, BufferPool bufferPool)
{
+ MetricNameFactory factory = new DefaultNameFactory("BufferPool", scope);
+
+ hits = Metrics.meter(factory.createMetricName("Hits"));
+
misses = Metrics.meter(factory.createMetricName("Misses"));
- size = Metrics.register(factory.createMetricName("Size"), new Gauge<Long>()
- {
- public Long getValue()
- {
- return BufferPool.sizeInBytes();
- }
- });
+ overflowSize = Metrics.register(factory.createMetricName("OverflowSize"), bufferPool::overflowMemoryInBytes);
+
+ usedSize = Metrics.register(factory.createMetricName("UsedSize"), bufferPool::usedSizeInBytes);
+
+ size = Metrics.register(factory.createMetricName("Size"), bufferPool::sizeInBytes);
+ }
+
+ /**
+ * used to register alias for 3.0/3.11 compatibility
+ */
+ public void register3xAlias()
+ {
+ MetricNameFactory legacyFactory = new DefaultNameFactory("BufferPool");
+ Metrics.registerMBean(misses, legacyFactory.createMetricName("Misses").getMBeanName());
+ Metrics.registerMBean(size, legacyFactory.createMetricName("Size").getMBeanName());
}
}
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index 680a9d3..3a9c075 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel;
import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.lang.Math.min;
@@ -53,6 +54,8 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
{
private static final Logger logger = LoggerFactory.getLogger(AsyncStreamingOutputPlus.class);
+ private final BufferPool bufferPool = BufferPools.forNetworking();
+
final int defaultLowWaterMark;
final int defaultHighWaterMark;
@@ -68,7 +71,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
private void allocateBuffer()
{
// this buffer is only used for small quantities of data
- buffer = BufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP);
+ buffer = bufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP);
}
@Override
@@ -140,7 +143,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
throw new IllegalStateException("Can only allocate one ByteBuffer");
limiter.acquire(size);
holder.promise = beginFlush(size, defaultLowWaterMark, defaultHighWaterMark);
- holder.buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ holder.buffer = bufferPool.get(size, BufferType.OFF_HEAP);
return holder.buffer;
});
}
@@ -148,14 +151,14 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
{
// we don't currently support cancelling the flush, but at this point we are recoverable if we want
if (holder.buffer != null)
- BufferPool.put(holder.buffer);
+ bufferPool.put(holder.buffer);
if (holder.promise != null)
holder.promise.tryFailure(t);
throw t;
}
ByteBuffer buffer = holder.buffer;
- BufferPool.putUnusedPortion(buffer);
+ bufferPool.putUnusedPortion(buffer);
int length = buffer.limit();
channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(buffer), holder.promise);
@@ -260,7 +263,7 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
{
if (buffer != null)
{
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
buffer = null;
}
}
diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
index 224f690..11c0641 100644
--- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
@@ -25,6 +25,7 @@ import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
/**
* A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour
@@ -32,6 +33,8 @@ import org.apache.cassandra.utils.memory.BufferPool;
*/
public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
{
+ private static final BufferPool bufferPool = BufferPools.forNetworking();
+
BufferPoolAllocator()
{
super(true);
@@ -60,22 +63,22 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
ByteBuffer get(int size)
{
- return BufferPool.get(size, BufferType.OFF_HEAP);
+ return bufferPool.get(size, BufferType.OFF_HEAP);
}
ByteBuffer getAtLeast(int size)
{
- return BufferPool.getAtLeast(size, BufferType.OFF_HEAP);
+ return bufferPool.getAtLeast(size, BufferType.OFF_HEAP);
}
void put(ByteBuffer buffer)
{
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
void putUnusedPortion(ByteBuffer buffer)
{
- BufferPool.putUnusedPortion(buffer);
+ bufferPool.putUnusedPortion(buffer);
}
void release()
@@ -100,7 +103,7 @@ public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
public void deallocate()
{
if (wrapped != null)
- BufferPool.put(wrapped);
+ bufferPool.put(wrapped);
}
public ByteBuffer adopt()
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
index bf6bc17..4c620c7 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
@@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.lang.Integer.reverseBytes;
import static java.lang.String.format;
@@ -46,6 +47,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.copyBytes;
*/
class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
{
+ private static final BufferPool bufferPool = BufferPools.forNetworking();
+
FrameDecoderLegacyLZ4(BufferPoolAllocator allocator, int messagingVersion)
{
super(allocator, messagingVersion);
@@ -122,7 +125,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
assert msg instanceof BufferPoolAllocator.Wrapped;
ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt();
// netty will probably have mis-predicted the space needed
- BufferPool.putUnusedPortion(buf);
+ bufferPool.putUnusedPortion(buf);
CorruptLZ4Frame error = null;
try
@@ -252,7 +255,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
}
catch (Throwable t)
{
- BufferPool.put(out);
+ bufferPool.put(out);
throw t;
}
}
@@ -269,7 +272,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
{
if (null != stash)
{
- BufferPool.put(stash);
+ bufferPool.put(stash);
stash = null;
}
@@ -348,7 +351,7 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
ByteBuffer out = allocator.getAtLeast(capacity);
in.flip();
out.put(in);
- BufferPool.put(in);
+ bufferPool.put(in);
return out;
}
diff --git a/src/java/org/apache/cassandra/net/FrameEncoder.java b/src/java/org/apache/cassandra/net/FrameEncoder.java
index d9df166..5f2dc37 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoder.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoder.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
{
+ protected static final BufferPool bufferPool = BufferPools.forNetworking();
+
/**
* An abstraction useful for transparently allocating buffers that can be written to upstream
* of the {@code FrameEncoder} without knowledge of the encoder's frame layout, while ensuring
@@ -57,7 +60,7 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
this.headerLength = headerLength;
this.trailerLength = trailerLength;
- buffer = BufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP);
+ buffer = bufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP);
assert buffer.capacity() >= payloadCapacity + headerLength + trailerLength;
buffer.position(headerLength);
buffer.limit(buffer.capacity() - trailerLength);
@@ -103,12 +106,12 @@ abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
isFinished = true;
buffer.limit(buffer.position() + trailerLength);
buffer.position(0);
- BufferPool.putUnusedPortion(buffer);
+ bufferPool.putUnusedPortion(buffer);
}
void release()
{
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
}
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
index 2d07d6d..5049f29 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java
@@ -91,7 +91,7 @@ class FrameEncoderCrc extends FrameEncoder
}
catch (Throwable t)
{
- BufferPool.put(frame);
+ bufferPool.put(frame);
throw t;
}
}
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
index 12351ce..2d76170 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java
@@ -27,7 +27,6 @@ import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.BufferPool;
import static org.apache.cassandra.net.Crc.*;
@@ -74,7 +73,7 @@ class FrameEncoderLZ4 extends FrameEncoder
throw new IllegalArgumentException("Maximum uncompressed payload size is 128KiB");
int maxOutputLength = compressor.maxCompressedLength(uncompressedLength);
- frame = BufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP);
+ frame = bufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP);
int compressedLength = compressor.compress(in, in.position(), uncompressedLength, frame, HEADER_LENGTH, maxOutputLength);
@@ -101,18 +100,18 @@ class FrameEncoderLZ4 extends FrameEncoder
frame.putInt(frameCrc);
frame.position(0);
- BufferPool.putUnusedPortion(frame);
+ bufferPool.putUnusedPortion(frame);
return GlobalBufferPoolAllocator.wrap(frame);
}
catch (Throwable t)
{
if (frame != null)
- BufferPool.put(frame);
+ bufferPool.put(frame);
throw t;
}
finally
{
- BufferPool.put(in);
+ bufferPool.put(in);
}
}
}
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
index 3b29ecb..000fab7 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java
@@ -68,7 +68,7 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder
ByteBuffer frame = null;
try
{
- frame = BufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP);
+ frame = bufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP);
int frameOffset = 0;
int payloadOffset = 0;
@@ -82,19 +82,19 @@ class FrameEncoderLegacyLZ4 extends FrameEncoder
}
frame.limit(frameOffset);
- BufferPool.putUnusedPortion(frame);
+ bufferPool.putUnusedPortion(frame);
return GlobalBufferPoolAllocator.wrap(frame);
}
catch (Throwable t)
{
if (null != frame)
- BufferPool.put(frame);
+ bufferPool.put(frame);
throw t;
}
finally
{
- BufferPool.put(payload);
+ bufferPool.put(payload);
}
}
diff --git a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
index 3bca41c..6158713 100644
--- a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
+++ b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java
@@ -59,7 +59,7 @@ class FrameEncoderUnprotected extends FrameEncoder
}
catch (Throwable t)
{
- BufferPool.put(frame);
+ bufferPool.put(frame);
throw t;
}
}
diff --git a/src/java/org/apache/cassandra/net/HandshakeProtocol.java b/src/java/org/apache/cassandra/net/HandshakeProtocol.java
index 47d0ec6..bfdcc2c 100644
--- a/src/java/org/apache/cassandra/net/HandshakeProtocol.java
+++ b/src/java/org/apache/cassandra/net/HandshakeProtocol.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
@@ -135,7 +135,7 @@ class HandshakeProtocol
ByteBuf encode()
{
- ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP);
+ ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP);
try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer))
{
out.writeInt(Message.PROTOCOL_MAGIC);
@@ -347,7 +347,7 @@ class HandshakeProtocol
ByteBuf encode()
{
- ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP);
+ ByteBuffer buffer = BufferPools.forNetworking().get(MAX_LENGTH, BufferType.OFF_HEAP);
try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer))
{
out.writeInt(maxMessagingVersion);
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index e02512b..f2339eb 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -52,7 +52,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.OutboundConnectionSettings.Framing;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.streaming.async.StreamingInboundHandler;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.lang.Math.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -389,7 +389,7 @@ public class InboundConnectionInitiator
from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
}
- BufferPool.setRecycleWhenFreeForCurrentThread(false);
+ BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
pipeline.replace(this, "streamInbound", new StreamingInboundHandler(from, current_version, null));
logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}",
@@ -411,7 +411,7 @@ public class InboundConnectionInitiator
// record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over
instance().versions.set(from, maxMessagingVersion);
- BufferPool.setRecycleWhenFreeForCurrentThread(false);
+ BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
if (initiate.type == ConnectionType.LARGE_MESSAGES)
{
diff --git a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
index b2d487f..8017854 100644
--- a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import io.netty.channel.EventLoop;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
/**
* Equivalent to {@link GlobalBufferPoolAllocator}, except explicitly using a specified
@@ -36,7 +37,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator
LocalBufferPoolAllocator(EventLoop eventLoop)
{
- this.pool = new BufferPool.LocalPool().recycleWhenFree(false);
+ this.pool = BufferPools.forNetworking().create().recycleWhenFree(false);
this.eventLoop = eventLoop;
}
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index 5f3eced..4a5585a 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSucc
import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
@@ -338,7 +338,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
ChannelPipeline pipeline = ctx.pipeline();
if (result.isSuccess())
{
- BufferPool.setRecycleWhenFreeForCurrentThread(false);
+ BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
if (type.isMessaging())
{
assert frameEncoder != null;
diff --git a/src/java/org/apache/cassandra/net/ShareableBytes.java b/src/java/org/apache/cassandra/net/ShareableBytes.java
index e4f2460..71c272a 100644
--- a/src/java/org/apache/cassandra/net/ShareableBytes.java
+++ b/src/java/org/apache/cassandra/net/ShareableBytes.java
@@ -20,10 +20,10 @@ package org.apache.cassandra.net;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
/**
- * A wrapper for possibly sharing portions of a single, {@link BufferPool} managed, {@link ByteBuffer};
+ * A wrapper for possibly sharing portions of a single, {@link BufferPools#forNetworking()} managed, {@link ByteBuffer};
* optimised for the case where no sharing is necessary.
*
* When sharing is necessary, {@link #share()} method must be invoked by the owning thread
@@ -136,7 +136,7 @@ class ShareableBytes
throw new IllegalStateException("Already released");
if (count == RELEASED)
- BufferPool.put(bytes);
+ BufferPools.forNetworking().put(bytes);
}
boolean isReleased()
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index b18f689..531b492 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -29,6 +29,8 @@ import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -41,7 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.config.DatabaseDescriptor;
+
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.BufferPoolMetrics;
@@ -54,10 +56,52 @@ import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
/**
- * A pool of ByteBuffers that can be recycled.
+ * A pool of ByteBuffers that can be recycled to reduce system direct memory fragmentation and improve buffer allocation
+ * performance.
+ * <p/>
+ *
+ * Each {@link BufferPool} instance has one {@link GlobalPool} which allocates two kinds of chunks:
+ * <ul>
+ * <li>Macro Chunk
+ * <ul>
+ * <li>A memory slab that has size of MACRO_CHUNK_SIZE which is 64 * NORMAL_CHUNK_SIZE</li>
+ * <li>Used to allocate normal chunk with size of NORMAL_CHUNK_SIZE</li>
+ * </ul>
+ * </li>
+ * <li>Normal Chunk
+ * <ul>
+ * <li>Used by {@link LocalPool} to serve buffer allocation</li>
+ * <li>Minimum allocation unit is NORMAL_CHUNK_SIZE / 64</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * {@link GlobalPool} maintains two kinds of freed chunks, fully freed chunks where all buffers are released, and
+ * partially freed chunks where some buffers are not released, eg. held by {@link org.apache.cassandra.cache.ChunkCache}.
+ * Partially freed chunks are used to improve cache utilization and have lower priority compared to fully freed chunks.
+ *
+ * <p/>
+ *
+ * {@link LocalPool} is a thread local pool to serve buffer allocation requests. There are two kinds of local pool:
+ * <ul>
+ * <li>Normal Pool:
+ * <ul>
+ * <li>used to serve allocation size that is larger than half of NORMAL_ALLOCATION_UNIT but less than NORMAL_CHUNK_SIZE</li>
+ * <li>when there is insufficient space in the local queue, it will request global pool for more normal chunks</li>
+ * <li>when normal chunk is recycled either fully or partially, it will be passed to global pool to be used by other pools</li>
+ * </ul>
+ * </li>
+ * <li>Tiny Pool:
+ * <ul>
+ * <li>used to serve allocation size that is less than NORMAL_ALLOCATION_UNIT</li>
+ * <li>when there is insufficient space in the local queue, it will request parent normal pool for more tiny chunks</li>
+ * <li>when tiny chunk is fully freed, it will be passed to parent normal pool and corresponding buffer in the parent normal chunk is freed</li>
+ * </ul>
+ * </li>
+ * </ul>
*
- * TODO: document the semantics of this class carefully
- * Notably: we do not automatically release from the local pool any chunk that has been incompletely allocated from
+ * Note: even though partially freed chunks improve cache utilization when chunk cache holds outstanding buffers for
+ * an arbitrary period, there is still fragmentation in the partially freed chunks because of non-uniform allocation sizes.
*/
public class BufferPool
{
@@ -68,23 +112,40 @@ public class BufferPool
public static final int TINY_ALLOCATION_UNIT = TINY_CHUNK_SIZE / 64;
public static final int TINY_ALLOCATION_LIMIT = TINY_CHUNK_SIZE / 2;
- private final static BufferPoolMetrics metrics = new BufferPoolMetrics();
-
- // TODO: this should not be using FileCacheSizeInMB
- private static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
- private static String READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD);
-
- private static Debug debug;
-
private static final Logger logger = LoggerFactory.getLogger(BufferPool.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
+ private volatile Debug debug = Debug.NO_OP;
+
+ protected final String name;
+ protected final BufferPoolMetrics metrics;
+ private final long memoryUsageThreshold;
+ private final String readableMemoryUsageThreshold;
+
+ /**
+ * Size of unpooled buffer being allocated outside of buffer pool in bytes.
+ */
+ private final LongAdder overflowMemoryUsage = new LongAdder();
+
+ /**
+ * Size of buffer being used in bytes, including pooled buffer and unpooled buffer.
+ */
+ private final LongAdder memoryInUse = new LongAdder();
+
+ /**
+ * Size of allocated buffer pool slabs in bytes
+ */
+ private final AtomicLong memoryAllocated = new AtomicLong();
+
/** A global pool of chunks (page aligned buffers) */
- private static final GlobalPool globalPool = new GlobalPool();
+ private final GlobalPool globalPool;
+
+ /** Allow partially freed chunk to be recycled for allocation*/
+ private final boolean recyclePartially;
/** A thread local pool of chunks, where chunks come from the global pool */
- private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>()
+ private final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>()
{
@Override
protected LocalPool initialValue()
@@ -98,7 +159,31 @@ public class BufferPool
}
};
- public static ByteBuffer get(int size, BufferType bufferType)
+ private final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ private final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
+ private final InfiniteLoopExecutor localPoolCleaner;
+
+ public BufferPool(String name, long memoryUsageThreshold, boolean recyclePartially)
+ {
+ this.name = name;
+ this.memoryUsageThreshold = memoryUsageThreshold;
+ this.readableMemoryUsageThreshold = prettyPrintMemory(memoryUsageThreshold);
+ this.globalPool = new GlobalPool();
+ this.metrics = new BufferPoolMetrics(name, this);
+ this.recyclePartially = recyclePartially;
+ this.localPoolCleaner = new InfiniteLoopExecutor("LocalPool-Cleaner-" + name, this::cleanupOneReference).start();
+ }
+
+ /**
+ * @return a local pool instance; the caller is responsible for releasing the pool
+ */
+ public LocalPool create()
+ {
+ return new LocalPool();
+ }
+
+ public ByteBuffer get(int size, BufferType bufferType)
{
if (bufferType == BufferType.ON_HEAP)
return allocate(size, bufferType);
@@ -106,7 +191,7 @@ public class BufferPool
return localPool.get().get(size);
}
- public static ByteBuffer getAtLeast(int size, BufferType bufferType)
+ public ByteBuffer getAtLeast(int size, BufferType bufferType)
{
if (bufferType == BufferType.ON_HEAP)
return allocate(size, bufferType);
@@ -115,30 +200,33 @@ public class BufferPool
}
/** Unlike the get methods, this will return null if the pool is exhausted */
- public static ByteBuffer tryGet(int size)
+ public ByteBuffer tryGet(int size)
{
return localPool.get().tryGet(size, false);
}
- public static ByteBuffer tryGetAtLeast(int size)
+ public ByteBuffer tryGetAtLeast(int size)
{
return localPool.get().tryGet(size, true);
}
- private static ByteBuffer allocate(int size, BufferType bufferType)
+ private ByteBuffer allocate(int size, BufferType bufferType)
{
+ updateOverflowMemoryUsage(size);
return bufferType == BufferType.ON_HEAP
? ByteBuffer.allocate(size)
: ByteBuffer.allocateDirect(size);
}
- public static void put(ByteBuffer buffer)
+ public void put(ByteBuffer buffer)
{
if (isExactlyDirect(buffer))
localPool.get().put(buffer);
+ else
+ updateOverflowMemoryUsage(-buffer.capacity());
}
- public static void putUnusedPortion(ByteBuffer buffer)
+ public void putUnusedPortion(ByteBuffer buffer)
{
if (isExactlyDirect(buffer))
{
@@ -150,43 +238,93 @@ public class BufferPool
}
}
- public static void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree)
+ private void updateOverflowMemoryUsage(int size)
+ {
+ overflowMemoryUsage.add(size);
+ }
+
+ public void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree)
{
localPool.get().recycleWhenFree(recycleWhenFree);
}
- public static long sizeInBytes()
+ /**
+ * @return buffer size being allocated, including pooled buffers and unpooled buffers
+ */
+ public long sizeInBytes()
{
- return globalPool.sizeInBytes();
+ return memoryAllocated.get() + overflowMemoryUsage.longValue();
}
- @VisibleForTesting
- public static void setMemoryUsageThreshold(long threshold)
+ /**
+ * @return buffer size being used, including used pooled buffers and unpooled buffers
+ */
+ public long usedSizeInBytes()
{
- MEMORY_USAGE_THRESHOLD = threshold;
- READABLE_MEMORY_USAGE_THRESHOLD = prettyPrintMemory(MEMORY_USAGE_THRESHOLD);
+ return memoryInUse.longValue() + overflowMemoryUsage.longValue();
+ }
+
+ /**
+ * @return unpooled buffer size being allocated outside of buffer pool.
+ */
+ public long overflowMemoryInBytes()
+ {
+ return overflowMemoryUsage.longValue();
+ }
+
+ /**
+ * @return maximum pooled buffer size in bytes
+ */
+ public long memoryUsageThreshold()
+ {
+ return memoryUsageThreshold;
}
@VisibleForTesting
- public static long getMemoryUsageThreshold()
+ public GlobalPool globalPool()
{
- return MEMORY_USAGE_THRESHOLD;
+ return globalPool;
}
interface Debug
{
+ public static Debug NO_OP = new Debug()
+ {
+ @Override
+ public void registerNormal(Chunk chunk) {}
+ @Override
+ public void recycleNormal(Chunk oldVersion, Chunk newVersion) {}
+ @Override
+ public void recyclePartial(Chunk chunk) { }
+ };
+
void registerNormal(Chunk chunk);
void recycleNormal(Chunk oldVersion, Chunk newVersion);
+ void recyclePartial(Chunk chunk);
}
- public static void debug(Debug setDebug)
+ public void debug(Debug setDebug)
{
- debug = setDebug;
+ assert setDebug != null;
+ this.debug = setDebug;
}
interface Recycler
{
+ /**
+ * Recycle a fully freed chunk
+ */
void recycle(Chunk chunk);
+
+ /**
+ * @return true if chunk can be reused before fully freed.
+ */
+ boolean canRecyclePartially();
+
+ /**
+ * Recycle a partially freed chunk
+ */
+ void recyclePartially(Chunk chunk);
}
/**
@@ -196,30 +334,32 @@ public class BufferPool
*
* This class is shared by multiple thread local pools and must be thread-safe.
*/
- static final class GlobalPool implements Supplier<Chunk>, Recycler
+ final class GlobalPool implements Supplier<Chunk>, Recycler
{
/** The size of a bigger chunk, 1 MiB, must be a multiple of NORMAL_CHUNK_SIZE */
static final int MACRO_CHUNK_SIZE = 64 * NORMAL_CHUNK_SIZE;
- private static final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE);
-
- static
- {
- assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2
- assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
- assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple
-
- logger.info("Global buffer pool limit is {}", prettyPrintMemory(MEMORY_USAGE_THRESHOLD));
- }
+ private final String READABLE_MACRO_CHUNK_SIZE = prettyPrintMemory(MACRO_CHUNK_SIZE);
private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
// TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage
+ // It contains fully free chunks and when it runs out, partially freed chunks will be used.
private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
- private final AtomicLong memoryUsage = new AtomicLong();
+ // Partially freed chunks, which are recirculated whenever a chunk has free space, to
+ // improve buffer utilization when chunk cache is holding a piece of buffer for a long period.
+ // Note: fragmentation still exists, as the holes have different sizes.
+ private final Queue<Chunk> partiallyFreedChunks = new ConcurrentLinkedQueue<>();
/** Used in logging statements to lazily build a human-readable current memory usage. */
- private final Object readableMemoryUsage =
+ private final Object readableMemoryUsage =
new Object() { @Override public String toString() { return prettyPrintMemory(sizeInBytes()); } };
+ public GlobalPool()
+ {
+ assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2
+ assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
+ assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple
+ }
+
/** Return a chunk, the caller will take owership of the parent chunk. */
public Chunk get()
{
@@ -232,7 +372,10 @@ public class BufferPool
return chunk;
// another thread may have just allocated last macro chunk, so make one final attempt before returning null
- return chunks.poll();
+ chunk = chunks.poll();
+
+ // try to use partially freed chunk if there is no more fully freed chunk.
+ return chunk == null ? partiallyFreedChunks.poll() : chunk;
}
/**
@@ -243,17 +386,17 @@ public class BufferPool
{
while (true)
{
- long cur = memoryUsage.get();
- if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD)
+ long cur = memoryAllocated.get();
+ if (cur + MACRO_CHUNK_SIZE > memoryUsageThreshold)
{
- if (MEMORY_USAGE_THRESHOLD > 0)
+ if (memoryUsageThreshold > 0)
{
noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}",
- READABLE_MEMORY_USAGE_THRESHOLD, READABLE_MACRO_CHUNK_SIZE);
+ readableMemoryUsageThreshold, READABLE_MACRO_CHUNK_SIZE);
}
return null;
}
- if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
+ if (memoryAllocated.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
break;
}
@@ -265,10 +408,10 @@ public class BufferPool
}
catch (OutOfMemoryError oom)
{
- noSpamLogger.error("Buffer pool failed to allocate chunk of {}, current size {} ({}). " +
+ noSpamLogger.error("{} buffer pool failed to allocate chunk of {}, current size {} ({}). " +
"Attempting to continue; buffers will be allocated in on-heap memory which can degrade performance. " +
"Make sure direct memory size (-XX:MaxDirectMemorySize) is large enough to accommodate off-heap memtables and caches.",
- READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName());
+ name, READABLE_MACRO_CHUNK_SIZE, readableMemoryUsage, oom.getClass().getName());
return null;
}
@@ -276,29 +419,35 @@ public class BufferPool
macroChunks.add(chunk);
final Chunk callerChunk = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE));
- if (debug != null)
- debug.registerNormal(callerChunk);
+ debug.registerNormal(callerChunk);
for (int i = NORMAL_CHUNK_SIZE; i < MACRO_CHUNK_SIZE; i += NORMAL_CHUNK_SIZE)
{
Chunk add = new Chunk(this, chunk.get(NORMAL_CHUNK_SIZE));
chunks.add(add);
- if (debug != null)
- debug.registerNormal(add);
+ debug.registerNormal(add);
}
return callerChunk;
}
+ @Override
public void recycle(Chunk chunk)
{
Chunk recycleAs = new Chunk(chunk);
- if (debug != null)
- debug.recycleNormal(chunk, recycleAs);
+ debug.recycleNormal(chunk, recycleAs);
chunks.add(recycleAs);
}
- public long sizeInBytes()
+ @Override
+ public void recyclePartially(Chunk chunk)
{
- return memoryUsage.get();
+ debug.recyclePartial(chunk);
+ partiallyFreedChunks.add(chunk);
+ }
+
+ @Override
+ public boolean canRecyclePartially()
+ {
+ return recyclePartially;
}
/** This is not thread safe and should only be used for unit testing. */
@@ -308,10 +457,23 @@ public class BufferPool
while (!chunks.isEmpty())
chunks.poll().unsafeFree();
+ while (!partiallyFreedChunks.isEmpty())
+ partiallyFreedChunks.poll().unsafeFree();
+
while (!macroChunks.isEmpty())
macroChunks.poll().unsafeFree();
+ }
+
+ @VisibleForTesting
+ boolean isPartiallyFreed(Chunk chunk)
+ {
+ return partiallyFreedChunks.contains(chunk);
+ }
- memoryUsage.set(0);
+ @VisibleForTesting
+ boolean isFullyFreed(Chunk chunk)
+ {
+ return chunks.contains(chunk);
}
}
@@ -545,7 +707,7 @@ public class BufferPool
* A thread local class that grabs chunks from the global pool for this thread allocations.
* Only one thread can do the allocations but multiple threads can release the allocations.
*/
- public static final class LocalPool implements Recycler
+ public final class LocalPool implements Recycler
{
private final Queue<ByteBuffer> reuseObjects;
private final Supplier<Chunk> parent;
@@ -575,9 +737,7 @@ public class BufferPool
{
this.parent = () -> {
ByteBuffer buffer = parent.tryGetInternal(TINY_CHUNK_SIZE, false);
- if (buffer == null)
- return null;
- return new Chunk(parent, buffer);
+ return buffer == null ? null : new Chunk(parent, buffer);
};
this.tinyLimit = 0; // we only currently permit one layer of nesting (which brings us down to 32 byte allocations, so is plenty)
this.reuseObjects = parent.reuseObjects; // we share the same ByteBuffer object reuse pool, as we both have the same exclusive access to it
@@ -594,13 +754,21 @@ public class BufferPool
public void put(ByteBuffer buffer)
{
Chunk chunk = Chunk.getParentChunk(buffer);
+ int size = buffer.capacity();
+
if (chunk == null)
+ {
FileUtils.clean(buffer);
+ updateOverflowMemoryUsage(-size);
+ }
else
+ {
put(buffer, chunk);
+ memoryInUse.add(-size);
+ }
}
- public void put(ByteBuffer buffer, Chunk chunk)
+ private void put(ByteBuffer buffer, Chunk chunk)
{
LocalPool owner = chunk.owner;
if (owner != null && owner == tinyPool)
@@ -609,23 +777,39 @@ public class BufferPool
return;
}
- // ask the free method to take exclusive ownership of the act of recycling
- // if we are either: already not owned by anyone, or owned by ourselves
- long free = chunk.free(buffer, owner == null || (owner == this && recycleWhenFree));
+ // ask the free method to take exclusive ownership of the act of recycling if chunk is owned by ourselves
+ long free = chunk.free(buffer, owner == this && recycleWhenFree);
+ // free:
+ // * 0L: current pool must be the owner. we can fully recycle the chunk.
+ // * -1L:
+ // * for normal chunk:
+ // a) if it has owner, do nothing.
+ // b) if it has no owner, try to recycle it either fully or partially if not already recycled.
+ // * for tiny chunk:
+ // a) if it has owner, do nothing.
+ // b) if it has no owner, recycle the tiny chunk back to parent chunk
+ // * others:
+ // * for normal chunk: partially recycle the chunk if it can be partially recycled but not yet recycled.
+ // * for tiny chunk: do nothing.
if (free == 0L)
{
+ assert owner == this;
// 0L => we own recycling responsibility, so must recycle;
- // if we are the owner, we must remove the Chunk from our local queue
- if (owner == this)
- remove(chunk);
+ // We must remove the Chunk from our local queue
+ remove(chunk);
chunk.recycle();
}
- else if (((free == -1L) && owner != this) && chunk.owner == null)
+ else if (free == -1L && owner != this && chunk.owner == null && !chunk.recycler.canRecyclePartially())
{
// although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset.
// we must also check after completely freeing if the owner has since been unset, and try to recycle
chunk.tryRecycle();
}
+ else if (chunk.owner == null && chunk.recycler.canRecyclePartially() && chunk.setInUse(Chunk.Status.EVICTED))
+ {
+ // re-circulate partially freed normal chunk to global list
+ chunk.partiallyRecycle();
+ }
if (owner == this)
{
@@ -638,10 +822,16 @@ public class BufferPool
public void putUnusedPortion(ByteBuffer buffer)
{
Chunk chunk = Chunk.getParentChunk(buffer);
+ int size = buffer.capacity() - buffer.limit();
+
if (chunk == null)
+ {
+ updateOverflowMemoryUsage(-size);
return;
+ }
chunk.freeUnusedPortion(buffer);
+ memoryInUse.add(-size);
}
public ByteBuffer get(int size)
@@ -658,7 +848,11 @@ public class BufferPool
{
ByteBuffer ret = tryGet(size, sizeIsLowerBound);
if (ret != null)
+ {
+ metrics.hits.mark();
+ memoryInUse.add(ret.capacity());
return ret;
+ }
if (size > NORMAL_CHUNK_SIZE)
{
@@ -677,16 +871,6 @@ public class BufferPool
return allocate(size, BufferType.OFF_HEAP);
}
- public ByteBuffer tryGet(int size)
- {
- return tryGet(size, false);
- }
-
- public ByteBuffer tryGetAtLeast(int size)
- {
- return tryGet(size, true);
- }
-
private ByteBuffer tryGet(int size, boolean sizeIsLowerBound)
{
LocalPool pool = this;
@@ -715,7 +899,9 @@ public class BufferPool
ByteBuffer reuse = this.reuseObjects.poll();
ByteBuffer buffer = chunks.get(size, sizeIsLowerBound, reuse);
if (buffer != null)
+ {
return buffer;
+ }
// else ask the global pool
Chunk chunk = addChunkFromParent();
@@ -731,7 +917,8 @@ public class BufferPool
return null;
}
- // recycle
+ // recycle entire tiny chunk from tiny pool back to local pool
+ @Override
public void recycle(Chunk chunk)
{
ByteBuffer buffer = chunk.slab;
@@ -739,6 +926,20 @@ public class BufferPool
put(buffer, parentChunk);
}
+ @Override
+ public void recyclePartially(Chunk chunk)
+ {
+ throw new UnsupportedOperationException("Tiny chunk doesn't support partial recycle.");
+ }
+
+ @Override
+ public boolean canRecyclePartially()
+ {
+ // tiny pool doesn't support partial recycle, as we want to have tiny chunk fully freed and put back to
+ // parent normal chunk.
+ return false;
+ }
+
private void remove(Chunk chunk)
{
chunks.remove(chunk);
@@ -763,8 +964,11 @@ public class BufferPool
if (evict != null)
{
if (tinyPool != null)
+ // releasing tiny chunks may result in releasing current evicted chunk
tinyPool.chunks.removeIf((child, parent) -> Chunk.getParentChunk(child.slab) == parent, evict);
evict.release();
+ // Mark it as evicted; it will be eligible for partial recycle if the recycler allows
+ evict.setEvicted(Chunk.Status.IN_USE);
}
}
@@ -784,6 +988,12 @@ public class BufferPool
chunks.unsafeRecycle();
}
+ @VisibleForTesting
+ public boolean isTinyPool()
+ {
+ return !(parent instanceof GlobalPool);
+ }
+
public LocalPool recycleWhenFree(boolean recycleWhenFree)
{
this.recycleWhenFree = recycleWhenFree;
@@ -808,12 +1018,7 @@ public class BufferPool
}
}
- private static final Set<LocalPoolRef> localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
- private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
-
- private static void cleanupOneReference() throws InterruptedException
+ private void cleanupOneReference() throws InterruptedException
{
Object obj = localPoolRefQueue.remove(100);
if (obj instanceof LocalPoolRef)
@@ -865,10 +1070,19 @@ public class BufferPool
*/
final static class Chunk
{
+ enum Status
+ {
+ /** The slab is serving or ready to serve requests */
+ IN_USE,
+ /** The slab is not serving requests and ready for partial recycle*/
+ EVICTED;
+ }
+
private final ByteBuffer slab;
- private final long baseAddress;
+ final long baseAddress;
private final int shift;
+ // it may be 0L when all slots are allocated after "get" or when all slots are freed after "free"
private volatile long freeSlots;
private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
@@ -878,6 +1092,10 @@ public class BufferPool
private volatile LocalPool owner;
private final Recycler recycler;
+ private static final AtomicReferenceFieldUpdater<Chunk, Status> statusUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(Chunk.class, Status.class, "status");
+ private volatile Status status = Status.IN_USE;
+
@VisibleForTesting
Object debugAttachment;
@@ -939,6 +1157,12 @@ public class BufferPool
recycler.recycle(this);
}
+ public void partiallyRecycle()
+ {
+ assert owner == null;
+ recycler.recyclePartially(this);
+ }
+
/**
* We stash the chunk in the attachment of a buffer
* that was returned by get(), this method simply
@@ -1182,6 +1406,12 @@ public class BufferPool
}
@VisibleForTesting
+ public LocalPool owner()
+ {
+ return this.owner;
+ }
+
+ @VisibleForTesting
void unsafeFree()
{
Chunk parent = getParentChunk(slab);
@@ -1200,6 +1430,26 @@ public class BufferPool
chunk.recycle();
}
}
+
+ Status status()
+ {
+ return status;
+ }
+
+ private boolean setStatus(Status current, Status update)
+ {
+ return statusUpdater.compareAndSet(this, current, update);
+ }
+
+ boolean setInUse(Status prev)
+ {
+ return setStatus(prev, Status.IN_USE);
+ }
+
+ boolean setEvicted(Status prev)
+ {
+ return setStatus(prev, Status.EVICTED);
+ }
}
@VisibleForTesting
@@ -1218,49 +1468,41 @@ public class BufferPool
}
@VisibleForTesting
- public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+ public void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
- shutdownNow(of(EXEC));
- awaitTermination(timeout, unit, of(EXEC));
+ shutdownNow(of(localPoolCleaner));
+ awaitTermination(timeout, unit, of(localPoolCleaner));
}
- public static long unsafeGetBytesInUse()
+ @VisibleForTesting
+ public BufferPoolMetrics metrics()
{
- long totalMemory = globalPool.memoryUsage.get();
- class L { long v; }
- final L availableMemory = new L();
- for (Chunk chunk : globalPool.chunks)
- {
- availableMemory.v += chunk.capacity();
- }
- for (LocalPoolRef ref : localPoolReferences)
- {
- ref.chunks.forEach(chunk -> availableMemory.v += chunk.free());
- }
- return totalMemory - availableMemory.v;
+ return metrics;
}
/** This is not thread safe and should only be used for unit testing. */
@VisibleForTesting
- static void unsafeReset()
+ public void unsafeReset()
{
+ overflowMemoryUsage.reset();
+ memoryInUse.reset();
+ memoryAllocated.set(0);
localPool.get().unsafeRecycle();
globalPool.unsafeFree();
}
@VisibleForTesting
- static Chunk unsafeCurrentChunk()
+ Chunk unsafeCurrentChunk()
{
return localPool.get().chunks.chunk0;
}
@VisibleForTesting
- static int unsafeNumChunks()
+ int unsafeNumChunks()
{
LocalPool pool = localPool.get();
return (pool.chunks.chunk0 != null ? 1 : 0)
+ (pool.chunks.chunk1 != null ? 1 : 0)
+ (pool.chunks.chunk2 != null ? 1 : 0);
}
-
}
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPools.java b/src/java/org/apache/cassandra/utils/memory/BufferPools.java
new file mode 100644
index 0000000..736e1cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPools.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BufferPools
+{
+ private static final Logger logger = LoggerFactory.getLogger(BufferPools.class);
+
+ /**
+ * Used by chunk cache to store decompressed data and buffers may be held by chunk cache for arbitrary period.
+ */
+ private static final long FILE_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
+ private static final BufferPool CHUNK_CACHE_POOL = new BufferPool("chunk-cache", FILE_MEMORY_USAGE_THRESHOLD, true);
+
+ /**
+ * Used by client-server or inter-node requests, buffers should be released immediately after use.
+ */
+ private static final long NETWORKING_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getNetworkingCacheSizeInMB() * 1024L * 1024L;
+ private static final BufferPool NETWORKING_POOL = new BufferPool("networking", NETWORKING_MEMORY_USAGE_THRESHOLD, false);
+
+ static
+ {
+ logger.info("Global buffer pool limit is {} for {} and {} for {}",
+ prettyPrintMemory(FILE_MEMORY_USAGE_THRESHOLD),
+ CHUNK_CACHE_POOL.name,
+ prettyPrintMemory(NETWORKING_MEMORY_USAGE_THRESHOLD),
+ NETWORKING_POOL.name);
+
+ CHUNK_CACHE_POOL.metrics().register3xAlias();
+ }
+ /**
+ * Long-lived buffers used for chunk cache and other disk access
+ */
+ public static BufferPool forChunkCache()
+ {
+ return CHUNK_CACHE_POOL;
+ }
+
+ /**
+ * Short-lived buffers used for internode messaging or client-server connections.
+ */
+ public static BufferPool forNetworking()
+ {
+ return NETWORKING_POOL;
+ }
+
+ public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException
+ {
+ CHUNK_CACHE_POOL.shutdownLocalCleaner(timeout, unit);
+ NETWORKING_POOL.shutdownLocalCleaner(timeout, unit);
+ }
+
+}
diff --git a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
index a421f3e..dea552c 100644
--- a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
+++ b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java
@@ -59,7 +59,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageGenerator.UniformPayloadGenerator;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MonotonicClock;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.lang.Math.min;
import static org.apache.cassandra.net.MessagingService.current_version;
@@ -435,7 +435,7 @@ public class ConnectionBurnTest
checkStoppedTo .accept(endpoint, getConnections(endpoint, true ));
checkStoppedFrom.accept(endpoint, getConnections(endpoint, false));
}
- long inUse = BufferPool.unsafeGetBytesInUse();
+ long inUse = BufferPools.forNetworking().usedSizeInBytes();
if (inUse > 0)
{
// try
diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index 0af4199..fc603c9 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -84,7 +85,7 @@ public class LongBufferPoolTest
}
long recycleRound = 1;
final List<BufferPool.Chunk> normalChunks = new ArrayList<>();
- final List<BufferPool.Chunk> tinyChunks = new ArrayList<>();
+
public synchronized void registerNormal(BufferPool.Chunk chunk)
{
chunk.debugAttachment = new DebugChunk();
@@ -95,13 +96,14 @@ public class LongBufferPoolTest
newVersion.debugAttachment = oldVersion.debugAttachment;
DebugChunk.get(oldVersion).lastRecycled = recycleRound;
}
+        /**
+         * Records the current recycle round on the debug attachment of a partially recycled chunk.
+         * This writes the same {@code lastRecycled} field that {@code check()} compares against
+         * {@code recycleRound}.
+         */
+        public void recyclePartial(BufferPool.Chunk chunk)
+        {
+            final DebugChunk attachment = DebugChunk.get(chunk);
+            attachment.lastRecycled = recycleRound;
+        }
public synchronized void check()
{
-// for (BufferPool.Chunk chunk : tinyChunks)
-// assert DebugChunk.get(chunk).lastRecycled == recycleRound;
for (BufferPool.Chunk chunk : normalChunks)
assert DebugChunk.get(chunk).lastRecycled == recycleRound;
- tinyChunks.clear(); // they don't survive a recycleRound
recycleRound++;
}
}
@@ -112,10 +114,29 @@ public class LongBufferPoolTest
DatabaseDescriptor.daemonInitialization();
}
+    /**
+     * Calls {@code unsafeReset()} on both shared pools once every test in this class has run.
+     */
+    @AfterClass
+    public static void teardown()
+    {
+        final BufferPool chunkCachePool = BufferPools.forChunkCache();
+        final BufferPool networkingPool = BufferPools.forNetworking();
+
+        chunkCachePool.unsafeReset();
+        networkingPool.unsafeReset();
+    }
+
@Test
- public void testAllocate() throws InterruptedException, ExecutionException
+ public void testPoolAllocateWithRecyclePartially() throws InterruptedException, ExecutionException
+ {
+ testPoolAllocate(true);
+ }
+
+ @Test
+ public void testPoolAllocateWithoutRecyclePartially() throws InterruptedException, ExecutionException
+ {
+ testPoolAllocate(false);
+ }
+
+ private void testPoolAllocate(boolean recyclePartially) throws InterruptedException, ExecutionException
{
- testAllocate(Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L), 16 << 20);
+ BufferPool pool = new BufferPool("test_pool", 16 << 20, recyclePartially);
+ testAllocate(pool, Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L));
}
private static final class BufferCheck
@@ -160,11 +181,11 @@ public class LongBufferPoolTest
final List<Future<Boolean>> threadResultFuture;
final int targetSizeQuanta;
- TestEnvironment(int threadCount, long duration, int poolSize)
+ TestEnvironment(int threadCount, long duration, long poolSize)
{
this.threadCount = threadCount;
this.duration = duration;
- this.poolSize = poolSize;
+ this.poolSize = Math.toIntExact(poolSize);
until = System.nanoTime() + duration;
latch = new CountDownLatch(threadCount);
sharedRecycle = new SPSCQueue[threadCount];
@@ -188,7 +209,7 @@ public class LongBufferPoolTest
//
// This should divide double the poolSize across the working threads,
// plus NORMAL_CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair.
- targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1);
+ targetSizeQuanta = 2 * this.poolSize / sum1toN(threadCount - 1);
}
void addCheckedFuture(Future<Boolean> future)
@@ -238,24 +259,19 @@ public class LongBufferPoolTest
}
}
- public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException
+ public void testAllocate(BufferPool bufferPool, int threadCount, long duration) throws InterruptedException, ExecutionException
{
- System.out.println(String.format("%s - testing %d threads for %dm",
- DATE_FORMAT.format(new Date()),
- threadCount,
- TimeUnit.NANOSECONDS.toMinutes(duration)));
- long prevPoolSize = BufferPool.getMemoryUsageThreshold();
- logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize);
- BufferPool.setMemoryUsageThreshold(poolSize);
+ logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration));
+ logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold());
Debug debug = new Debug();
- BufferPool.debug(debug);
+ bufferPool.debug(debug);
- TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize);
+ TestEnvironment testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold());
- startBurnerThreads(testEnv);
+ startBurnerThreads(bufferPool, testEnv);
for (int threadIdx = 0; threadIdx < threadCount; threadIdx++)
- testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx));
+ testEnv.addCheckedFuture(startWorkerThread(bufferPool, testEnv, threadIdx));
while (!testEnv.latch.await(10L, TimeUnit.SECONDS))
{
@@ -281,25 +297,23 @@ public class LongBufferPoolTest
while ( null != (check = queue.poll()) )
{
check.validate();
- BufferPool.put(check.buffer);
+ bufferPool.put(check.buffer);
}
}
assertEquals(0, testEnv.executorService.shutdownNow().size());
- logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize);
- BufferPool.setMemoryUsageThreshold(prevPoolSize);
- BufferPool.debug(null);
+ logger.info("Reverting BufferPool DEBUG config");
+ bufferPool.debug(BufferPool.Debug.NO_OP);
testEnv.assertCheckedThreadsSucceeded();
- System.out.println(String.format("%s - finished.",
- DATE_FORMAT.format(new Date())));
+ logger.info("{} - finished.", DATE_FORMAT.format(new Date()));
}
- private Future<Boolean> startWorkerThread(TestEnvironment testEnv, final int threadIdx)
+ private Future<Boolean> startWorkerThread(BufferPool bufferPool, TestEnvironment testEnv, final int threadIdx)
{
- return testEnv.executorService.submit(new TestUntil(testEnv.until)
+ return testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
final int targetSize = threadIdx == 0 ? BufferPool.NORMAL_CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx;
final SPSCQueue<BufferCheck> shareFrom = testEnv.sharedRecycle[threadIdx];
@@ -362,7 +376,7 @@ public class LongBufferPoolTest
else
{
check.validate();
- BufferPool.put(check.buffer);
+ bufferPool.put(check.buffer);
totalSize -= size;
}
}
@@ -407,7 +421,7 @@ public class LongBufferPoolTest
while (checks.size() > 0)
{
BufferCheck check = checks.get(0);
- BufferPool.put(check.buffer);
+ bufferPool.put(check.buffer);
checks.remove(check.listnode);
}
testEnv.latch.countDown();
@@ -419,13 +433,13 @@ public class LongBufferPoolTest
if (check == null)
return false;
check.validate();
- BufferPool.put(check.buffer);
+ bufferPool.put(check.buffer);
return true;
}
BufferCheck allocate(int size)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
BufferCheck check = new BufferCheck(buffer, rand.nextLong());
assertEquals(size, buffer.capacity());
@@ -470,12 +484,12 @@ public class LongBufferPoolTest
});
}
- private void startBurnerThreads(TestEnvironment testEnv)
+ private void startBurnerThreads(BufferPool bufferPool, TestEnvironment testEnv)
{
// setup some high churn allocate/deallocate, without any checking
final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
final CountDownLatch doneAdd = new CountDownLatch(1);
- testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(testEnv.until)
+ testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
int count = 0;
final ThreadLocalRandom rand = ThreadLocalRandom.current();
@@ -494,8 +508,9 @@ public class LongBufferPoolTest
return;
}
- ByteBuffer buffer = rand.nextInt(4) < 1 ? BufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE)
- : BufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT);
+ ByteBuffer buffer = rand.nextInt(4) < 1
+ ? bufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE)
+ : bufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT);
if (buffer == null)
{
Thread.yield();
@@ -505,7 +520,7 @@ public class LongBufferPoolTest
// 50/50 chance of returning the buffer from the producer thread, or
// pass it on to the consumer.
if (rand.nextBoolean())
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
else
burn.add(buffer);
@@ -516,7 +531,7 @@ public class LongBufferPoolTest
doneAdd.countDown();
}
}));
- testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(testEnv.until)
+ testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(bufferPool, testEnv.until)
{
void testOne() throws Exception
{
@@ -526,7 +541,7 @@ public class LongBufferPoolTest
Thread.yield();
return;
}
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
void cleanup()
{
@@ -537,9 +552,11 @@ public class LongBufferPoolTest
static abstract class TestUntil implements Callable<Boolean>
{
+ final BufferPool bufferPool;
final long until;
- protected TestUntil(long until)
+ protected TestUntil(BufferPool bufferPool, long until)
{
+ this.bufferPool = bufferPool;
this.until = until;
}
@@ -562,7 +579,7 @@ public class LongBufferPoolTest
{
logger.error("Got exception {}, current chunk {}",
ex.getMessage(),
- BufferPool.unsafeCurrentChunk());
+ bufferPool.unsafeCurrentChunk());
ex.printStackTrace();
return false;
}
@@ -570,7 +587,7 @@ public class LongBufferPoolTest
{
logger.error("Got throwable {}, current chunk {}",
tr.getMessage(),
- BufferPool.unsafeCurrentChunk());
+ bufferPool.unsafeCurrentChunk());
tr.printStackTrace();
return false;
}
@@ -587,14 +604,14 @@ public class LongBufferPoolTest
try
{
LongBufferPoolTest.setup();
- new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(),
- TimeUnit.HOURS.toNanos(2L), 16 << 20);
+ new LongBufferPoolTest().testAllocate(new BufferPool("test_pool", 16 << 20, true),
+ Runtime.getRuntime().availableProcessors(),
+ TimeUnit.HOURS.toNanos(2L));
System.exit(0);
}
catch (Throwable tr)
{
- System.out.println(String.format("Test failed - %s", tr.getMessage()));
- tr.printStackTrace();
+ logger.error("Test failed - {}", tr.getMessage(), tr);
System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure
}
}
diff --git a/test/data/jmxdump/cassandra-4.0-jmx.yaml b/test/data/jmxdump/cassandra-4.0-jmx.yaml
index a013ebc..05f4773 100644
--- a/test/data/jmxdump/cassandra-4.0-jmx.yaml
+++ b/test/data/jmxdump/cassandra-4.0-jmx.yaml
@@ -7155,7 +7155,7 @@ org.apache.cassandra.internal:type=ViewBuildExecutor:
- {access: read/write, name: MaximumPoolSize, type: int}
- {access: read/write, name: MaximumThreads, type: int}
operations: []
-org.apache.cassandra.metrics:type=BufferPool,name=Misses:
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Misses:
attributes:
- {access: read-only, name: Count, type: long}
- {access: read-only, name: FifteenMinuteRate, type: double}
@@ -7167,7 +7167,78 @@ org.apache.cassandra.metrics:type=BufferPool,name=Misses:
- name: objectName
parameters: []
returnType: javax.management.ObjectName
-org.apache.cassandra.metrics:type=BufferPool,name=Size:
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Hits:
+ attributes:
+ - {access: read-only, name: Count, type: long}
+ - {access: read-only, name: FifteenMinuteRate, type: double}
+ - {access: read-only, name: FiveMinuteRate, type: double}
+ - {access: read-only, name: MeanRate, type: double}
+ - {access: read-only, name: OneMinuteRate, type: double}
+ - {access: read-only, name: RateUnit, type: java.lang.String}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=Size:
+ attributes:
+ - {access: read-only, name: Value, type: java.lang.Object}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=UsedSize:
+ attributes:
+ - {access: read-only, name: Value, type: java.lang.Object}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=chunk-cache,name=OverflowSize:
+ attributes:
+ - {access: read-only, name: Value, type: java.lang.Object}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Misses:
+ attributes:
+ - {access: read-only, name: Count, type: long}
+ - {access: read-only, name: FifteenMinuteRate, type: double}
+ - {access: read-only, name: FiveMinuteRate, type: double}
+ - {access: read-only, name: MeanRate, type: double}
+ - {access: read-only, name: OneMinuteRate, type: double}
+ - {access: read-only, name: RateUnit, type: java.lang.String}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Hits:
+ attributes:
+ - {access: read-only, name: Count, type: long}
+ - {access: read-only, name: FifteenMinuteRate, type: double}
+ - {access: read-only, name: FiveMinuteRate, type: double}
+ - {access: read-only, name: MeanRate, type: double}
+ - {access: read-only, name: OneMinuteRate, type: double}
+ - {access: read-only, name: RateUnit, type: java.lang.String}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=Size:
+ attributes:
+ - {access: read-only, name: Value, type: java.lang.Object}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=UsedSize:
+ attributes:
+ - {access: read-only, name: Value, type: java.lang.Object}
+ operations:
+ - name: objectName
+ parameters: []
+ returnType: javax.management.ObjectName
+org.apache.cassandra.metrics:type=BufferPool,scope=networking,name=OverflowSize:
attributes:
- {access: read-only, name: Value, type: java.lang.Object}
operations:
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 51c58d4..9a3fd08 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -120,8 +120,8 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
+import org.apache.cassandra.utils.memory.BufferPools;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -617,7 +617,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
() -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
() -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
() -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
- () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
+ () -> BufferPools.shutdownLocalCleaner(1L, MINUTES),
() -> Ref.shutdownReferenceReaper(1L, MINUTES),
() -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
() -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
diff --git a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
index 313aa3f..5e0286f 100644
--- a/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/BufferPoolMetricsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.metrics;
import java.util.Random;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -31,7 +30,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;
-import org.apache.cassandra.utils.memory.BufferPoolTest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -40,7 +38,8 @@ import static org.junit.Assert.assertEquals;
@RunWith(OrderedJUnit4ClassRunner.class)
public class BufferPoolMetricsTest
{
- private static final BufferPoolMetrics metrics = new BufferPoolMetrics();
+ private BufferPool bufferPool;
+ private BufferPoolMetrics metrics;
@BeforeClass()
public static void setup() throws ConfigurationException
@@ -51,21 +50,15 @@ public class BufferPoolMetricsTest
@Before
public void setUp()
{
- BufferPool.setMemoryUsageThreshold(16 * 1024L * 1024L);
- }
-
- @After
- public void cleanUp()
- {
- BufferPoolTest.resetBufferPool();
- metrics.misses.mark(metrics.misses.getCount() * -1);
+ this.bufferPool = new BufferPool("test_" + System.currentTimeMillis(), 16 * 1024L * 1024L, true);
+ this.metrics = bufferPool.metrics();
}
@Test
public void testMetricsSize()
{
// basically want to test changes in the metric being reported as the buffer pool grows - starts at zero
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isEqualTo(0);
// the idea is to test changes in the sizeOfBufferPool metric which starts at zero. it will bump up
@@ -77,7 +70,7 @@ public class BufferPoolMetricsTest
final long seed = System.currentTimeMillis();
final Random rand = new Random(seed);
final String assertionMessage = String.format("Failed with seed of %s", seed);
- final long maxIterations = BufferPool.getMemoryUsageThreshold();
+ final long maxIterations = bufferPool.memoryUsageThreshold();
final int maxBufferSize = BufferPool.NORMAL_CHUNK_SIZE - 1;
int nextSizeToRequest;
long totalBytesRequestedFromPool = 0;
@@ -87,15 +80,15 @@ public class BufferPoolMetricsTest
{
nextSizeToRequest = rand.nextInt(maxBufferSize) + 1;
totalBytesRequestedFromPool = totalBytesRequestedFromPool + nextSizeToRequest;
- BufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP);
+ bufferPool.get(nextSizeToRequest, BufferType.OFF_HEAP);
assertThat(metrics.size.getValue()).as(assertionMessage)
- .isEqualTo(BufferPool.sizeInBytes())
+ .isEqualTo(bufferPool.sizeInBytes())
.isGreaterThanOrEqualTo(totalBytesRequestedFromPool);
if (initialSizeInBytesAfterZero == 0)
{
- initialSizeInBytesAfterZero = BufferPool.sizeInBytes();
+ initialSizeInBytesAfterZero = bufferPool.sizeInBytes();
}
else
{
@@ -115,6 +108,74 @@ public class BufferPoolMetricsTest
}
@Test
+    /**
+     * Checks the {@code overflowSize} gauge: it is expected to stay at zero while requests are
+     * one byte below {@link BufferPool#NORMAL_CHUNK_SIZE}, and to grow by the requested size for
+     * every request one byte above it.
+     */
+    public void testMetricsOverflowSize()
+    {
+        final int pooledRequestSize = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int overflowRequestSize = BufferPool.NORMAL_CHUNK_SIZE + 1;
+        final int rounds = 16;
+
+        // fresh pool: nothing requested yet
+        assertEquals(0, metrics.overflowSize.getValue().longValue());
+
+        for (int round = 1; round <= rounds; round++)
+        {
+            bufferPool.get(pooledRequestSize, BufferType.OFF_HEAP);
+            assertEquals(0, metrics.overflowSize.getValue().longValue());
+        }
+
+        for (int round = 1; round <= rounds; round++)
+        {
+            bufferPool.get(overflowRequestSize, BufferType.OFF_HEAP);
+            final long expectedOverflow = (long) overflowRequestSize * round;
+            assertEquals(expectedOverflow, metrics.overflowSize.getValue().longValue());
+        }
+    }
+
+    /**
+     * Checks the {@code usedSize} gauge: every request, whether one byte below or one byte above
+     * {@link BufferPool#NORMAL_CHUNK_SIZE}, is expected to add exactly its requested size.
+     */
+    @Test
+    public void testMetricsUsedSize()
+    {
+        final int pooledRequestSize = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int overflowRequestSize = BufferPool.NORMAL_CHUNK_SIZE + 1;
+        final int rounds = 16;
+
+        // fresh pool: nothing requested yet
+        assertEquals(0, metrics.usedSize.getValue().longValue());
+
+        long expectedUsed = 0;
+        for (int round = 0; round < rounds; round++)
+        {
+            bufferPool.get(pooledRequestSize, BufferType.OFF_HEAP);
+            expectedUsed += pooledRequestSize;
+            assertEquals(expectedUsed, metrics.usedSize.getValue().longValue());
+        }
+
+        for (int round = 0; round < rounds; round++)
+        {
+            bufferPool.get(overflowRequestSize, BufferType.OFF_HEAP);
+            expectedUsed += overflowRequestSize;
+            assertEquals(expectedUsed, metrics.usedSize.getValue().longValue());
+        }
+    }
+
+    /**
+     * Checks the {@code hits} meter: it is expected to advance by one for each request one byte
+     * below {@link BufferPool#NORMAL_CHUNK_SIZE}, and to stay unchanged for requests above it.
+     */
+    @Test
+    public void testMetricsHits()
+    {
+        final int pooledRequestSize = BufferPool.NORMAL_CHUNK_SIZE - 1;
+        final int overflowRequestSize = BufferPool.NORMAL_CHUNK_SIZE + 1;
+        final int rounds = 16;
+
+        // fresh pool: nothing requested yet
+        assertEquals(0, metrics.hits.getCount());
+
+        for (int round = 1; round <= rounds; round++)
+        {
+            bufferPool.get(pooledRequestSize, BufferType.OFF_HEAP);
+            assertEquals(round, metrics.hits.getCount());
+        }
+
+        // From here on only oversized requests are made, so the hit count must not move.
+        final long hitsBeforeOversized = metrics.hits.getCount();
+        for (int extra = 0; extra < rounds; extra++)
+        {
+            bufferPool.get(overflowRequestSize + extra, BufferType.OFF_HEAP);
+            assertEquals(hitsBeforeOversized, metrics.hits.getCount());
+        }
+    }
+
+ @Test
public void testMetricsMisses()
{
assertEquals(0, metrics.misses.getCount());
@@ -125,13 +186,13 @@ public class BufferPoolMetricsTest
int iterations = 16;
for (int ix = 0; ix < iterations; ix++)
{
- BufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
+ bufferPool.get(tinyBufferSizeThatHits, BufferType.OFF_HEAP);
assertEquals(0, metrics.misses.getCount());
}
for (int ix = 0; ix < iterations; ix++)
{
- BufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP);
+ bufferPool.get(bigBufferSizeThatMisses + ix, BufferType.OFF_HEAP);
assertEquals(ix + 1, metrics.misses.getCount());
}
}
@@ -140,23 +201,23 @@ public class BufferPoolMetricsTest
public void testZeroSizeRequestsDontChangeMetrics()
{
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isEqualTo(0);
- BufferPool.get(0, BufferType.OFF_HEAP);
+ bufferPool.get(0, BufferType.OFF_HEAP);
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isEqualTo(0);
- BufferPool.get(65536, BufferType.OFF_HEAP);
- BufferPool.get(0, BufferType.OFF_HEAP);
- BufferPool.get(0, BufferType.OFF_HEAP);
- BufferPool.get(0, BufferType.OFF_HEAP);
- BufferPool.get(0, BufferType.OFF_HEAP);
+ bufferPool.get(65536, BufferType.OFF_HEAP);
+ bufferPool.get(0, BufferType.OFF_HEAP);
+ bufferPool.get(0, BufferType.OFF_HEAP);
+ bufferPool.get(0, BufferType.OFF_HEAP);
+ bufferPool.get(0, BufferType.OFF_HEAP);
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isGreaterThanOrEqualTo(65536);
}
@@ -164,29 +225,29 @@ public class BufferPoolMetricsTest
public void testFailedRequestsDontChangeMetrics()
{
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isEqualTo(0);
tryRequestNegativeBufferSize();
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isEqualTo(0);
- BufferPool.get(65536, BufferType.OFF_HEAP);
+ bufferPool.get(65536, BufferType.OFF_HEAP);
tryRequestNegativeBufferSize();
tryRequestNegativeBufferSize();
tryRequestNegativeBufferSize();
tryRequestNegativeBufferSize();
assertEquals(0, metrics.misses.getCount());
- assertThat(metrics.size.getValue()).isEqualTo(BufferPool.sizeInBytes())
+ assertThat(metrics.size.getValue()).isEqualTo(bufferPool.sizeInBytes())
.isGreaterThanOrEqualTo(65536);
}
private void tryRequestNegativeBufferSize()
{
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(
- () -> BufferPool.get(-1, BufferType.OFF_HEAP));
+ () -> bufferPool.get(-1, BufferType.OFF_HEAP));
}
}
diff --git a/test/unit/org/apache/cassandra/net/FramingTest.java b/test/unit/org/apache/cassandra/net/FramingTest.java
index 8a7f428..27c8003 100644
--- a/test/unit/org/apache/cassandra/net/FramingTest.java
+++ b/test/unit/org/apache/cassandra/net/FramingTest.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.memory.BufferPools;
import org.apache.cassandra.utils.vint.VIntCoding;
import static java.lang.Math.*;
@@ -197,7 +197,7 @@ public class FramingTest
cumulativeCompressedLength[i] = (i == 0 ? 0 : cumulativeCompressedLength[i - 1]) + buffer.readableBytes();
}
- ByteBuffer frames = BufferPool.getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP);
+ ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP);
for (ByteBuf buffer : compressed)
{
frames.put(buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes()));
@@ -412,7 +412,7 @@ public class FramingTest
cumulativeLength[i] = (i == 0 ? 0 : cumulativeLength[i - 1]) + message.length;
}
- ByteBuffer frames = BufferPool.getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP);
+ ByteBuffer frames = BufferPools.forNetworking().getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP);
for (byte[] buffer : messages)
frames.put(buffer);
frames.flip();
diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
index 4bfc54a..eb3cc1b 100644
--- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
@@ -23,13 +23,11 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
-import org.junit.After;
+import com.google.common.collect.Iterables;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -37,31 +35,12 @@ import static org.junit.Assert.*;
public class BufferPoolTest
{
- @BeforeClass
- public static void setupDD()
- {
- DatabaseDescriptor.daemonInitialization();
- }
+ private BufferPool bufferPool;
@Before
public void setUp()
{
- BufferPool.setMemoryUsageThreshold(8 * 1024L * 1024L);
- }
-
- @After
- public void cleanUp()
- {
- resetBufferPool();
- }
-
- /**
- * Exposes a utility method on this test that other tests might use to access the protected
- * {@link BufferPool#unsafeReset()} method.
- */
- public static void resetBufferPool()
- {
- BufferPool.unsafeReset();
+ bufferPool = new BufferPool("test_pool", 8 * 1024 * 1024, true);
}
@Test
@@ -69,18 +48,20 @@ public class BufferPoolTest
{
final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assertEquals(true, buffer.isDirect());
+ assertEquals(size, bufferPool.usedSizeInBytes());
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
- assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+ assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
- BufferPool.put(buffer);
- assertEquals(null, BufferPool.unsafeCurrentChunk());
- assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+ bufferPool.put(buffer);
+ assertEquals(null, bufferPool.unsafeCurrentChunk());
+ assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
+ assertEquals(0, bufferPool.usedSizeInBytes());
}
@@ -98,7 +79,7 @@ public class BufferPoolTest
private void checkPageAligned(int size)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
assertTrue(buffer.isDirect());
@@ -106,7 +87,7 @@ public class BufferPoolTest
long address = MemoryUtil.getAddress(buffer);
assertTrue((address % MemoryUtil.pageSize()) == 0);
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -115,23 +96,23 @@ public class BufferPoolTest
final int size1 = 1024;
final int size2 = 2048;
- ByteBuffer buffer1 = BufferPool.get(size1, BufferType.OFF_HEAP);
+ ByteBuffer buffer1 = bufferPool.get(size1, BufferType.OFF_HEAP);
assertNotNull(buffer1);
assertEquals(size1, buffer1.capacity());
- ByteBuffer buffer2 = BufferPool.get(size2, BufferType.OFF_HEAP);
+ ByteBuffer buffer2 = bufferPool.get(size2, BufferType.OFF_HEAP);
assertNotNull(buffer2);
assertEquals(size2, buffer2.capacity());
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
- assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+ assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
- BufferPool.put(buffer1);
- BufferPool.put(buffer2);
+ bufferPool.put(buffer1);
+ bufferPool.put(buffer2);
- assertEquals(null, BufferPool.unsafeCurrentChunk());
- assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+ assertEquals(null, bufferPool.unsafeCurrentChunk());
+ assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes());
}
@Test
@@ -149,14 +130,13 @@ public class BufferPoolTest
@Test
public void testMaxMemoryExceeded_SameAsChunkSize()
{
- BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
requestDoubleMaxMemory();
}
@Test
public void testMaxMemoryExceeded_SmallerThanChunkSize()
{
- BufferPool.setMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2);
+ bufferPool = new BufferPool("test_pool", BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2, false);
requestDoubleMaxMemory();
}
@@ -168,7 +148,7 @@ public class BufferPoolTest
private void requestDoubleMaxMemory()
{
- requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * BufferPool.getMemoryUsageThreshold()));
+ requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * bufferPool.memoryUsageThreshold()));
}
private void requestUpToSize(int bufferSize, int totalSize)
@@ -178,7 +158,7 @@ public class BufferPoolTest
List<ByteBuffer> buffers = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++)
{
- ByteBuffer buffer = BufferPool.get(bufferSize, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(bufferSize, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(bufferSize, buffer.capacity());
assertTrue(buffer.isDirect());
@@ -186,7 +166,7 @@ public class BufferPoolTest
}
for (ByteBuffer buffer : buffers)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -194,10 +174,10 @@ public class BufferPoolTest
{
final int size = BufferPool.NORMAL_CHUNK_SIZE + 1;
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -209,25 +189,25 @@ public class BufferPoolTest
List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers);
List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++)
- buffers1.add(BufferPool.get(size, BufferType.OFF_HEAP));
+ buffers1.add(bufferPool.get(size, BufferType.OFF_HEAP));
- BufferPool.Chunk chunk1 = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk1 = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk1);
for (int i = 0; i < numBuffers; i++)
- buffers2.add(BufferPool.get(size, BufferType.OFF_HEAP));
+ buffers2.add(bufferPool.get(size, BufferType.OFF_HEAP));
- assertEquals(2, BufferPool.unsafeNumChunks());
+ assertEquals(2, bufferPool.unsafeNumChunks());
for (ByteBuffer buffer : buffers1)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
- assertEquals(1, BufferPool.unsafeNumChunks());
+ assertEquals(1, bufferPool.unsafeNumChunks());
for (ByteBuffer buffer : buffers2)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
- assertEquals(0, BufferPool.unsafeNumChunks());
+ assertEquals(0, bufferPool.unsafeNumChunks());
buffers2.clear();
}
@@ -263,16 +243,16 @@ public class BufferPoolTest
{
doTestRandomFrees(12345567878L);
- BufferPool.unsafeReset();
+ bufferPool.unsafeReset();
doTestRandomFrees(20452249587L);
- BufferPool.unsafeReset();
+ bufferPool.unsafeReset();
doTestRandomFrees(82457252948L);
- BufferPool.unsafeReset();
+ bufferPool.unsafeReset();
doTestRandomFrees(98759284579L);
- BufferPool.unsafeReset();
+ bufferPool.unsafeReset();
doTestRandomFrees(19475257244L);
}
@@ -303,10 +283,10 @@ public class BufferPoolTest
List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots);
for (int i = 0; i < maxFreeSlots; i++)
{
- buffers.add(BufferPool.get(size, BufferType.OFF_HEAP));
+ buffers.add(bufferPool.get(size, BufferType.OFF_HEAP));
}
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertFalse(chunk.isFree());
int freeSize = BufferPool.NORMAL_CHUNK_SIZE - maxFreeSlots * size;
@@ -318,7 +298,7 @@ public class BufferPoolTest
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
freeSize += size;
if (freeSize == chunk.capacity())
@@ -341,18 +321,18 @@ public class BufferPoolTest
List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
for (int i = 0; i < sizes.length; i++)
{
- ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertTrue(buffer.capacity() >= sizes[i]);
buffers.add(buffer);
- sum += BufferPool.unsafeCurrentChunk().roundUp(buffer.capacity());
+ sum += bufferPool.unsafeCurrentChunk().roundUp(buffer.capacity());
}
// else the test will fail, adjust sizes as required
assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
Random rnd = new Random();
@@ -362,11 +342,11 @@ public class BufferPoolTest
int index = rnd.nextInt(buffers.size());
ByteBuffer buffer = buffers.remove(index);
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
- BufferPool.put(buffers.remove(0));
+ bufferPool.put(buffers.remove(0));
- assertEquals(null, BufferPool.unsafeCurrentChunk());
+ assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(0, chunk.free());
}
@@ -381,7 +361,7 @@ public class BufferPoolTest
List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
for (int i = 0; i < sizes.length; i++)
{
- ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertTrue(buffer.capacity() >= sizes[i]);
buffers.add(buffer);
@@ -392,15 +372,15 @@ public class BufferPoolTest
// else the test will fail, adjust sizes as required
assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
for (int i = 0; i < sizes.length; i++)
{
- BufferPool.put(buffers.get(i));
+ bufferPool.put(buffers.get(i));
}
- assertEquals(null, BufferPool.unsafeCurrentChunk());
+ assertEquals(null, bufferPool.unsafeCurrentChunk());
assertEquals(0, chunk.free());
}
@@ -415,22 +395,22 @@ public class BufferPoolTest
for (int i = 0; i < numBuffersInChunk; i++)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
buffers.add(buffer);
addresses.add(MemoryUtil.getAddress(buffer));
}
for (int i = numBuffersInChunk - 1; i >= 0; i--)
- BufferPool.put(buffers.get(i));
+ bufferPool.put(buffers.get(i));
buffers.clear();
for (int i = 0; i < numBuffersInChunk; i++)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(size, buffer.capacity());
- addresses.remove(MemoryUtil.getAddress(buffer));
+ assert addresses.remove(MemoryUtil.getAddress(buffer));
buffers.add(buffer);
}
@@ -438,18 +418,18 @@ public class BufferPoolTest
assertTrue(addresses.isEmpty()); // all 5 released buffers were used
for (ByteBuffer buffer : buffers)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
public void testHeapBuffer()
{
- ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP);
+ ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP);
assertNotNull(buffer);
assertEquals(1024, buffer.capacity());
assertFalse(buffer.isDirect());
assertNotNull(buffer.array());
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -497,17 +477,17 @@ public class BufferPoolTest
private void checkBuffer(int size)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE)
{
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size));
}
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -525,14 +505,14 @@ public class BufferPoolTest
for (int size : sizes)
{
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
buffers.add(buffer);
}
for (ByteBuffer buffer : buffers)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
@@ -544,36 +524,36 @@ public class BufferPoolTest
private void checkBufferWithGivenSlots(int size, long freeSlots)
{
//first allocate to make sure there is a chunk
- ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP);
// now get the current chunk and override the free slots mask
- BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
long oldFreeSlots = chunk.setFreeSlots(freeSlots);
// now check we can still get the buffer with the free slots mask changed
- ByteBuffer buffer2 = BufferPool.get(size, BufferType.OFF_HEAP);
+ ByteBuffer buffer2 = bufferPool.get(size, BufferType.OFF_HEAP);
assertEquals(size, buffer.capacity());
- BufferPool.put(buffer2);
+ bufferPool.put(buffer2);
// unsafeReset the free slots
chunk.setFreeSlots(oldFreeSlots);
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test
public void testZeroSizeRequest()
{
- ByteBuffer buffer = BufferPool.get(0, BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(0, BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(0, buffer.capacity());
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeSizeRequest()
{
- BufferPool.get(-1, BufferType.OFF_HEAP);
+ bufferPool.get(-1, BufferType.OFF_HEAP);
}
@Test
@@ -689,7 +669,7 @@ public class BufferPoolTest
for (int j = 0; j < threadSizes.length; j++)
{
- ByteBuffer buffer = BufferPool.get(threadSizes[j], BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(threadSizes[j], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(threadSizes[j], buffer.capacity());
@@ -704,17 +684,17 @@ public class BufferPoolTest
assertEquals(i, buffer.getInt());
if (returnImmediately)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
else
toBeReturned.add(buffer);
- assertTrue(BufferPool.sizeInBytes() > 0);
+ assertTrue(bufferPool.sizeInBytes() > 0);
}
Thread.sleep(rand.nextInt(3));
for (ByteBuffer buffer : toBeReturned)
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
}
catch (Exception ex)
{
@@ -758,13 +738,13 @@ public class BufferPoolTest
int sum = 0;
for (int i = 0; i < sizes.length; i++)
{
- buffers[i] = BufferPool.get(sizes[i], BufferType.OFF_HEAP);
+ buffers[i] = bufferPool.get(sizes[i], BufferType.OFF_HEAP);
assertNotNull(buffers[i]);
assertEquals(sizes[i], buffers[i].capacity());
- sum += BufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity());
+ sum += bufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity());
}
- final BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk();
+ final BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk();
assertNotNull(chunk);
assertFalse(chunk.isFree());
@@ -786,8 +766,8 @@ public class BufferPoolTest
{
try
{
- assertNotSame(chunk, BufferPool.unsafeCurrentChunk());
- BufferPool.put(buffer);
+ assertNotSame(chunk, bufferPool.unsafeCurrentChunk());
+ bufferPool.put(buffer);
}
catch (AssertionError ex)
{ //this is expected if we release a buffer more than once
@@ -816,12 +796,185 @@ public class BufferPoolTest
System.gc();
System.gc();
- assertTrue(BufferPool.unsafeCurrentChunk().isFree());
+ assertTrue(bufferPool.unsafeCurrentChunk().isFree());
//make sure the main thread can still allocate buffers
- ByteBuffer buffer = BufferPool.get(sizes[0], BufferType.OFF_HEAP);
+ ByteBuffer buffer = bufferPool.get(sizes[0], BufferType.OFF_HEAP);
assertNotNull(buffer);
assertEquals(sizes[0], buffer.capacity());
- BufferPool.put(buffer);
+ bufferPool.put(buffer);
+ }
+
+ @Test
+ public void testOverflowAllocation()
+ {
+ int macroChunkSize = BufferPool.GlobalPool.MACRO_CHUNK_SIZE;
+ int allocationSize = BufferPool.NORMAL_CHUNK_SIZE;
+ int allocations = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / allocationSize;
+
+ // occupy entire buffer pool
+ List<ByteBuffer> buffers = new ArrayList<>();
+ allocate(allocations, allocationSize, buffers);
+
+ assertEquals(macroChunkSize, bufferPool.sizeInBytes());
+ assertEquals(macroChunkSize, bufferPool.usedSizeInBytes());
+ assertEquals(0, bufferPool.overflowMemoryInBytes());
+
+ // allocate overflow due to pool exhaustion
+ ByteBuffer overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.OFF_HEAP);
+
+ assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+ assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+ assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+
+ // free all buffers
+ bufferPool.put(overflowBuffer);
+ release(buffers);
+
+ assertEquals(macroChunkSize, bufferPool.sizeInBytes());
+ assertEquals(0, bufferPool.usedSizeInBytes());
+ assertEquals(0, bufferPool.overflowMemoryInBytes());
+
+ // allocate overflow due to on-heap
+ overflowBuffer = bufferPool.get(BufferPool.NORMAL_ALLOCATION_UNIT, BufferType.ON_HEAP);
+ assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+ assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+ assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+ bufferPool.put(overflowBuffer);
+
+ // allocate overflow due to over allocation size
+ overflowBuffer = bufferPool.get(2 * BufferPool.NORMAL_CHUNK_SIZE, BufferType.ON_HEAP);
+ assertEquals(macroChunkSize + overflowBuffer.capacity(), bufferPool.sizeInBytes());
+ assertEquals(overflowBuffer.capacity(), bufferPool.usedSizeInBytes());
+ assertEquals(overflowBuffer.capacity(), bufferPool.overflowMemoryInBytes());
+ }
+
+ @Test
+ public void testRecyclePartialFreeChunk()
+ {
+ // normal chunk size is 128kb
+ int halfNormalChunk = BufferPool.NORMAL_CHUNK_SIZE / 2; // 64kb, half of normal chunk
+ List<ByteBuffer> toRelease = new ArrayList<>();
+
+ // allocate three buffers on different chunks
+ ByteBuffer buffer0 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+ BufferPool.Chunk chunk0 = BufferPool.Chunk.getParentChunk(buffer0);
+ assertFalse(chunk0.isFree());
+ allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+ ByteBuffer buffer1 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+ BufferPool.Chunk chunk1 = BufferPool.Chunk.getParentChunk(buffer1);
+ assertFalse(chunk1.isFree());
+ assertNotEquals(chunk0, chunk1);
+ allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+ ByteBuffer buffer2 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+ BufferPool.Chunk chunk2 = BufferPool.Chunk.getParentChunk(buffer2);
+ assertFalse(chunk2.isFree());
+ assertNotEquals(chunk0, chunk2);
+ assertNotEquals(chunk1, chunk2);
+ allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk
+
+ // now all 3 chunks in the local pool are full; allocate one more buffer to evict chunk2
+ ByteBuffer buffer3 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP);
+ BufferPool.Chunk chunk3 = BufferPool.Chunk.getParentChunk(buffer3);
+ assertNotEquals(chunk0, chunk3);
+ assertNotEquals(chunk1, chunk3);
+ assertNotEquals(chunk2, chunk3);
+
+ // verify chunk2 got evicted: it doesn't have an owner
+ assertNotNull(chunk0.owner());
+ assertEquals(BufferPool.Chunk.Status.IN_USE, chunk0.status());
+ assertNotNull(chunk1.owner());
+ assertEquals(BufferPool.Chunk.Status.IN_USE, chunk1.status());
+ assertNull(chunk2.owner());
+ assertEquals(BufferPool.Chunk.Status.EVICTED, chunk2.status());
+
+ // release half buffers for chunk0/1/2
+ release(toRelease);
+ BufferPool.Chunk partiallyFreed = chunk2;
+
+ // try to recirculate chunk2 and verify freed space
+ assertFalse(bufferPool.globalPool().isFullyFreed(partiallyFreed));
+ assertTrue(bufferPool.globalPool().isPartiallyFreed(partiallyFreed));
+ assertEquals(BufferPool.Chunk.Status.IN_USE, partiallyFreed.status());
+ assertEquals(halfNormalChunk, partiallyFreed.free());
+ ByteBuffer buffer = partiallyFreed.get(halfNormalChunk, false, null);
+ assertEquals(halfNormalChunk, buffer.capacity());
+
+ // cleanup allocated buffers
+ for (ByteBuffer buf : Arrays.asList(buffer0, buffer1, buffer2, buffer3, buffer))
+ bufferPool.put(buf);
+
+ // verify that fully freed chunks are prioritized over partially freed chunks
+ List<BufferPool.Chunk> remainingChunks = new ArrayList<>();
+ BufferPool.Chunk chunkForAllocation;
+ while ((chunkForAllocation = bufferPool.globalPool().get()) != null)
+ remainingChunks.add(chunkForAllocation);
+
+ int totalNormalChunks = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / BufferPool.NORMAL_CHUNK_SIZE; // 64;
+ assertEquals(totalNormalChunks, remainingChunks.size());
+ assertSame(partiallyFreed, remainingChunks.get(remainingChunks.size() - 1)); // last one is partially freed
+
+ // cleanup polled chunks
+ remainingChunks.forEach(BufferPool.Chunk::release);
+ }
+
+ @Test
+ public void testTinyPool()
+ {
+ int total = 0;
+ final int size = BufferPool.TINY_ALLOCATION_UNIT;
+ final int allocationPerChunk = 64;
+
+ // occupy 3 tiny chunks
+ List<ByteBuffer> buffers0 = new ArrayList<>();
+ BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0);
+ assertTrue(chunk0.owner().isTinyPool());
+ List<ByteBuffer> buffers1 = new ArrayList<>();
+ BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1);
+ assertTrue(chunk1.owner().isTinyPool());
+ List<ByteBuffer> buffers2 = new ArrayList<>();
+ BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2);
+ assertTrue(chunk2.owner().isTinyPool());
+ total += 3 * BufferPool.TINY_CHUNK_SIZE;
+ assertEquals(total, bufferPool.usedSizeInBytes());
+
+ // allocate another tiny chunk.. chunk2 should be evicted
+ List<ByteBuffer> buffers3 = new ArrayList<>();
+ BufferPool.Chunk chunk3 = allocate(allocationPerChunk, size, buffers3);
+ assertTrue(chunk3.owner().isTinyPool());
+ total += BufferPool.TINY_CHUNK_SIZE;
+ assertEquals(total, bufferPool.usedSizeInBytes());
+
+ // verify chunk2 is full and evicted
+ assertEquals(0, chunk2.free());
+ assertNull(chunk2.owner());
+
+ // release chunk2's buffers
+ for (int i = 0; i < buffers2.size(); i++)
+ {
+ bufferPool.put(buffers2.get(i));
+ total -= buffers2.get(i).capacity();
+ assertEquals(total, bufferPool.usedSizeInBytes());
+ }
+
+ // cleanup allocated buffers
+ for (ByteBuffer buffer : Iterables.concat(buffers0, buffers1, buffers3))
+ bufferPool.put(buffer);
+ }
+
+ private BufferPool.Chunk allocate(int num, int bufferSize, List<ByteBuffer> buffers)
+ {
+ for (int i = 0; i < num; i++)
+ buffers.add(bufferPool.get(bufferSize, BufferType.OFF_HEAP));
+
+ return BufferPool.Chunk.getParentChunk(buffers.get(buffers.size() - 1));
+ }
+
+ private void release(List<ByteBuffer> toRelease)
+ {
+ for (ByteBuffer buffer : toRelease)
+ bufferPool.put(buffer);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org