You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/04 01:18:24 UTC

kafka git commit: KAFKA-3648; maxTimeToBlock in BufferPool.allocate should be enforced

Repository: kafka
Updated Branches:
  refs/heads/trunk af0137884 -> 94e12a2e1


KAFKA-3648; maxTimeToBlock in BufferPool.allocate should be enforced

 `maxTimeToBlock` needs to be updated in each loop iteration. Also record waitTime before throwing `TimeoutException`

Author: Chen Zhu <am...@gmail.com>

Reviewers: Dong Lin <li...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1304 from zhuchen1018/KAFKA-3648


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94e12a2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94e12a2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94e12a2e

Branch: refs/heads/trunk
Commit: 94e12a2e1fbd2f43821643d67a0d91f03b3f94e5
Parents: af01378
Author: Chen Zhu <am...@gmail.com>
Authored: Tue May 3 23:59:12 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 3 23:59:12 2016 +0100

----------------------------------------------------------------------
 .../clients/producer/internals/BufferPool.java  | 22 ++++++----
 .../producer/internals/BufferPoolTest.java      | 45 +++++++++++++++-----
 2 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94e12a2e/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index f881e62..5577971 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -83,13 +83,13 @@ public final class BufferPool {
      * is configured with blocking mode.
      * 
      * @param size The buffer size to allocate in bytes
-     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
+     * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
      * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
      *         forever)
      */
-    public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException {
+    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
         if (size > this.totalMemory)
             throw new IllegalArgumentException("Attempt to allocate " + size
                                                + " bytes, but there is a hard limit of "
@@ -117,15 +117,21 @@ public final class BufferPool {
                 int accumulated = 0;
                 ByteBuffer buffer = null;
                 Condition moreMemory = this.lock.newCondition();
+                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                 this.waiters.addLast(moreMemory);
                 // loop over and over until we have a buffer or have reserved
                 // enough memory to allocate one
                 while (accumulated < size) {
-                    long startWait = time.nanoseconds();
-                    if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS))
-                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time");
-                    long endWait = time.nanoseconds();
-                    this.waitTime.record(endWait - startWait, time.milliseconds());
+                    long startWaitNs = time.nanoseconds();
+                    boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
+                    long endWaitNs = time.nanoseconds();
+                    long timeNs = Math.max(0L, endWaitNs - startWaitNs);
+                    this.waitTime.record(timeNs, time.milliseconds());
+
+                    if (waitingTimeElapsed)
+                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
+
+                    remainingTimeToBlockNs -= timeNs;
 
                     // check if we can satisfy this request from the free list,
                     // otherwise allocate memory
@@ -256,4 +262,4 @@ public final class BufferPool {
     public long totalMemory() {
         return this.totalMemory;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/94e12a2e/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 72d85a2..88e8943 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -27,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertTrue;
@@ -34,10 +36,11 @@ import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 
 public class BufferPoolTest {
-    private MockTime time = new MockTime();
-    private Metrics metrics = new Metrics(time);
-    private final long maxBlockTimeMs =  2000;
-    String metricGroup = "TestMetrics";
+    private final MockTime time = new MockTime();
+    private final SystemTime systemTime = new SystemTime();
+    private final Metrics metrics = new Metrics(time);
+    private final long maxBlockTimeMs = 2000;
+    private final String metricGroup = "TestMetrics";
 
     @After
     public void teardown() {
@@ -96,7 +99,7 @@ public class BufferPoolTest {
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
         assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
         doDealloc.countDown(); // return the memory
-        allocation.await();
+        assertTrue("Allocation should succeed soon after de-allocation", allocation.await(1, TimeUnit.SECONDS));
     }
 
     private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
@@ -115,6 +118,16 @@ public class BufferPoolTest {
         return latch;
     }
 
+    private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
+        Thread thread = new Thread() {
+            public void run() {
+                systemTime.sleep(delayMs);
+                pool.deallocate(buffer);
+            }
+        };
+        thread.start();
+    }
+
     private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
         final CountDownLatch completed = new CountDownLatch(1);
         Thread thread = new Thread() {
@@ -133,20 +146,32 @@ public class BufferPoolTest {
     }
 
     /**
-     * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time
+     * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
+     * And verify that the allocation should finish soon after the maxBlockTimeMs.
      *
      * @throws Exception
      */
     @Test
     public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
-        pool.allocate(1, maxBlockTimeMs);
+        BufferPool pool = new BufferPool(10, 1, metrics, systemTime, metricGroup);
+        ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
+        ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
+        ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
+        // First two buffers will be de-allocated within maxBlockTimeMs since the most recent de-allocation
+        delayedDeallocate(pool, buffer1, maxBlockTimeMs / 2);
+        delayedDeallocate(pool, buffer2, maxBlockTimeMs);
+        // The third buffer will be de-allocated after maxBlockTimeMs since the most recent de-allocation
+        delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
+
+        long beginTimeMs = systemTime.milliseconds();
         try {
-            pool.allocate(2, maxBlockTimeMs);
-            fail("The buffer allocated more memory than its maximum value 2");
+            pool.allocate(10, maxBlockTimeMs);
+            fail("The buffer allocated more memory than its maximum value 10");
         } catch (TimeoutException e) {
             // this is good
         }
+        long endTimeMs = systemTime.milliseconds();
+        assertTrue("Allocation should finish not much later than maxBlockTimeMs", endTimeMs - beginTimeMs < maxBlockTimeMs + 1000);
     }
 
     /**