You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/12/20 16:51:39 UTC
[kafka] branch 2.4 updated: MINOR: Reset timer when all the buffer
is drained and empty (#7573)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new f0cd17a MINOR: Reset timer when all the buffer is drained and empty (#7573)
f0cd17a is described below
commit f0cd17a76c2cdabc4f888075e206e8f7cb16e1d7
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Oct 22 09:27:38 2019 -0700
MINOR: Reset timer when all the buffer is drained and empty (#7573)
For scenarios where the incoming traffic of all input partitions are small, there's a pitfall that the enforced processing timer is not reset after we have enforce processed ALL records. The fix itself is pretty simple: we just reset the timer when there's no buffered records.
Reviewers: Javier Holguera <ja...@gmail.com>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>
fix unit test
---
.../streams/processor/internals/StreamTask.java | 3 ++
.../processor/internals/StreamTaskTest.java | 48 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index be113cd..cf161ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -398,6 +398,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return false;
}
} else {
+ // there's no data in any of the topics; we should reset the enforced
+ // processing timer
+ idleStartTime = RecordQueue.UNKNOWN;
return false;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e1252d3..8c4e66d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -777,7 +777,7 @@ public class StreamTaskTest {
assertFalse(task.isProcessable(time.milliseconds()));
- assertFalse(task.isProcessable(time.milliseconds() + 50L));
+ assertFalse(task.isProcessable(time.milliseconds() + 99L));
assertTrue(task.isProcessable(time.milliseconds() + 100L));
assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
@@ -804,6 +804,52 @@ public class StreamTaskTest {
assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue());
}
+ @Test
+ public void shouldNotBeProcessableIfNoDataAvailable() {
+ task = createStatelessTask(createConfig(false));
+ task.initializeStateStores();
+ task.initializeTopology();
+
+ final MetricName enforcedProcessMetric = metrics.metricName(
+ "enforced-processing-total",
+ "stream-task-metrics",
+ mkMap(mkEntry("client-id", Thread.currentThread().getName()), mkEntry("task-id", taskId00.toString()))
+ );
+
+ assertFalse(task.isProcessable(0L));
+ assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+ final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+ task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+ assertFalse(task.isProcessable(time.milliseconds()));
+
+ assertFalse(task.isProcessable(time.milliseconds() + 99L));
+
+ assertTrue(task.isProcessable(time.milliseconds() + 100L));
+ assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+ // once the buffer is drained and no new records coming, the timer should be reset
+ task.process();
+
+ assertFalse(task.isProcessable(time.milliseconds() + 110L));
+ assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+ // check that after time is reset, we only falls into enforced processing after the
+ // whole timeout has elapsed again
+ task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+ assertFalse(task.isProcessable(time.milliseconds() + 150L));
+ assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+ assertFalse(task.isProcessable(time.milliseconds() + 249L));
+ assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+ assertTrue(task.isProcessable(time.milliseconds() + 250L));
+ assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
+ }
+
@Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {