You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/31 02:19:03 UTC

[kafka] branch trunk updated: KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15568fd  KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
15568fd is described below

commit 15568fdbe753ea1c2e5fef571daf45e435999c07
Author: fredfp <fr...@gmail.com>
AuthorDate: Wed Jan 31 10:18:55 2018 +0800

    KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
    
    This PR avoids unnecessary punctuation calls if punctuations are missed due to large time advances. It also aligns punctuation schedules to the epoch.
    
    Author: Frederic Arno
    
    Reviewers: Michal Borowiecki <mi...@openbet.com>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/processor/ProcessorContext.java  |  15 +-
 .../processor/internals/PunctuationSchedule.java   |  34 ++--
 .../streams/processor/internals/StreamTask.java    |  26 +++-
 .../processor/internals/PunctuationQueueTest.java  |  60 ++++++-
 .../processor/internals/StreamTaskTest.java        | 172 +++++++++++++++++++--
 .../kafka/streams/TopologyTestDriverTest.java      |  17 +-
 6 files changed, 285 insertions(+), 39 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index d4393aa..42902a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -102,11 +102,22 @@ public interface ProcessorContext {
      * <ul>
      *   <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages
      *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is processed.
      *   <b>NOTE:</b> Only advanced if messages arrive</li>
      *   <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock time),
      *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
-     *   independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
-     *   by how long an iteration of the processing loop takes to complete</li>
+     *   independent of whether new messages arrive.
+     *   The first punctuation will be triggered after the interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
+     * This means that "missed" punctuations will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances by more than the interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
      * </ul>
      *
      * @param intervalMs the time interval between punctuations in milliseconds
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index cf50005..9c0ec88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -27,12 +27,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
     // this Cancellable will be re-pointed at the successor schedule in next()
     private final RepointableCancellable cancellable;
 
-    PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) {
-        this(node, 0L, interval, punctuator, new RepointableCancellable());
+    PunctuationSchedule(final ProcessorNode node,
+                        final long time,
+                        final long interval,
+                        final Punctuator punctuator) {
+        this(node, time, interval, punctuator, new RepointableCancellable());
         cancellable.setSchedule(this);
     }
 
-    private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator punctuator, RepointableCancellable cancellable) {
+    private PunctuationSchedule(final ProcessorNode node,
+                                final long time,
+                                final long interval,
+                                final Punctuator punctuator,
+                                final RepointableCancellable cancellable) {
         super(node, time);
         this.interval = interval;
         this.punctuator = punctuator;
@@ -59,14 +66,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
         return isCancelled;
     }
 
-    public PunctuationSchedule next(long currTimestamp) {
-        PunctuationSchedule nextSchedule;
-        // we need to special handle the case when it is firstly triggered (i.e. the timestamp
-        // is equal to the interval) by reschedule based on the currTimestamp
-        if (timestamp == 0L)
-            nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval, punctuator, cancellable);
-        else
-            nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval, punctuator, cancellable);
+    public PunctuationSchedule next(final long currTimestamp) {
+        long nextPunctuationTime = timestamp + interval;
+        if (currTimestamp >= nextPunctuationTime) {
+            // we missed one or more punctuations
+            // avoid scheduling a new punctuation immediately, this can happen:
+            // - when using STREAM_TIME punctuation and there was a gap i.e., no data was
+            //   received for at least 2*interval
+            // - when using WALL_CLOCK_TIME and there was a gap i.e., punctuation was delayed for at least 2*interval (GC pause, overload, ...)
+            final long intervalsMissed = (currTimestamp - timestamp) / interval;
+            nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
+        }
+
+        final PunctuationSchedule nextSchedule = new PunctuationSchedule(value, nextPunctuationTime, interval, punctuator, cancellable);
 
         cancellable.setSchedule(nextSchedule);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 11b2f89..56c0ab3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -578,16 +578,40 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws IllegalStateException if the current node is not null
      */
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
+        switch (type) {
+            case STREAM_TIME:
+                // align punctuation to 0L, punctuate as soon as we have data
+                return schedule(0L, interval, type, punctuator);
+            case WALL_CLOCK_TIME:
+                // align punctuation to now, punctuate after interval has elapsed
+                return schedule(time.milliseconds() + interval, interval, type, punctuator);
+            default:
+                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
+        }
+    }
+
+    /**
+     * Schedules a punctuation for the processor
+     *
+     * @param startTime time of the first punctuation
+     * @param interval the interval in milliseconds
+     * @param type the punctuation type
+     * @throws IllegalStateException if the current node is null
+     */
+    Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() == null) {
             throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
         }
 
-        final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
+        final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
 
         switch (type) {
             case STREAM_TIME:
+                // STREAM_TIME punctuation is data driven, will first punctuate as soon as stream-time is known and >= time,
+                // stream-time is known when we have received at least one record from each input topic
                 return streamTimePunctuationQueue.schedule(schedule);
             case WALL_CLOCK_TIME:
+                // WALL_CLOCK_TIME is driven by the wall clock time, will first punctuate when now >= time
                 return systemTimePunctuationQueue.schedule(schedule);
             default:
                 throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 1570c9b..09c7a0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -40,7 +40,7 @@ public class PunctuationQueueTest {
             }
         };
 
-        final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator);
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
         final long now = sched.timestamp - 100L;
 
         queue.schedule(sched);
@@ -66,6 +66,64 @@ public class PunctuationQueueTest {
 
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
+    }
+
+    @Test
+    public void testPunctuationIntervalCustomAlignment() {
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
+
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
+        final long now = sched.timestamp - 50L;
+
+        queue.schedule(sched);
+
+        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
+                punctuator.punctuate(time);
+            }
+        };
+
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
     }
 
     private static class TestProcessor extends AbstractProcessor<String, String> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92cfe66..1165d76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -322,63 +322,173 @@ public class StreamTaskTest {
         task.initializeTopology();
 
         task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 32, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 60, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
         task.addRecords(partition2, records(
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 61, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
+        assertEquals(8, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(1, source1.numReceived);
+        assertEquals(6, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
+        assertEquals(3, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(4, task.numBuffered());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(2, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
+        assertFalse(task.process());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPunctuateOnceStreamTimeAfterGap() {
+        task = createStatelessTask(false);
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 142, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 155, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 160, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 145, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 159, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 161, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(6, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
+
+        // only one punctuation after 100ms gap
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
+        assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertTrue(task.maybePunctuateStreamTime());
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
 
         assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
+        assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
+        assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
 
-        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 30L, 40L);
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
     }
 
     @SuppressWarnings("unchecked")
@@ -425,9 +535,14 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateSystemTime());
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
-        time.sleep(10);
+        time.sleep(9);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(20);
         assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30);
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
     }
 
     @Test
@@ -435,11 +550,36 @@ public class StreamTaskTest {
         task = createStatelessTask(false);
         task.initializeStateStores();
         task.initializeTopology();
-        long now = time.milliseconds();
-        assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
+        assertFalse(task.maybePunctuateSystemTime());
         time.sleep(9);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now);
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+    }
+
+    @Test
+    public void shouldPunctuateOnceSystemTimeAfterGap() {
+        task = createStatelessTask(false);
+        task.initializeStateStores();
+        task.initializeTopology();
+        long now = time.milliseconds();
+        time.sleep(100);
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(12);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(7);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1); // punctuate at now + 130
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(105); // punctuate at now + 235
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
     @Test
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 4100f18..5073efd 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -593,36 +593,35 @@ public class TopologyTestDriverTest {
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(51L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 51L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(52L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 52L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(61L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 61L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(65L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 65L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(71L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 71L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(72L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 72L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
         expectedPunctuations.add(95L);
-        expectedPunctuations.add(95L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 95L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(101L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 101L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(102L);
         testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 102L));
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
     }
@@ -637,21 +636,23 @@ public class TopologyTestDriverTest {
 
         final List<Long> expectedPunctuations = new LinkedList<>();
 
-        expectedPunctuations.add(5L);
         testDriver.advanceWallClockTime(5L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
+        expectedPunctuations.add(14L);
         testDriver.advanceWallClockTime(9L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
-        expectedPunctuations.add(15L);
         testDriver.advanceWallClockTime(1L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
 
         expectedPunctuations.add(35L);
-        expectedPunctuations.add(35L);
         testDriver.advanceWallClockTime(20L);
         assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(40L);
+        testDriver.advanceWallClockTime(5L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.