You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/09/13 15:12:02 UTC

[kafka] branch 3.3 updated: KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 9633c01d2ff KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)
9633c01d2ff is described below

commit 9633c01d2ffdb8f9408bbf730bf139fd33831f8b
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Sep 13 08:04:40 2022 -0700

    KAFKA-14222; KRaft's memory pool should always allocate a buffer (#12625)
    
    Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is
    possible for the memory pool to run out of memory if the snapshot is
    greater than 5 * 8MB.
    
    This change allows the BatchMemoryPool to always allocate a buffer when
    requested. The memory pool frees the extra allocated buffer when released if
    the number of pooled buffers is greater than the configured maximum
    batches.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/raft/internals/BatchMemoryPool.java      |  59 +++++++++---
 .../kafka/raft/internals/BatchMemoryPoolTest.java  | 104 ++++++++++++++-------
 2 files changed, 112 insertions(+), 51 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
index 5cd3e3316cb..ae6cba81de6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
@@ -24,23 +24,44 @@ import java.util.Deque;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Simple memory pool which maintains a limited number of fixed-size buffers.
+ * Simple memory pool that tries to maintain a limited number of fixed-size buffers.
+ *
+ * This type implements an unbounded memory pool. When releasing byte buffers they will get pooled
+ * up to the maximum retained number of batches.
  */
 public class BatchMemoryPool implements MemoryPool {
     private final ReentrantLock lock;
     private final Deque<ByteBuffer> free;
-    private final int maxBatches;
+    private final int maxRetainedBatches;
     private final int batchSize;
 
     private int numAllocatedBatches = 0;
 
-    public BatchMemoryPool(int maxBatches, int batchSize) {
-        this.maxBatches = maxBatches;
+    /**
+     * Construct a memory pool.
+     *
+     * The byte buffers are always of batchSize size. The memory pool is unbounded but it will retain
+     * up to maxRetainedBatches byte buffers for reuse.
+     *
+     * @param maxRetainedBatches maximum number of byte buffers to pool for reuse
+     * @param batchSize the size of each byte buffer
+     */
+    public BatchMemoryPool(int maxRetainedBatches, int batchSize) {
+        this.maxRetainedBatches = maxRetainedBatches;
         this.batchSize = batchSize;
-        this.free = new ArrayDeque<>(maxBatches);
+        this.free = new ArrayDeque<>(maxRetainedBatches);
         this.lock = new ReentrantLock();
     }
 
+    /**
+     * Allocate a byte buffer in this pool.
+     *
+     * This method should always succeed and never return null. The sizeBytes parameter must be less than
+     * or equal to the batchSize used in the constructor.
+     *
+     * @param sizeBytes is not used to determine the size of the byte buffer
+     * @throws IllegalArgumentException if sizeBytes is greater than batchSize
+     */
     @Override
     public ByteBuffer tryAllocate(int sizeBytes) {
         if (sizeBytes > batchSize) {
@@ -51,16 +72,24 @@ public class BatchMemoryPool implements MemoryPool {
         lock.lock();
         try {
             ByteBuffer buffer = free.poll();
-            if (buffer == null && numAllocatedBatches < maxBatches) {
+            // Always allocate a new buffer if there are no free buffers
+            if (buffer == null) {
                 buffer = ByteBuffer.allocate(batchSize);
                 numAllocatedBatches += 1;
             }
+
             return buffer;
         } finally {
             lock.unlock();
         }
     }
 
+    /**
+     * Release a previously allocated byte buffer.
+     *
+     * The byte buffer is pooled if the number of pooled byte buffers is less than the maxRetainedBatches in
+     * the constructor. Otherwise, the byte buffer is returned to the JVM for garbage collection.
+     */
     @Override
     public void release(ByteBuffer previouslyAllocated) {
         lock.lock();
@@ -72,7 +101,13 @@ public class BatchMemoryPool implements MemoryPool {
                     + previouslyAllocated.limit());
             }
 
-            free.offer(previouslyAllocated);
+            // Free the buffer if the number of pooled buffers is already the maximum number of batches.
+            // Otherwise return the buffer to the memory pool.
+            if (free.size() >= maxRetainedBatches) {
+                numAllocatedBatches--;
+            } else {
+                free.offer(previouslyAllocated);
+            }
         } finally {
             lock.unlock();
         }
@@ -90,18 +125,12 @@ public class BatchMemoryPool implements MemoryPool {
 
     @Override
     public long availableMemory() {
-        lock.lock();
-        try {
-            int freeBatches = free.size() + (maxBatches - numAllocatedBatches);
-            return freeBatches * (long) batchSize;
-        } finally {
-            lock.unlock();
-        }
+        return Long.MAX_VALUE;
     }
 
     @Override
     public boolean isOutOfMemory() {
-        return availableMemory() == 0;
+        return false;
     }
 
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
index 4177de145fe..5573f2a0f83 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java
@@ -16,15 +16,15 @@
  */
 package org.apache.kafka.raft.internals;
 
-import org.junit.jupiter.api.Test;
-
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -33,37 +33,44 @@ class BatchMemoryPoolTest {
     @Test
     public void testAllocateAndRelease() {
         int batchSize = 1024;
-        int maxBatches = 1;
+        int maxRetainedBatches = 1;
+        Set<ByteBuffer> released = Collections.newSetFromMap(new IdentityHashMap<>());
 
-        BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
-        assertEquals(batchSize, pool.availableMemory());
+        BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
+        assertEquals(Long.MAX_VALUE, pool.availableMemory());
         assertFalse(pool.isOutOfMemory());
 
-        ByteBuffer allocated = pool.tryAllocate(batchSize);
-        assertNotNull(allocated);
-        assertEquals(0, allocated.position());
-        assertEquals(batchSize, allocated.limit());
-        assertEquals(0, pool.availableMemory());
-        assertTrue(pool.isOutOfMemory());
-        assertNull(pool.tryAllocate(batchSize));
+        ByteBuffer buffer1 = pool.tryAllocate(batchSize);
+        assertNotNull(buffer1);
+        assertEquals(0, buffer1.position());
+        assertEquals(batchSize, buffer1.limit());
+        assertEquals(Long.MAX_VALUE, pool.availableMemory());
+        assertFalse(pool.isOutOfMemory());
+
+        // Test that allocation works even after maximum batches are allocated 
+        ByteBuffer buffer2 = pool.tryAllocate(batchSize);
+        assertNotNull(buffer2);
+        // The size of the pool can exceed maxRetainedBatches * batchSize
+        assertEquals(2 * batchSize, pool.size());
+        release(update(buffer2), pool, released);
 
-        allocated.position(512);
-        allocated.limit(724);
+        release(update(buffer1), pool, released);
+        assertEquals(maxRetainedBatches * batchSize, pool.size());
 
-        pool.release(allocated);
         ByteBuffer reallocated = pool.tryAllocate(batchSize);
-        assertSame(allocated, reallocated);
-        assertEquals(0, allocated.position());
-        assertEquals(batchSize, allocated.limit());
+        assertTrue(released.contains(reallocated));
+        assertEquals(0, reallocated.position());
+        assertEquals(batchSize, reallocated.limit());
     }
 
     @Test
     public void testMultipleAllocations() {
         int batchSize = 1024;
-        int maxBatches = 3;
+        int maxRetainedBatches = 3;
+        Set<ByteBuffer> released = Collections.newSetFromMap(new IdentityHashMap<>());
 
-        BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
-        assertEquals(batchSize * maxBatches, pool.availableMemory());
+        BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
+        assertEquals(Long.MAX_VALUE, pool.availableMemory());
 
         ByteBuffer batch1 = pool.tryAllocate(batchSize);
         assertNotNull(batch1);
@@ -74,34 +81,59 @@ class BatchMemoryPoolTest {
         ByteBuffer batch3 = pool.tryAllocate(batchSize);
         assertNotNull(batch3);
 
-        assertNull(pool.tryAllocate(batchSize));
-
-        pool.release(batch2);
-        assertSame(batch2, pool.tryAllocate(batchSize));
-
-        pool.release(batch1);
-        pool.release(batch3);
-        ByteBuffer buffer = pool.tryAllocate(batchSize);
-        assertTrue(buffer == batch1 || buffer == batch3);
+        // Test that allocation works even after maximum batches are allocated 
+        ByteBuffer batch4 = pool.tryAllocate(batchSize);
+        assertNotNull(batch4);
+        // The size of the pool can exceed maxRetainedBatches * batchSize
+        assertEquals(4 * batchSize, pool.size());
+        release(batch4, pool, released);
+
+        release(batch2, pool, released);
+        ByteBuffer batch5 = pool.tryAllocate(batchSize);
+        assertTrue(released.contains(batch5));
+        released.remove(batch5);
+
+        release(batch1, pool, released);
+        release(batch3, pool, released);
+
+        ByteBuffer batch6 = pool.tryAllocate(batchSize);
+        assertTrue(released.contains(batch6));
+        released.remove(batch6);
+
+        // Release all previously allocated buffers
+        release(batch5, pool, released);
+        release(batch6, pool, released);
+        assertEquals(maxRetainedBatches * batchSize, pool.size());
     }
 
     @Test
     public void testOversizeAllocation() {
         int batchSize = 1024;
-        int maxBatches = 3;
+        int maxRetainedBatches = 3;
 
-        BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
+        BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
         assertThrows(IllegalArgumentException.class, () -> pool.tryAllocate(batchSize + 1));
     }
 
     @Test
     public void testReleaseBufferNotMatchingBatchSize() {
         int batchSize = 1024;
-        int maxBatches = 3;
+        int maxRetainedBatches = 3;
 
-        BatchMemoryPool pool = new BatchMemoryPool(maxBatches, batchSize);
+        BatchMemoryPool pool = new BatchMemoryPool(maxRetainedBatches, batchSize);
         ByteBuffer buffer = ByteBuffer.allocate(1023);
         assertThrows(IllegalArgumentException.class, () -> pool.release(buffer));
     }
 
+    private ByteBuffer update(ByteBuffer buffer) {
+        buffer.position(512);
+        buffer.limit(724);
+
+        return buffer;
+    }
+
+    private void release(ByteBuffer buffer, BatchMemoryPool pool, Set<ByteBuffer> released) {
+        pool.release(buffer);
+        released.add(buffer);
+    }
 }