Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2022/02/23 22:52:00 UTC

[jira] [Updated] (KAFKA-13678) 2nd punctuation using STREAM_TIME does not respect scheduled interval

     [ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13678:
------------------------------------
    Issue Type: Improvement  (was: Bug)

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Lorenzo Cagnatel
>            Priority: Major
>
> When a punctuator is scheduled using stream time, the first punctuation occurs immediately, as documented, but the second one is not triggered at *t_schedule + interval*; it can happen earlier.
> For example, assume that we schedule a punctuation every 10 sec at timestamp 5 (t5). The system currently behaves like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs only 5 seconds after the first one, which violates the 10-second interval.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
> * Schedules a punctuation for the processor
> *
> * @param interval the interval in milliseconds
> * @param type the punctuation type
> * @throws IllegalStateException if the current node is not null
> */
> public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
>    switch (type) {
>       case STREAM_TIME:
>          // align punctuation to 0L, punctuate as soon as we have data
>          return schedule(0L, interval, type, punctuator);
>       case WALL_CLOCK_TIME:
>          // align punctuation to now, punctuate after interval has elapsed
>          return schedule(time.milliseconds() + interval, interval, type, punctuator);
>       default:
>          throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
>    }
> }{code}
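> where, in the case of stream time, it calls *schedule* with {*}startTime=0{*}, so punctuations are aligned to multiples of the interval rather than to the time at which the punctuator was scheduled.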
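
The difference between the two alignments described in the report can be reproduced with a small standalone simulation. This is only a sketch and does not use Kafka Streams itself: the class PunctuationAlignmentSketch and the method punctuationTimes are hypothetical names, stream time is assumed to advance one unit at a time, and the "skip missed intervals" rule is a simplified model rather than the actual scheduling code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of interval-based punctuation; not Kafka code.
public class PunctuationAlignmentSketch {

    /**
     * Returns the stream times in [from, to] at which a punctuation fires,
     * given the time the schedule is aligned to (startTime) and the interval.
     * Stream time is assumed to advance by exactly one unit per step.
     */
    public static List<Long> punctuationTimes(long startTime, long interval, long from, long to) {
        List<Long> fired = new ArrayList<>();
        long next = startTime; // next time a punctuation is due
        for (long now = from; now <= to; now++) {
            if (now >= next) {
                fired.add(now);
                // Advance to the first aligned time strictly after 'now',
                // skipping any intervals that were missed.
                long missed = (now - next) / interval;
                next = next + (missed + 1) * interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long interval = 10L;
        long scheduledAt = 5L; // the punctuator is scheduled at t5

        // Aligned to 0, as in the reported behaviour: prints [5, 10, 20]
        System.out.println(punctuationTimes(0L, interval, scheduledAt, 25L));

        // Aligned to the schedule time, as proposed: prints [5, 15, 25]
        System.out.println(punctuationTimes(scheduledAt, interval, scheduledAt, 25L));
    }
}
```

Under these assumptions, aligning to 0 yields the t5, t10, t20 sequence from the first listing, while aligning to the schedule time yields t5, t15, t25 as in the second.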



--
This message was sent by Atlassian Jira
(v8.20.1#820001)