Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2022/02/23 22:52:00 UTC
[jira] [Updated] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval
[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-13678:
------------------------------------
Issue Type: Improvement (was: Bug)
> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.0.0
> Reporter: Lorenzo Cagnatel
> Priority: Major
>
> When a punctuator is scheduled using stream time, the first punctuation occurs immediately, as documented, but the second one is not triggered at *t_schedule + interval*; it can fire earlier than that.
> For example, assume that at timestamp 5 (t5) we schedule a punctuation with an interval of 10 sec. The system currently behaves like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs only 5 seconds after the first one, which is shorter than the scheduled 10-second interval.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
>  * Schedules a punctuation for the processor
>  *
>  * @param interval the interval in milliseconds
>  * @param type the punctuation type
>  * @throws IllegalStateException if the current node is not null
>  */
> public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
>     switch (type) {
>         case STREAM_TIME:
>             // align punctuation to 0L, punctuate as soon as we have data
>             return schedule(0L, interval, type, punctuator);
>         case WALL_CLOCK_TIME:
>             // align punctuation to now, punctuate after interval has elapsed
>             return schedule(time.milliseconds() + interval, interval, type, punctuator);
>         default:
>             throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
>     }
> }{code}
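> For stream time, it calls *schedule* with {*}startTime=0{*}, so punctuations are aligned to multiples of the interval counted from 0 rather than from the time of scheduling.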
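
The two timelines in the quoted report can be reproduced outside Kafka with a small stand-alone model. The sketch below is only an illustration under stated assumptions: the class name PunctuationAlignmentSketch and the method punctuationTimes are hypothetical, and the catch-up rule (skip intervals that were already missed) is a simplification, not Kafka's actual scheduling code. It only shows how the choice of start time (0 versus the scheduling time t5) changes when the 2nd punctuation fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model; this is NOT Kafka's implementation.
class PunctuationAlignmentSketch {

    /**
     * Returns the stream times at which a punctuation would fire.
     *
     * @param startTime   first time a punctuation is due (0 models the code quoted above)
     * @param interval    punctuation interval
     * @param streamTimes observed stream times, in non-decreasing order
     */
    static List<Long> punctuationTimes(final long startTime, final long interval, final long[] streamTimes) {
        final List<Long> fired = new ArrayList<>();
        long next = startTime;
        for (final long t : streamTimes) {
            if (t >= next) {
                fired.add(t);
                // Assumed catch-up rule: skip any intervals already missed so
                // that the next due time is strictly after t.
                final long missed = (t - next) / interval;
                next += (missed + 1) * interval;
            }
        }
        return fired;
    }

    public static void main(final String[] args) {
        // Stream time advances one unit at a time from t5 to t25.
        final long[] streamTimes = new long[21];
        for (int i = 0; i < streamTimes.length; i++) {
            streamTimes[i] = 5 + i;
        }
        // Aligned to 0, as in the quoted schedule(0L, interval, ...) call.
        System.out.println(punctuationTimes(0L, 10L, streamTimes)); // [5, 10, 20]
        // Aligned to the scheduling time t5, as the report proposes.
        System.out.println(punctuationTimes(5L, 10L, streamTimes)); // [5, 15, 25]
    }
}
```

With a start time of 0 the model fires at t5 and again at t10, matching the first timeline in the report; with a start time of t5 the gap between punctuations stays at the full interval, matching the proposed behaviour.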
--
This message was sent by Atlassian Jira
(v8.20.1#820001)