Posted to commits@kafka.apache.org by gu...@apache.org on 2019/12/20 16:51:39 UTC

[kafka] branch 2.4 updated: MINOR: Reset timer when all the buffer is drained and empty (#7573)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new f0cd17a  MINOR: Reset timer when all the buffer is drained and empty (#7573)
f0cd17a is described below

commit f0cd17a76c2cdabc4f888075e206e8f7cb16e1d7
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Oct 22 09:27:38 2019 -0700

    MINOR: Reset timer when all the buffer is drained and empty (#7573)
    
    For scenarios where the incoming traffic on all input partitions is small, there is a pitfall: the enforced processing timer is not reset after we have enforced-processed ALL buffered records. The fix itself is simple: we just reset the timer when there are no buffered records left.
    
    Reviewers: Javier Holguera <ja...@gmail.com>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>
    
    fix unit test
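
For context, here is a minimal sketch (editorial, not part of the commit) of how the enforced-processing
check in StreamTask#isProcessable behaves once this fix is applied. The field idleStartTime and the
sentinel RecordQueue.UNKNOWN come from the patch below; the class name, method parameters, and the
maxTaskIdleMs field are illustrative stand-ins for the real task state and the max.task.idle.ms config.

    // Editorial sketch: approximates the enforced-processing decision in
    // StreamTask#isProcessable. Names other than idleStartTime / UNKNOWN are assumptions.
    public class EnforcedProcessingSketch {

        // sentinel mirroring RecordQueue.UNKNOWN in the real code
        static final long UNKNOWN = -1L;

        private final long maxTaskIdleMs;   // analogous to the max.task.idle.ms config
        private long idleStartTime = UNKNOWN;

        EnforcedProcessingSketch(final long maxTaskIdleMs) {
            this.maxTaskIdleMs = maxTaskIdleMs;
        }

        // allPartitionsBuffered: every input partition has data -> process normally
        // anyRecordsBuffered: at least one partition has data -> maybe enforce processing
        boolean isProcessable(final boolean allPartitionsBuffered,
                              final boolean anyRecordsBuffered,
                              final long now) {
            if (allPartitionsBuffered) {
                idleStartTime = UNKNOWN;
                return true;
            } else if (anyRecordsBuffered) {
                if (idleStartTime == UNKNOWN) {
                    idleStartTime = now;
                }
                // enforce processing only after the task has been partially starved
                // for longer than the configured idle time
                return now - idleStartTime >= maxTaskIdleMs;
            } else {
                // the fix in this commit: nothing is buffered at all, so reset the timer
                // instead of letting a stale idleStartTime trigger enforced processing
                idleStartTime = UNKNOWN;
                return false;
            }
        }

        public static void main(final String[] args) {
            final EnforcedProcessingSketch task = new EnforcedProcessingSketch(100L);
            System.out.println(task.isProcessable(false, true, 0L));    // false: timer starts at 0
            System.out.println(task.isProcessable(false, true, 100L));  // true: 100ms idle, enforced processing
            System.out.println(task.isProcessable(false, false, 110L)); // false: buffer empty, timer reset
            System.out.println(task.isProcessable(false, true, 150L));  // false: timer restarted at 150
            System.out.println(task.isProcessable(false, true, 250L));  // true: full 100ms elapsed again
        }
    }

Running main prints false/true/false/false/true, mirroring the assertions added to StreamTaskTest
below: once the buffer drains, the timer restarts, so enforced processing only fires again after a
full idle interval has elapsed.
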
---
 .../streams/processor/internals/StreamTask.java    |  3 ++
 .../processor/internals/StreamTaskTest.java        | 48 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index be113cd..cf161ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -398,6 +398,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 return false;
             }
         } else {
+            // there's no data in any of the topics; we should reset the enforced
+            // processing timer
+            idleStartTime = RecordQueue.UNKNOWN;
             return false;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e1252d3..8c4e66d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -777,7 +777,7 @@ public class StreamTaskTest {
 
         assertFalse(task.isProcessable(time.milliseconds()));
 
-        assertFalse(task.isProcessable(time.milliseconds() + 50L));
+        assertFalse(task.isProcessable(time.milliseconds() + 99L));
 
         assertTrue(task.isProcessable(time.milliseconds() + 100L));
         assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
@@ -804,6 +804,52 @@ public class StreamTaskTest {
         assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue());
     }
 
+    @Test
+    public void shouldNotBeProcessableIfNoDataAvailable() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        final MetricName enforcedProcessMetric = metrics.metricName(
+            "enforced-processing-total",
+            "stream-task-metrics",
+            mkMap(mkEntry("client-id", Thread.currentThread().getName()), mkEntry("task-id", taskId00.toString()))
+        );
+
+        assertFalse(task.isProcessable(0L));
+        assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable(time.milliseconds()));
+
+        assertFalse(task.isProcessable(time.milliseconds() + 99L));
+
+        assertTrue(task.isProcessable(time.milliseconds() + 100L));
+        assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        // once the buffer is drained and no new records are coming, the timer should be reset
+        task.process();
+
+        assertFalse(task.isProcessable(time.milliseconds() + 110L));
+        assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        // check that after the timer is reset, we only fall into enforced processing after the
+        // whole timeout has elapsed again
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable(time.milliseconds() + 150L));
+        assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        assertFalse(task.isProcessable(time.milliseconds() + 249L));
+        assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        assertTrue(task.isProcessable(time.milliseconds() + 250L));
+        assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
+    }
+
 
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {