You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/04 01:18:24 UTC
kafka git commit: KAFKA-3648;
maxTimeToBlock in BufferPool.allocate should be enforced
Repository: kafka
Updated Branches:
refs/heads/trunk af0137884 -> 94e12a2e1
KAFKA-3648; maxTimeToBlock in BufferPool.allocate should be enforced
`maxTimeToBlock` needs to be updated in each loop iteration. Also record waitTime before throwing `TimeoutException`
Author: Chen Zhu <am...@gmail.com>
Reviewers: Dong Lin <li...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1304 from zhuchen1018/KAFKA-3648
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94e12a2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94e12a2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94e12a2e
Branch: refs/heads/trunk
Commit: 94e12a2e1fbd2f43821643d67a0d91f03b3f94e5
Parents: af01378
Author: Chen Zhu <am...@gmail.com>
Authored: Tue May 3 23:59:12 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 3 23:59:12 2016 +0100
----------------------------------------------------------------------
.../clients/producer/internals/BufferPool.java | 22 ++++++----
.../producer/internals/BufferPoolTest.java | 45 +++++++++++++++-----
2 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/94e12a2e/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index f881e62..5577971 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -83,13 +83,13 @@ public final class BufferPool {
* is configured with blocking mode.
*
* @param size The buffer size to allocate in bytes
- * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
+ * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
* @return The buffer
* @throws InterruptedException If the thread is interrupted while blocked
* @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
* forever)
*/
- public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException {
+ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
@@ -117,15 +117,21 @@ public final class BufferPool {
int accumulated = 0;
ByteBuffer buffer = null;
Condition moreMemory = this.lock.newCondition();
+ long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
- long startWait = time.nanoseconds();
- if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS))
- throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
- long endWait = time.nanoseconds();
- this.waitTime.record(endWait - startWait, time.milliseconds());
+ long startWaitNs = time.nanoseconds();
+ boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
+ long endWaitNs = time.nanoseconds();
+ long timeNs = Math.max(0L, endWaitNs - startWaitNs);
+ this.waitTime.record(timeNs, time.milliseconds());
+
+ if (waitingTimeElapsed)
+ throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
+
+ remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
@@ -256,4 +262,4 @@ public final class BufferPool {
public long totalMemory() {
return this.totalMemory;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/94e12a2e/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 72d85a2..88e8943 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Test;
@@ -27,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
@@ -34,10 +36,11 @@ import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
public class BufferPoolTest {
- private MockTime time = new MockTime();
- private Metrics metrics = new Metrics(time);
- private final long maxBlockTimeMs = 2000;
- String metricGroup = "TestMetrics";
+ private final MockTime time = new MockTime();
+ private final SystemTime systemTime = new SystemTime();
+ private final Metrics metrics = new Metrics(time);
+ private final long maxBlockTimeMs = 2000;
+ private final String metricGroup = "TestMetrics";
@After
public void teardown() {
@@ -96,7 +99,7 @@ public class BufferPoolTest {
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
doDealloc.countDown(); // return the memory
- allocation.await();
+ assertTrue("Allocation should succeed soon after de-allocation", allocation.await(1, TimeUnit.SECONDS));
}
private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
@@ -115,6 +118,16 @@ public class BufferPoolTest {
return latch;
}
+ private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
+ Thread thread = new Thread() {
+ public void run() {
+ systemTime.sleep(delayMs);
+ pool.deallocate(buffer);
+ }
+ };
+ thread.start();
+ }
+
private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
final CountDownLatch completed = new CountDownLatch(1);
Thread thread = new Thread() {
@@ -133,20 +146,32 @@ public class BufferPoolTest {
}
/**
- * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time
+ * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
+ * And verify that the allocation should finish soon after the maxBlockTimeMs.
*
* @throws Exception
*/
@Test
public void testBlockTimeout() throws Exception {
- BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
- pool.allocate(1, maxBlockTimeMs);
+ BufferPool pool = new BufferPool(10, 1, metrics, systemTime, metricGroup);
+ ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
+ ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
+ ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
+ // First two buffers will be de-allocated within maxBlockTimeMs since the most recent de-allocation
+ delayedDeallocate(pool, buffer1, maxBlockTimeMs / 2);
+ delayedDeallocate(pool, buffer2, maxBlockTimeMs);
+ // The third buffer will be de-allocated after maxBlockTimeMs since the most recent de-allocation
+ delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
+
+ long beginTimeMs = systemTime.milliseconds();
try {
- pool.allocate(2, maxBlockTimeMs);
- fail("The buffer allocated more memory than its maximum value 2");
+ pool.allocate(10, maxBlockTimeMs);
+ fail("The buffer allocated more memory than its maximum value 10");
} catch (TimeoutException e) {
// this is good
}
+ long endTimeMs = systemTime.milliseconds();
+ assertTrue("Allocation should finish not much later than maxBlockTimeMs", endTimeMs - beginTimeMs < maxBlockTimeMs + 1000);
}
/**