You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/06/06 02:45:21 UTC

kafka git commit: KAFKA-5239; Allocate memory outside the lock in producer buffer pool

Repository: kafka
Updated Branches:
  refs/heads/trunk 9934d28a3 -> 4b37918ef


KAFKA-5239; Allocate memory outside the lock in producer buffer pool

Move byte buffer allocation out of the lock.
Add a unit test checking that the available memory count is restored when byte buffer allocation throws an OutOfMemoryError.

Author: Sean McCauliff <sm...@linkedin.com>

Reviewers: Jiangjie Qin <be...@gmail.com>

Closes #3053 from smccauliff/kafka-5239


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b37918e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b37918e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b37918e

Branch: refs/heads/trunk
Commit: 4b37918ef47f5d8c4874644f576d8e1a72e16212
Parents: 9934d28
Author: Sean McCauliff <sm...@linkedin.com>
Authored: Mon Jun 5 19:45:14 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Mon Jun 5 19:45:14 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/BufferPool.java  | 73 +++++++++++++-------
 .../producer/internals/BufferPoolTest.java      | 25 ++++++-
 2 files changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b37918e/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index f53ce7b..019201f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -50,8 +50,8 @@ public class BufferPool {
     private final ReentrantLock lock;
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
-    /** This memory is accounted for separately from the poolable buffers in free. */
-    private long availableMemory;
+    /** Total available memory is nonPooledAvailableMemory plus (the number of byte buffers in free) * poolableSize. */
+    private long nonPooledAvailableMemory;
     private final Metrics metrics;
     private final Time time;
     private final Sensor waitTime;
@@ -71,7 +71,7 @@ public class BufferPool {
         this.free = new ArrayDeque<>();
         this.waiters = new ArrayDeque<>();
         this.totalMemory = memory;
-        this.availableMemory = memory;
+        this.nonPooledAvailableMemory = memory;
         this.metrics = metrics;
         this.time = time;
         this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
@@ -99,6 +99,7 @@ public class BufferPool {
                                                + this.totalMemory
                                                + " on memory allocations.");
 
+        ByteBuffer buffer = null;
         this.lock.lock();
         try {
             // check if we have a free buffer of the right size pooled
@@ -108,18 +109,14 @@ public class BufferPool {
             // now check if the request is immediately satisfiable with the
             // memory on hand or if we need to block
             int freeListSize = freeSize() * this.poolableSize;
-            if (this.availableMemory + freeListSize >= size) {
+            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                 // we have enough unallocated or pooled memory to immediately
-                // satisfy the request
+                // satisfy the request, but need to allocate the buffer
                 freeUp(size);
-                ByteBuffer allocatedBuffer = allocateByteBuffer(size);
-                this.availableMemory -= size;
-                return allocatedBuffer;
+                this.nonPooledAvailableMemory -= size;
             } else {
                 // we are out of memory and will have to block
                 int accumulated = 0;
-                ByteBuffer buffer = null;
-                boolean hasError = true;
                 Condition moreMemory = this.lock.newCondition();
                 try {
                     long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
@@ -154,21 +151,16 @@ public class BufferPool {
                             // we'll need to allocate memory, but we may only get
                             // part of what we need on this iteration
                             freeUp(size - accumulated);
-                            int got = (int) Math.min(size - accumulated, this.availableMemory);
-                            this.availableMemory -= got;
+                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
+                            this.nonPooledAvailableMemory -= got;
                             accumulated += got;
                         }
                     }
-
-                    if (buffer == null)
-                        buffer = allocateByteBuffer(size);
-                    hasError = false;
-                    //unlock happens in top-level, enclosing finally
-                    return buffer;
+                    // Success: nothing was thrown, so zero accumulated to stop the finally block from adding it back
+                    accumulated = 0;
                 } finally {
                     // When this loop was not able to successfully terminate don't loose available memory
-                    if (hasError)
-                        this.availableMemory += accumulated;
+                    this.nonPooledAvailableMemory += accumulated;
                     this.waiters.remove(moreMemory);
                 }
             }
@@ -176,13 +168,42 @@ public class BufferPool {
             // signal any additional waiters if there is more memory left
             // over for them
             try {
-                if (!(this.availableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
+                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                     this.waiters.peekFirst().signal();
             } finally {
                 // Another finally... otherwise find bugs complains
                 lock.unlock();
             }
         }
+
+        if (buffer == null)
+            return safeAllocateByteBuffer(size);
+        else
+            return buffer;
+    }
+
+    /**
+     * Allocate a buffer of the given size by delegating to allocateByteBuffer. If that call throws (e.g. because of OOM),
+     * add size back to nonPooledAvailableMemory and signal the first waiter if one exists; the throwable still propagates.
+     */
+    private ByteBuffer safeAllocateByteBuffer(int size) {
+        boolean error = true; // stays true unless allocateByteBuffer returns normally
+        try {
+            ByteBuffer buffer = allocateByteBuffer(size);
+            error = false;
+            return buffer;
+        } finally {
+            if (error) {
+                this.lock.lock(); // take the pool lock before touching nonPooledAvailableMemory and waiters
+                try {
+                    this.nonPooledAvailableMemory += size; // return size bytes to the non-pooled count
+                    if (!this.waiters.isEmpty())
+                        this.waiters.peekFirst().signal(); // wake the first waiter, if any
+                } finally {
+                    this.lock.unlock();
+                }
+            }
+        }
     }
 
     // Protected for testing.
@@ -195,8 +216,8 @@ public class BufferPool {
      * buffers (if needed)
      */
     private void freeUp(int size) {
-        while (!this.free.isEmpty() && this.availableMemory < size)
-            this.availableMemory += this.free.pollLast().capacity();
+        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
+            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
     }
 
     /**
@@ -214,7 +235,7 @@ public class BufferPool {
                 buffer.clear();
                 this.free.add(buffer);
             } else {
-                this.availableMemory += size;
+                this.nonPooledAvailableMemory += size;
             }
             Condition moreMem = this.waiters.peekFirst();
             if (moreMem != null)
@@ -234,7 +255,7 @@ public class BufferPool {
     public long availableMemory() {
         lock.lock();
         try {
-            return this.availableMemory + freeSize() * (long) this.poolableSize;
+            return this.nonPooledAvailableMemory + freeSize() * (long) this.poolableSize;
         } finally {
             lock.unlock();
         }
@@ -251,7 +272,7 @@ public class BufferPool {
     public long unallocatedMemory() {
         lock.lock();
         try {
-            return this.availableMemory;
+            return this.nonPooledAvailableMemory;
         } finally {
             lock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b37918e/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 2ce33ee..0a30490 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -48,11 +48,10 @@ import static org.easymock.EasyMock.anyDouble;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.anyString;
-
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
 
 
 @RunWith(PowerMockRunner.class)
@@ -345,10 +344,30 @@ public class BufferPoolTest {
         assertEquals(20_000_000_000L, pool.availableMemory());
     }
 
+    @Test
+    public void outOfMemoryOnAllocation() {
+        BufferPool bufferPool = new BufferPool(1024, 1024, metrics, time, metricGroup) {
+            @Override
+            protected ByteBuffer allocateByteBuffer(int size) {
+                throw new OutOfMemoryError(); // simulate a failed buffer allocation
+            }
+        };
+        // NOTE(review): the call below invokes the throwing override directly, not allocate(), so no memory is reserved
+        try {
+            bufferPool.allocateByteBuffer(1024);
+            // should not reach here
+            fail("Should have thrown OutOfMemoryError");
+        } catch (OutOfMemoryError ignored) {
+            // expected: the override above always throws
+        }
+        // NOTE(review): passes trivially for the reason above; calling allocate(1024, ...) would exercise the restore path
+        assertEquals(bufferPool.availableMemory(), 1024); // NOTE(review): JUnit order is (expected, actual)
+    }
+
     public static class StressTestThread extends Thread {
         private final int iterations;
         private final BufferPool pool;
-        private final long maxBlockTimeMs =  2000;
+        private final long maxBlockTimeMs =  20_000;
         public final AtomicBoolean success = new AtomicBoolean(false);
 
         public StressTestThread(BufferPool pool, int iterations) {