You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/31 02:19:03 UTC
[kafka] branch trunk updated: KAFKA-6323: punctuate with
WALL_CLOCK_TIME triggered immediately (#4301)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 15568fd KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
15568fd is described below
commit 15568fdbe753ea1c2e5fef571daf45e435999c07
Author: fredfp <fr...@gmail.com>
AuthorDate: Wed Jan 31 10:18:55 2018 +0800
KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
This PR avoids unnecessary punctuation calls if punctuations are missed due to large time advances. It also aligns punctuation schedules to the epoch.
Author: Frederic Arno
Reviewers: Michal Borowiecki <mi...@openbet.com>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/processor/ProcessorContext.java | 15 +-
.../processor/internals/PunctuationSchedule.java | 34 ++--
.../streams/processor/internals/StreamTask.java | 26 +++-
.../processor/internals/PunctuationQueueTest.java | 60 ++++++-
.../processor/internals/StreamTaskTest.java | 172 +++++++++++++++++++--
.../kafka/streams/TopologyTestDriverTest.java | 17 +-
6 files changed, 285 insertions(+), 39 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index d4393aa..42902a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -102,11 +102,22 @@ public interface ProcessorContext {
* <ul>
* <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+ * The first punctuation will be triggered by the first record that is processed.
* <b>NOTE:</b> Only advanced if messages arrive</li>
* <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock time),
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
- * independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
- * by how long an iteration of the processing loop takes to complete</li>
+ * independent of whether new messages arrive.
+ * The first punctuation will be triggered after the interval has elapsed.
+ * <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
+ * processing loop takes to complete</li>
+ * </ul>
+ *
+ * <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
+ * This means that "missed" punctuations will be skipped.
+ * It's possible to "miss" a punctuation if:
+ * <ul>
+ * <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval</li>
+ * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
* </ul>
*
* @param intervalMs the time interval between punctuations in milliseconds
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index cf50005..9c0ec88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -27,12 +27,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
// this Cancellable will be re-pointed at the successor schedule in next()
private final RepointableCancellable cancellable;
- PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) {
- this(node, 0L, interval, punctuator, new RepointableCancellable());
+ PunctuationSchedule(final ProcessorNode node,
+ final long time,
+ final long interval,
+ final Punctuator punctuator) {
+ this(node, time, interval, punctuator, new RepointableCancellable());
cancellable.setSchedule(this);
}
- private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator punctuator, RepointableCancellable cancellable) {
+ private PunctuationSchedule(final ProcessorNode node,
+ final long time,
+ final long interval,
+ final Punctuator punctuator,
+ final RepointableCancellable cancellable) {
super(node, time);
this.interval = interval;
this.punctuator = punctuator;
@@ -59,14 +66,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
return isCancelled;
}
- public PunctuationSchedule next(long currTimestamp) {
- PunctuationSchedule nextSchedule;
- // we need to special handle the case when it is firstly triggered (i.e. the timestamp
- // is equal to the interval) by reschedule based on the currTimestamp
- if (timestamp == 0L)
- nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval, punctuator, cancellable);
- else
- nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval, punctuator, cancellable);
+ public PunctuationSchedule next(final long currTimestamp) {
+ long nextPunctuationTime = timestamp + interval;
+ if (currTimestamp >= nextPunctuationTime) {
+ // we missed one or more punctuations
+ // avoid scheduling a new punctuation immediately; this can happen:
+ // - when using STREAM_TIME punctuation and there was a gap i.e., no data was
+ // received for at least 2*interval
+ // - when using WALL_CLOCK_TIME and there was a gap i.e., punctuation was delayed for at least 2*interval (GC pause, overload, ...)
+ final long intervalsMissed = (currTimestamp - timestamp) / interval;
+ nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
+ }
+
+ final PunctuationSchedule nextSchedule = new PunctuationSchedule(value, nextPunctuationTime, interval, punctuator, cancellable);
cancellable.setSchedule(nextSchedule);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 11b2f89..56c0ab3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -578,16 +578,40 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* @throws IllegalStateException if the current node is not null
*/
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
+ switch (type) {
+ case STREAM_TIME:
+ // align punctuation to 0L, punctuate as soon as we have data
+ return schedule(0L, interval, type, punctuator);
+ case WALL_CLOCK_TIME:
+ // align punctuation to now, punctuate after interval has elapsed
+ return schedule(time.milliseconds() + interval, interval, type, punctuator);
+ default:
+ throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
+ }
+ }
+
+ /**
+ * Schedules a punctuation for the processor
+ *
+ * @param startTime time of the first punctuation
+ * @param interval the interval in milliseconds
+ * @param type the punctuation type
+ * @throws IllegalStateException if the current node is null
+ */
+ Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
if (processorContext.currentNode() == null) {
throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
}
- final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
+ final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
switch (type) {
case STREAM_TIME:
+ // STREAM_TIME punctuation is data driven, will first punctuate as soon as stream-time is known and >= time,
+ // stream-time is known when we have received at least one record from each input topic
return streamTimePunctuationQueue.schedule(schedule);
case WALL_CLOCK_TIME:
+ // WALL_CLOCK_TIME is driven by the wall clock time, will first punctuate when now >= time
return systemTimePunctuationQueue.schedule(schedule);
default:
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 1570c9b..09c7a0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -40,7 +40,7 @@ public class PunctuationQueueTest {
}
};
- final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator);
+ final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
final long now = sched.timestamp - 100L;
queue.schedule(sched);
@@ -66,6 +66,64 @@ public class PunctuationQueueTest {
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(2, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(4, processor.punctuatedAt.size());
+ }
+
+ @Test
+ public void testPunctuationIntervalCustomAlignment() {
+ final TestProcessor processor = new TestProcessor();
+ final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
+ final PunctuationQueue queue = new PunctuationQueue();
+ final Punctuator punctuator = new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ node.processor().punctuate(timestamp);
+ }
+ };
+
+ final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
+ final long now = sched.timestamp - 50L;
+
+ queue.schedule(sched);
+
+ ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+ @Override
+ public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
+ punctuator.punctuate(time);
+ }
+ };
+
+ queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(0, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(0, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(2, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(4, processor.punctuatedAt.size());
}
private static class TestProcessor extends AbstractProcessor<String, String> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92cfe66..1165d76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -322,63 +322,173 @@ public class StreamTaskTest {
task.initializeTopology();
task.addRecords(partition1, records(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 32, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 60, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 61, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertTrue(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(5, task.numBuffered());
+ assertEquals(8, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(7, task.numBuffered());
+ assertEquals(2, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(4, task.numBuffered());
- assertEquals(1, source1.numReceived);
+ assertEquals(6, task.numBuffered());
+ assertEquals(2, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
+ assertEquals(3, source1.numReceived);
assertEquals(1, source2.numReceived);
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(4, task.numBuffered());
+ assertEquals(3, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
assertTrue(task.maybePunctuateStreamTime());
assertTrue(task.process());
assertEquals(3, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(2, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
+ assertEquals(5, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
+ assertEquals(5, source1.numReceived);
+ assertEquals(4, source2.numReceived);
+
+ assertFalse(task.process());
+ assertFalse(task.maybePunctuateStreamTime());
+
+ processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPunctuateOnceStreamTimeAfterGap() {
+ task = createStatelessTask(false);
+ task.initializeStateStores();
+ task.initializeTopology();
+
+ task.addRecords(partition1, records(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 142, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 155, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 160, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ ));
+
+ task.addRecords(partition2, records(
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 145, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 159, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 161, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ ));
+
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+
+ assertTrue(task.process());
+ assertEquals(7, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(6, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
+
+ // only one punctuation after 100ms gap
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(2, task.numBuffered());
+ assertEquals(4, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(2, source2.numReceived);
- assertTrue(task.maybePunctuateStreamTime());
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
assertTrue(task.process());
- assertEquals(1, task.numBuffered());
+ assertEquals(3, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(0, task.numBuffered());
+ assertEquals(2, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation
+
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(4, source2.numReceived);
+
assertFalse(task.process());
assertFalse(task.maybePunctuateStreamTime());
- processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 30L, 40L);
+ processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
}
@SuppressWarnings("unchecked")
@@ -425,9 +535,14 @@ public class StreamTaskTest {
assertTrue(task.maybePunctuateSystemTime());
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
- time.sleep(10);
+ time.sleep(9);
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(1);
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(20);
assertTrue(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30);
+ assertFalse(task.maybePunctuateSystemTime());
+ processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
}
@Test
@@ -435,11 +550,36 @@ public class StreamTaskTest {
task = createStatelessTask(false);
task.initializeStateStores();
task.initializeTopology();
- long now = time.milliseconds();
- assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
+ assertFalse(task.maybePunctuateSystemTime());
time.sleep(9);
assertFalse(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now);
+ processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+ }
+
+ @Test
+ public void shouldPunctuateOnceSystemTimeAfterGap() {
+ task = createStatelessTask(false);
+ task.initializeStateStores();
+ task.initializeTopology();
+ long now = time.milliseconds();
+ time.sleep(100);
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(10);
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(12);
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(7);
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(1); // punctuate at now + 130
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(105); // punctuate at now + 235
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
}
@Test
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 4100f18..5073efd 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -593,36 +593,35 @@ public class TopologyTestDriverTest {
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ expectedPunctuations.add(51L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 51L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
- expectedPunctuations.add(52L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 52L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ expectedPunctuations.add(61L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 61L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
- expectedPunctuations.add(65L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 65L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ expectedPunctuations.add(71L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 71L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
- expectedPunctuations.add(72L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 72L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
expectedPunctuations.add(95L);
- expectedPunctuations.add(95L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 95L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ expectedPunctuations.add(101L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 101L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
- expectedPunctuations.add(102L);
testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 102L));
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
}
@@ -637,21 +636,23 @@ public class TopologyTestDriverTest {
final List<Long> expectedPunctuations = new LinkedList<>();
- expectedPunctuations.add(5L);
testDriver.advanceWallClockTime(5L);
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ expectedPunctuations.add(14L);
testDriver.advanceWallClockTime(9L);
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
- expectedPunctuations.add(15L);
testDriver.advanceWallClockTime(1L);
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
expectedPunctuations.add(35L);
- expectedPunctuations.add(35L);
testDriver.advanceWallClockTime(20L);
assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(40L);
+ testDriver.advanceWallClockTime(5L);
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
}
@Test
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.