You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/09/13 15:12:02 UTC
[kafka] branch 3.3 updated: KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)
This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 9633c01d2ff KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)
9633c01d2ff is described below
commit 9633c01d2ffdb8f9408bbf730bf139fd33831f8b
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Sep 13 08:04:40 2022 -0700
KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)
Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is
possible for the memory pool to run out of memory if the snapshot is
greater than 5 * 8MB.
This change allows the BatchMemoryPool to always allocate a buffer when
requested. When a buffer is released, the memory pool frees it instead of
pooling it if the number of pooled buffers has already reached the
configured maximum number of retained batches.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/raft/internals/BatchMemoryPool.java | 59 +++++++++---
.../kafka/raft/internals/BatchMemoryPoolTest.java | 104 ++++++++++++++-------
2 files changed, 112 insertions(+), 51 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
index 5cd3e3316cb..ae6cba81de6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
@@ -24,23 +24,44 @@ import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
/**
- * Simple memory pool which maintains a limited number of fixed-size buffers.
+ * Simple memory pool that tries to maintain a limited number of fixed-size buffers.
+ *
+ * This type implements an unbounded memory pool. Released byte buffers are pooled for reuse
+ * up to the maximum number of retained batches.
*/
public class BatchMemoryPool implements MemoryPool {
private final ReentrantLock lock;
private final Deque<ByteBuffer> free;
- private final int maxBatches;
+ private final int maxRetainedBatches;
private final int batchSize;
private int numAllocatedBatches = 0;
- public BatchMemoryPool(int maxBatches, int batchSize) {
- this.maxBatches = maxBatches;
+ /**
+ * Construct a memory pool.
+ *
+ * The byte buffers are always of batchSize size. The memory pool is unbounded but it will retain
+ * up to maxRetainedBatches byte buffers for reuse.
+ *
+ * @param maxRetainedBatches maximum number of byte buffers to pool for reuse
+ * @param batchSize the size of each byte buffer
+ */
+ public BatchMemoryPool(int maxRetainedBatches, int batchSize) {
+ this.maxRetainedBatches = maxRetainedBatches;
this.batchSize = batchSize;
- this.free = new ArrayDeque<>(maxBatches);
+ this.free = new ArrayDeque<>(maxRetainedBatches);
this.lock = new ReentrantLock();
}
+ /**
+ * Allocate a byte buffer in this pool.
+ *
+ * This method should always succeed and never return null. The sizeBytes parameter must be less than
+ * or equal to the batchSize used in the constructor.
+ *
+ * @param sizeBytes is not used to determine the size of the byte buffer
+ * @throws IllegalArgumentException if sizeBytes is greater than batchSize
+ */
@Override
public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes > batchSize) {
@@ -51,16 +72,24 @@ public class BatchMemoryPool implements MemoryPool {
lock.lock();
try {
ByteBuffer buffer = free.poll();
- if (buffer == null && numAllocatedBatches < maxBatches) {
+ // Always allocate a new buffer if there are no free buffers
+ if (buffer == null) {
buffer = ByteBuffer.allocate(batchSize);
numAllocatedBatches += 1;
}
+
return buffer;
} finally {
lock.unlock();
}
}
+ /**
+ * Release a previously allocated byte buffer.
+ *
+ * The byte buffer is pooled if the number of pooled byte buffers is less than the maxRetainedBatches in
+ * the constructor. Otherwise, the byte buffer is returned to the JVM for garbage collection.
+ */
@Override
public void release(ByteBuffer previouslyAllocated) {
lock.lock();
@@ -72,7 +101,13 @@ public class BatchMemoryPool implements MemoryPool {
+ previouslyAllocated.limit());
}
- free.offer(previouslyAllocated);
+ // Free the buffer if the number of pooled buffers is already the maximum number of batches.
+ // Otherwise return the buffer to the memory pool.
+ if (free.size() >= maxRetainedBatches) {
+ numAllocatedBatches--;
+ } else {
+ free.offer(previouslyAllocated);
+ }
} finally {
lock.unlock();
}
@@ -90,18 +125,12 @@ public class BatchMemoryPool implements MemoryPool {
@Override
public long availableMemory() {
- lock.lock();
- try {
- int freeBatches = free.size() + (maxBatches - numAllocatedBatches);
- return freeBatches * (long) batchSize;
- } finally {
- lock.unlock();
- }
+ return Long.MAX_VALUE;
}
@Override
public boolean isOutOfMemory() {
- return availableMemory() == 0;
+ return false;
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
index 4177de145fe..5573f2a0f83 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
@@ -16,15 +16,15 @@
*/
package org.apache.kafka.raft.internals;
-import org.junit.jupiter.api.Test;
-
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -33,37 +33,44 @@ class BatchMemoryPoolTest {
@Test
public void testAllocateAndRelease() {
int batchSize = 1024;
- int maxBatches = 1;
+ int maxRetainedBatches = 1;
+ Set<ByteBuffer> released = Collections.newSetFromMap(new IdentityHashMap<>());
- BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
- assertEquals(batchSize, pool.availableMemory());
+ BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
+ assertEquals(Long.MAX_VALUE, pool.availableMemory());
assertFalse(pool.isOutOfMemory());
- ByteBuffer allocated = pool.tryAllocate(batchSize);
- assertNotNull(allocated);
- assertEquals(0, allocated.position());
- assertEquals(batchSize, allocated.limit());
- assertEquals(0, pool.availableMemory());
- assertTrue(pool.isOutOfMemory());
- assertNull(pool.tryAllocate(batchSize));
+ ByteBuffer buffer1 = pool.tryAllocate(batchSize);
+ assertNotNull(buffer1);
+ assertEquals(0, buffer1.position());
+ assertEquals(batchSize, buffer1.limit());
+ assertEquals(Long.MAX_VALUE, pool.availableMemory());
+ assertFalse(pool.isOutOfMemory());
+
+ // Test that allocation works even after maximum batches are allocated
+ ByteBuffer buffer2 = pool.tryAllocate(batchSize);
+ assertNotNull(buffer2);
+ // The size of the pool can exceed maxRetainedBatches * batchSize
+ assertEquals(2 * batchSize, pool.size());
+ release(update(buffer2), pool, released);
- allocated.position(512);
- allocated.limit(724);
+ release(update(buffer1), pool, released);
+ assertEquals(maxRetainedBatches * batchSize, pool.size());
- pool.release(allocated);
ByteBuffer reallocated = pool.tryAllocate(batchSize);
- assertSame(allocated, reallocated);
- assertEquals(0, allocated.position());
- assertEquals(batchSize, allocated.limit());
+ assertTrue(released.contains(reallocated));
+ assertEquals(0, reallocated.position());
+ assertEquals(batchSize, reallocated.limit());
}
@Test
public void testMultipleAllocations() {
int batchSize = 1024;
- int maxBatches = 3;
+ int maxRetainedBatches = 3;
+ Set<ByteBuffer> released = Collections.newSetFromMap(new IdentityHashMap<>());
- BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
- assertEquals(batchSize * maxBatches, pool.availableMemory());
+ BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
+ assertEquals(Long.MAX_VALUE, pool.availableMemory());
ByteBuffer batch1 = pool.tryAllocate(batchSize);
assertNotNull(batch1);
@@ -74,34 +81,59 @@ class BatchMemoryPoolTest {
ByteBuffer batch3 = pool.tryAllocate(batchSize);
assertNotNull(batch3);
- assertNull(pool.tryAllocate(batchSize));
-
- pool.release(batch2);
- assertSame(batch2, pool.tryAllocate(batchSize));
-
- pool.release(batch1);
- pool.release(batch3);
- ByteBuffer buffer = pool.tryAllocate(batchSize);
- assertTrue(buffer == batch1 || buffer == batch3);
+ // Test that allocation works even after maximum batches are allocated
+ ByteBuffer batch4 = pool.tryAllocate(batchSize);
+ assertNotNull(batch4);
+ // The size of the pool can exceed maxRetainedBatches * batchSize
+ assertEquals(4 * batchSize, pool.size());
+ release(batch4, pool, released);
+
+ release(batch2, pool, released);
+ ByteBuffer batch5 = pool.tryAllocate(batchSize);
+ assertTrue(released.contains(batch5));
+ released.remove(batch5);
+
+ release(batch1, pool, released);
+ release(batch3, pool, released);
+
+ ByteBuffer batch6 = pool.tryAllocate(batchSize);
+ assertTrue(released.contains(batch6));
+ released.remove(batch6);
+
+ // Release all previously allocated buffers
+ release(batch5, pool, released);
+ release(batch6, pool, released);
+ assertEquals(maxRetainedBatches * batchSize, pool.size());
}
@Test
public void testOversizeAllocation() {
int batchSize = 1024;
- int maxBatches = 3;
+ int maxRetainedBatches = 3;
- BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
+ BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
assertThrows(IllegalArgumentException.class, () -> pool.tryAllocate(batchSize + 1));
}
@Test
public void testReleaseBufferNotMatchingBatchSize() {
int batchSize = 1024;
- int maxBatches = 3;
+ int maxRetainedBatches = 3;
- BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
+ BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
ByteBuffer buffer = ByteBuffer.allocate(1023);
assertThrows(IllegalArgumentException.class, () -> pool.release(buffer));
}
+ private ByteBuffer update(ByteBuffer buffer) {
+ buffer.position(512);
+ buffer.limit(724);
+
+ return buffer;
+ }
+
+ private void release(ByteBuffer buffer, BatchMemoryPool pool, Set<ByteBuffer> released) {
+ pool.release(buffer);
+ released.add(buffer);
+ }
}