You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/06 20:58:00 UTC

kafka git commit: KAFKA-3655; awaitFlushCompletion() in RecordAccumulator should always decrement flushesInProgress count

Repository: kafka
Updated Branches:
  refs/heads/trunk 58f9d7cf8 -> 717eea835


KAFKA-3655; awaitFlushCompletion() in RecordAccumulator should always decrement flushesInProgress count

Author: Chen Zhu <am...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1315 from zhuchen1018/KAFKA-3655


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/717eea83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/717eea83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/717eea83

Branch: refs/heads/trunk
Commit: 717eea8350feb670e8ba3dd3505c708a8a52de71
Parents: 58f9d7c
Author: Chen Zhu <am...@gmail.com>
Authored: Fri May 6 21:57:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 6 21:57:53 2016 +0100

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 13 +++++---
 .../internals/RecordAccumulatorTest.java        | 31 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1766609..5339096 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -430,8 +430,10 @@ public final class RecordAccumulator {
     
     /**
      * Are there any threads currently waiting on a flush?
+     *
+     * package private for test
      */
-    private boolean flushInProgress() {
+    boolean flushInProgress() {
         return flushesInProgress.get() > 0;
     }
     
@@ -453,9 +455,12 @@ public final class RecordAccumulator {
      * Mark all partitions as ready to send and block until the send is complete
      */
     public void awaitFlushCompletion() throws InterruptedException {
-        for (RecordBatch batch: this.incomplete.all())
-            batch.produceFuture.await();
-        this.flushesInProgress.decrementAndGet();
+        try {
+            for (RecordBatch batch : this.incomplete.all())
+                batch.produceFuture.await();
+        } finally {
+            this.flushesInProgress.decrementAndGet();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index a39d2e8..b3a5a04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
 import org.junit.After;
 import org.junit.Test;
 
@@ -57,6 +59,7 @@ public class RecordAccumulatorTest {
     private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
     private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
     private MockTime time = new MockTime();
+    private SystemTime systemTime = new SystemTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
@@ -272,6 +275,34 @@ public class RecordAccumulatorTest {
         assertFalse(accum.hasUnsent());
     }
 
+
+    private void delayedInterrupt(final Thread thread, final long delayMs) {
+        Thread t = new Thread() {
+            public void run() {
+                systemTime.sleep(delayMs);
+                thread.interrupt();
+            }
+        };
+        t.start();
+    }
+
+    @Test
+    public void testAwaitFlushComplete() throws Exception {
+        RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time);
+        accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
+
+        accum.beginFlush();
+        assertTrue(accum.flushInProgress());
+        delayedInterrupt(Thread.currentThread(), 1000L);
+        try {
+            accum.awaitFlushCompletion();
+            fail("awaitFlushCompletion should throw InterruptException");
+        } catch (InterruptedException e) {
+            assertFalse("flushInProgress count should be decremented even if thread is interrupted", accum.flushInProgress());
+        }
+    }
+
+
     @Test
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;