You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/05 22:49:55 UTC
kafka git commit: KAFKA-3651; Remove the condition variable waiting on memory availability in BufferPool when a TimeoutException is thrown
Repository: kafka
Updated Branches:
refs/heads/trunk 8429db937 -> 6856c5c21
KAFKA-3651; Remove the condition variable waiting on memory availability in BufferPool when a TimeoutException is thrown
Whenever the BufferPool throws a TimeoutException with the message "Failed to allocate memory within the configured max blocking time", it should also remove the corresponding condition object from the waiters deque.
Author: MayureshGharat <gh...@gmail.com>
Reviewers: Chen Zhu <am...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1314 from MayureshGharat/kafka-3651
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6856c5c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6856c5c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6856c5c2
Branch: refs/heads/trunk
Commit: 6856c5c214fb0a40b18cfb25db3dadae320c4142
Parents: 8429db9
Author: Mayuresh Gharat <gh...@gmail.com>
Authored: Thu May 5 23:31:46 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 5 23:31:46 2016 +0100
----------------------------------------------------------------------
.../clients/producer/internals/BufferPool.java | 26 +++++--
.../producer/internals/BufferPoolTest.java | 77 +++++++++++++++++++-
2 files changed, 95 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6856c5c2/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 5577971..b42b0ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -123,16 +123,25 @@ public final class BufferPool {
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
- boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
- long endWaitNs = time.nanoseconds();
- long timeNs = Math.max(0L, endWaitNs - startWaitNs);
- this.waitTime.record(timeNs, time.milliseconds());
+ long timeNs;
+ boolean waitingTimeElapsed;
+ try {
+ waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ this.waiters.remove(moreMemory);
+ throw e;
+ } finally {
+ long endWaitNs = time.nanoseconds();
+ timeNs = Math.max(0L, endWaitNs - startWaitNs);
+ this.waitTime.record(timeNs, time.milliseconds());
+ }
- if (waitingTimeElapsed)
+ if (waitingTimeElapsed) {
+ this.waiters.remove(moreMemory);
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
+ }
remainingTimeToBlockNs -= timeNs;
-
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
@@ -262,4 +271,9 @@ public final class BufferPool {
public long totalMemory() {
return this.totalMemory;
}
+
+ // package-private method used only for testing
+ Deque<Condition> waiters() {
+ return this.waiters;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6856c5c2/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 88e8943..48682b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -26,11 +26,14 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
@@ -148,8 +151,6 @@ public class BufferPoolTest {
/**
* Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
* And verify that the allocation should finish soon after the maxBlockTimeMs.
- *
- * @throws Exception
*/
@Test
public void testBlockTimeout() throws Exception {
@@ -175,6 +176,78 @@ public class BufferPoolTest {
}
/**
+ * Test if the waiter that is waiting on availability of more memory is cleaned up when a timeout occurs
+ */
+ @Test
+ public void testCleanupMemoryAvailabilityWaiterOnBlockTimeout() throws Exception {
+ BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
+ pool.allocate(1, maxBlockTimeMs);
+ try {
+ pool.allocate(2, maxBlockTimeMs);
+ fail("The buffer allocated more memory than its maximum value 2");
+ } catch (TimeoutException e) {
+ // this is good
+ }
+ assertTrue(pool.queued() == 0);
+ }
+
+ /**
+ * Test if the waiter that is waiting on availability of more memory is cleaned up when an interruption occurs
+ */
+ @Test
+ public void testCleanupMemoryAvailabilityWaiterOnInterruption() throws Exception {
+ BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
+ long blockTime = 5000;
+ pool.allocate(1, maxBlockTimeMs);
+ Thread t1 = new Thread(new BufferPoolAllocator(pool, blockTime));
+ Thread t2 = new Thread(new BufferPoolAllocator(pool, blockTime));
+ // start thread t1 which will try to allocate more memory on to the Buffer pool
+ t1.start();
+ // sleep for 500ms. Condition variable c1 associated with pool.allocate() by thread t1 will be inserted in the waiters queue.
+ Thread.sleep(500);
+ Deque<Condition> waiters = pool.waiters();
+ // get the condition object associated with pool.allocate() by thread t1
+ Condition c1 = waiters.getFirst();
+ // start thread t2 which will try to allocate more memory on to the Buffer pool
+ t2.start();
+ // sleep for 500ms. Condition variable c2 associated with pool.allocate() by thread t2 will be inserted in the waiters queue. The waiters queue will have 2 entries c1 and c2.
+ Thread.sleep(500);
+ t1.interrupt();
+ // sleep for 500ms.
+ Thread.sleep(500);
+ // get the condition object associated with allocate() by thread t2
+ Condition c2 = waiters.getLast();
+ t2.interrupt();
+ assertNotEquals(c1, c2);
+ t1.join();
+ t2.join();
+ // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
+ assertEquals(pool.queued(), 0);
+ }
+
+ private static class BufferPoolAllocator implements Runnable {
+ BufferPool pool;
+ long maxBlockTimeMs;
+
+ BufferPoolAllocator(BufferPool pool, long maxBlockTimeMs) {
+ this.pool = pool;
+ this.maxBlockTimeMs = maxBlockTimeMs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ pool.allocate(2, maxBlockTimeMs);
+ fail("The buffer allocated more memory than its maximum value 2");
+ } catch (TimeoutException e) {
+ // this is good
+ } catch (InterruptedException e) {
+ // this can be neglected
+ }
+ }
+ }
+
+ /**
* This test creates lots of threads that hammer on the pool
*/
@Test