Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/04/20 21:21:00 UTC

[jira] [Comment Edited] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

    [ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088091#comment-17088091 ] 

John Roesler edited comment on KAFKA-8924 at 4/20/20, 9:20 PM:
---------------------------------------------------------------

I just had another conversation with someone else who got bitten by this default.

Changing the default grace period to zero can be accomplished right now with no backward compatibility concern by deprecating the current factory:
{code:java}
public static TimeWindows of(final Duration size)
{code}
and replacing it with a new factory that is otherwise equivalent but defaults the grace period to zero:
{code:java}
public static TimeWindows ofSize(final Duration size)
{code}

However, I'm concerned that this will just cause the opposite problem: people who have very slight amounts of lateness in their streams, and don't even realize it, would lose data (and maybe not realize that either until it's too late). If they don't anticipate this issue, it seems likely that their test data won't contain out-of-order records and that the problem will only show up in production.
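
To illustrate the data-loss concern, here is a minimal sketch in plain Java. It is not the Kafka Streams implementation: {{GraceSketch}} and {{countPerWindow}} are hypothetical names, and the model assumes tumbling windows, non-negative timestamps, and that a window closes once the highest timestamp seen so far reaches window end plus grace.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a tumbling-window count; not the Kafka Streams implementation.
final class GraceSketch {

    /** Counts records per window start; a record is dropped once its window has closed. */
    static Map<Long, Integer> countPerWindow(final long[] timestampsMs,
                                             final Duration size,
                                             final Duration grace) {
        final long sizeMs = size.toMillis();
        final long graceMs = grace.toMillis();
        final Map<Long, Integer> counts = new HashMap<>();
        long streamTimeMs = Long.MIN_VALUE; // highest timestamp seen so far

        for (final long ts : timestampsMs) {
            streamTimeMs = Math.max(streamTimeMs, ts);
            final long windowStart = ts - (ts % sizeMs);
            final long windowCloseMs = windowStart + sizeMs + graceMs;
            if (streamTimeMs >= windowCloseMs) {
                continue; // window already closed: the late record is silently dropped
            }
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(final String[] args) {
        // The third record (t=9_999) arrives after stream time has already reached 10_001.
        final long[] timestamps = {1_000L, 10_001L, 9_999L};
        final Duration size = Duration.ofSeconds(10);

        // With zero grace the late record is lost; with one second of grace it is counted.
        System.out.println("grace=0s: " + countPerWindow(timestamps, size, Duration.ZERO));
        System.out.println("grace=1s: " + countPerWindow(timestamps, size, Duration.ofSeconds(1)));
    }
}
```

In this model the record that is only two milliseconds late is lost with a zero grace period, and nothing signals that it happened.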

For that reason, it feels sane to make people at least think about it:
{code:java}
public static TimeWindows ofSizeAndGracePeriod(final Duration size, final Duration grace)
{code}
I'll assert that it's not inconvenient at all to say explicitly:
{code:java}
ofSizeAndGracePeriod(/*whatever*/, Duration.ZERO)
{code}
if that's what you really want. That is way more convenient than having to track down either dropped data or delayed Suppress results later on.
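
As a rough sketch of how the two factories could sit side by side, the following uses a hypothetical {{WindowSpec}} class rather than the real {{TimeWindows}}. It assumes the legacy fallback is 24 hours of retention minus the window size, as the code comment quoted in the issue description suggests; the field names and the validation are my own additions for illustration.

```java
import java.time.Duration;

// Hypothetical class for illustration; it mirrors the proposal, not the real TimeWindows.
final class WindowSpec {

    // Assumed legacy retention of 24h, per the comment quoted in the issue description.
    static final Duration LEGACY_RETENTION = Duration.ofHours(24);

    final Duration size;
    final Duration grace;

    private WindowSpec(final Duration size, final Duration grace) {
        this.size = size;
        this.grace = grace;
    }

    /** Old factory: keeps the legacy default of (24h - size) so existing callers see no change. */
    @Deprecated
    static WindowSpec of(final Duration size) {
        final Duration fallback = LEGACY_RETENTION.minus(size);
        // Never let the fallback grace period go negative for windows longer than 24h.
        return new WindowSpec(size, fallback.isNegative() ? Duration.ZERO : fallback);
    }

    /** New factory: the caller has to choose a grace period, even if the choice is zero. */
    static WindowSpec ofSizeAndGracePeriod(final Duration size, final Duration grace) {
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        if (grace.isNegative()) {
            throw new IllegalArgumentException("grace must not be negative");
        }
        return new WindowSpec(size, grace);
    }

    public static void main(final String[] args) {
        final Duration fiveMinutes = Duration.ofMinutes(5);
        // Legacy default: a final result would wait almost a full day after the window ends.
        System.out.println("legacy grace:   " + WindowSpec.of(fiveMinutes).grace);
        // Explicit choice: results are final as soon as the window ends.
        System.out.println("explicit grace: "
                + WindowSpec.ofSizeAndGracePeriod(fiveMinutes, Duration.ZERO).grace);
    }
}
```

The deprecated factory keeps today's behavior for existing callers, while the new one leaves no default to be surprised by.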



> Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-8924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8924
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Michał
>            Assignee: Michał
>            Priority: Major
>              Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> The *-1* parameter is the default grace period, which I think is there for backward compatibility:
> {code}
> @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
>     // NOTE: in the future, when we remove maintainMs,
>     // we should default the grace period to 24h to maintain the default behavior,
>     // or we can default to (24h - size) if you want to be super accurate.
>     return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where [~vvcephei] was (maybe) aware of that and all the scenarios specify the gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db 
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for me in my project. However, I am not sure what impact that would have, given the fallback logic in the *gracePeriodMs* method mentioned above.


