You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/06 20:58:00 UTC
kafka git commit: KAFKA-3655; awaitFlushCompletion() in RecordAccumulator should always decrement flushesInProgress count
Repository: kafka
Updated Branches:
refs/heads/trunk 58f9d7cf8 -> 717eea835
KAFKA-3655; awaitFlushCompletion() in RecordAccumulator should always decrement flushesInProgress count
Author: Chen Zhu <am...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1315 from zhuchen1018/KAFKA-3655
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/717eea83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/717eea83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/717eea83
Branch: refs/heads/trunk
Commit: 717eea8350feb670e8ba3dd3505c708a8a52de71
Parents: 58f9d7c
Author: Chen Zhu <am...@gmail.com>
Authored: Fri May 6 21:57:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 6 21:57:53 2016 +0100
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 13 +++++---
.../internals/RecordAccumulatorTest.java | 31 ++++++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1766609..5339096 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -430,8 +430,10 @@ public final class RecordAccumulator {
/**
* Are there any threads currently waiting on a flush?
+ *
+ * package private for test
*/
- private boolean flushInProgress() {
+ boolean flushInProgress() {
return flushesInProgress.get() > 0;
}
@@ -453,9 +455,12 @@ public final class RecordAccumulator {
* Mark all partitions as ready to send and block until the send is complete
*/
public void awaitFlushCompletion() throws InterruptedException {
- for (RecordBatch batch: this.incomplete.all())
- batch.produceFuture.await();
- this.flushesInProgress.decrementAndGet();
+ try {
+ for (RecordBatch batch : this.incomplete.all())
+ batch.produceFuture.await();
+ } finally {
+ this.flushesInProgress.decrementAndGet();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/717eea83/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index a39d2e8..b3a5a04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -39,6 +40,7 @@ import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
import org.junit.After;
import org.junit.Test;
@@ -57,6 +59,7 @@ public class RecordAccumulatorTest {
private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
private MockTime time = new MockTime();
+ private SystemTime systemTime = new SystemTime();
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
@@ -272,6 +275,34 @@ public class RecordAccumulatorTest {
assertFalse(accum.hasUnsent());
}
+
+ private void delayedInterrupt(final Thread thread, final long delayMs) {
+ Thread t = new Thread() {
+ public void run() {
+ systemTime.sleep(delayMs);
+ thread.interrupt();
+ }
+ };
+ t.start();
+ }
+
+ @Test
+ public void testAwaitFlushComplete() throws Exception {
+ RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time);
+ accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
+
+ accum.beginFlush();
+ assertTrue(accum.flushInProgress());
+ delayedInterrupt(Thread.currentThread(), 1000L);
+ try {
+ accum.awaitFlushCompletion();
+ fail("awaitFlushCompletion should throw InterruptException");
+ } catch (InterruptedException e) {
+ assertFalse("flushInProgress count should be decremented even if thread is interrupted", accum.flushInProgress());
+ }
+ }
+
+
@Test
public void testAbortIncompleteBatches() throws Exception {
long lingerMs = Long.MAX_VALUE;