You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/05 22:49:55 UTC

kafka git commit: KAFKA-3651; Remove the condition variable waiting on memory availability in Bufferpool when a TimeoutException is thrown

Repository: kafka
Updated Branches:
  refs/heads/trunk 8429db937 -> 6856c5c21


KAFKA-3651; Remove the condition variable waiting on memory availability in Bufferpool when a TimeoutException is thrown

Whenever the BufferPool throws the "Failed to allocate memory within the configured max blocking time" TimeoutException, it should also remove the waiting thread's condition object from the waiters deque.

Author: MayureshGharat <gh...@gmail.com>

Reviewers: Chen Zhu <am...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1314 from MayureshGharat/kafka-3651


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6856c5c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6856c5c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6856c5c2

Branch: refs/heads/trunk
Commit: 6856c5c214fb0a40b18cfb25db3dadae320c4142
Parents: 8429db9
Author: Mayuresh Gharat <gh...@gmail.com>
Authored: Thu May 5 23:31:46 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 5 23:31:46 2016 +0100

----------------------------------------------------------------------
 .../clients/producer/internals/BufferPool.java  | 26 +++++--
 .../producer/internals/BufferPoolTest.java      | 77 +++++++++++++++++++-
 2 files changed, 95 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6856c5c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 5577971..b42b0ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -123,16 +123,25 @@ public final class BufferPool {
                 // enough memory to allocate one
                 while (accumulated < size) {
                     long startWaitNs = time.nanoseconds();
-                    boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
-                    long endWaitNs = time.nanoseconds();
-                    long timeNs = Math.max(0L, endWaitNs - startWaitNs);
-                    this.waitTime.record(timeNs, time.milliseconds());
+                    long timeNs;
+                    boolean waitingTimeElapsed;
+                    try {
+                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
+                    } catch (InterruptedException e) {
+                        this.waiters.remove(moreMemory);
+                        throw e;
+                    } finally {
+                        long endWaitNs = time.nanoseconds();
+                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
+                        this.waitTime.record(timeNs, time.milliseconds());
+                    }
 
-                    if (waitingTimeElapsed)
+                    if (waitingTimeElapsed) {
+                        this.waiters.remove(moreMemory);
                         throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
+                    }
 
                     remainingTimeToBlockNs -= timeNs;
-
                     // check if we can satisfy this request from the free list,
                     // otherwise allocate memory
                     if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
@@ -262,4 +271,9 @@ public final class BufferPool {
     public long totalMemory() {
         return this.totalMemory;
     }
+
+    // package-private method used only for testing
+    Deque<Condition> waiters() {
+        return this.waiters;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6856c5c2/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 88e8943..48682b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -26,11 +26,14 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
 
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
@@ -148,8 +151,6 @@ public class BufferPoolTest {
     /**
      * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
      * And verify that the allocation should finish soon after the maxBlockTimeMs.
-     *
-     * @throws Exception
      */
     @Test
     public void testBlockTimeout() throws Exception {
@@ -175,6 +176,78 @@ public class BufferPoolTest {
     }
 
     /**
+     * Test that the waiter waiting on the availability of more memory is cleaned up when a timeout occurs
+     */
+    @Test
+    public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
+        pool.allocate(1, maxBlockTimeMs);
+        try {
+            pool.allocate(2, maxBlockTimeMs);
+            fail("The buffer allocated more memory than its maximum value 2");
+        } catch (TimeoutException e) {
+            // this is good
+        }
+        assertTrue(pool.queued() == 0);
+    }
+
+    /**
+     * Test that the waiter waiting on the availability of more memory is cleaned up when an interruption occurs
+     */
+    @Test
+    public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
+        long blockTime = 5000;
+        pool.allocate(1, maxBlockTimeMs);
+        Thread t1 = new Thread(new BufferPoolAllocator(pool, blockTime));
+        Thread t2 = new Thread(new BufferPoolAllocator(pool, blockTime));
+        // start thread t1 which will try to allocate more memory on to the Buffer pool
+        t1.start();
+        // sleep for 500ms. Condition variable c1 associated with pool.allocate() by thread t1 will be inserted in the waiters queue.
+        Thread.sleep(500);
+        Deque<Condition> waiters = pool.waiters();
+        // get the condition object associated with pool.allocate() by thread t1
+        Condition c1 = waiters.getFirst();
+        // start thread t2 which will try to allocate more memory on to the Buffer pool
+        t2.start();
+        // sleep for 500ms. Condition variable c2 associated with pool.allocate() by thread t2 will be inserted in the waiters queue. The waiters queue will have 2 entries c1 and c2.
+        Thread.sleep(500);
+        t1.interrupt();
+        // sleep for 500ms.
+        Thread.sleep(500);
+        // get the condition object associated with allocate() by thread t2
+        Condition c2 = waiters.getLast();
+        t2.interrupt();
+        assertNotEquals(c1, c2);
+        t1.join();
+        t2.join();
+        // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
+        assertEquals(pool.queued(), 0);
+    }
+
+    private static class BufferPoolAllocator implements Runnable {
+        BufferPool pool;
+        long maxBlockTimeMs;
+
+        BufferPoolAllocator(BufferPool pool, long maxBlockTimeMs) {
+            this.pool = pool;
+            this.maxBlockTimeMs = maxBlockTimeMs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                pool.allocate(2, maxBlockTimeMs);
+                fail("The buffer allocated more memory than its maximum value 2");
+            } catch (TimeoutException e) {
+                // this is good
+            } catch (InterruptedException e) {
+                // this can be neglected
+            }
+        }
+    }
+
+    /**
      * This test creates lots of threads that hammer on the pool
      */
     @Test