Posted to jira@kafka.apache.org by "Marco Lotz (Jira)" <ji...@apache.org> on 2021/02/09 15:18:00 UTC

[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period

    [ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281834#comment-17281834 ] 

Marco Lotz commented on KAFKA-9524:
-----------------------------------

I have reproduced the bug in the unit tests and can confirm it. Changing the before() method of TimeWindowedKStreamImplTest to use the grace period specified in the ticket throws the reported exception.

The cause is that .grace(...) never updates the _maintainDurationMs_ field, so the field keeps its default value of 1 day.

This affects any time window larger than 1 day that has a non-zero grace period. Time windows without a grace period do not seem to be affected (confirming [~vvcephei]'s report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to account for the grace period, so it always stays at its 1-day default.
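To make the failure concrete, here is a minimal, self-contained model of the retention arithmetic using the ticket's values. It assumes (based on my reading of the 2.4.x code, not verified for every release) that when no retention is set via Materialized, the store retention comes from the deprecated maintainMs(), i.e. max(maintainDurationMs, sizeMs), and that the store builder rejects a retention smaller than size + grace. The class RetentionModel and its methods are hypothetical helpers, not Kafka APIs.

{code:java}
import java.time.Duration;

// Hypothetical model of the retention check described above; not Kafka code.
class RetentionModel {
    // Default value of the maintainDurationMs field (1 day).
    static final long DEFAULT_MAINTAIN_MS = Duration.ofDays(1).toMillis();

    // Assumed behavior of the deprecated maintainMs(): the grace period is ignored.
    static long implicitRetentionMs(long sizeMs, long maintainDurationMs) {
        return Math.max(maintainDurationMs, sizeMs);
    }

    // Assumed store-builder validation: retention must be at least size + grace.
    static boolean passesCheck(long sizeMs, long graceMs, long retentionMs) {
        return retentionMs >= sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofDays(20).toMillis();    // 1728000000
        long graceMs = Duration.ofMinutes(5).toMillis(); // 300000
        long retentionMs = implicitRetentionMs(sizeMs, DEFAULT_MAINTAIN_MS);
        // Prints: size=[1728000000], grace=[300000], retention=[1728000000], ok=false
        System.out.printf("size=[%d], grace=[%d], retention=[%d], ok=%b%n",
                sizeMs, graceMs, retentionMs, passesCheck(sizeMs, graceMs, retentionMs));
    }
}
{code}

The size, grace, and retention values match those in the reported exception: the retention equals the window size, so any non-zero grace period pushes size + grace past it.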

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
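    // Retention must cover size + grace; keep the existing (default 1 day) value if it is already larger.
    final long effectiveMaintainDurationMs =
        sizeMs + afterWindowEndMs > maintainDurationMs ? sizeMs + afterWindowEndMs : maintainDurationMs;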
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design intended maintainDurationMs to have a minimum of 1 day. If that is not the case, the ternary operator can be replaced with a simple assignment.
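The two options can be compared side by side. This is a hypothetical illustration (RetentionChoices, withOneDayMinimum, and sizePlusGrace are not Kafka APIs), assuming the same 1-day default for maintainDurationMs: the first helper mirrors the ternary in the proposed fix, and the second mirrors the simple assignment.

{code:java}
import java.time.Duration;

// Hypothetical comparison of the two retention rules; not Kafka code.
class RetentionChoices {
    static final long DEFAULT_MAINTAIN_MS = Duration.ofDays(1).toMillis();

    // Mirrors the proposed ternary: never go below maintainDurationMs (1 day by default).
    static long withOneDayMinimum(long sizeMs, long graceMs, long maintainDurationMs) {
        return sizeMs + graceMs > maintainDurationMs ? sizeMs + graceMs : maintainDurationMs;
    }

    // Simple assignment: retention is exactly size + grace.
    static long sizePlusGrace(long sizeMs, long graceMs) {
        return sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long[][] cases = {
            // {size, grace}: the ticket's 20-day window with 5-minute grace,
            // and a small 5-minute window with 1-minute grace.
            {Duration.ofDays(20).toMillis(), Duration.ofMinutes(5).toMillis()},
            {Duration.ofMinutes(5).toMillis(), Duration.ofMinutes(1).toMillis()},
        };
        for (long[] c : cases) {
            System.out.printf("size=%d grace=%d -> min-1-day=%d, size+grace=%d%n",
                    c[0], c[1],
                    withOneDayMinimum(c[0], c[1], DEFAULT_MAINTAIN_MS),
                    sizePlusGrace(c[0], c[1]));
        }
    }
}
{code}

Both rules satisfy the size + grace check. They differ only when size + grace is under 1 day: the first keeps a full day of data (86400000 ms for the 5-minute window), while the second keeps only what the window needs (360000 ms).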

Since this bug has existed since at least 2.4.0 (as reported), I suggest scoping this ticket to fixing this small bug rather than removing the deprecated APIs, so that the fix can ship in minor bug-fix releases. I can send a PR with the fix and unit tests.

 

> Default window retention does not consider grace period
> -------------------------------------------------------
>
>                 Key: KAFKA-9524
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9524
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Michael Bingham
>            Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-0000000002 must be no smaller than its window size plus the grace period. Got size=[1728000000], grace=[300000], retention=[1728000000]{code}
> Ideally, Streams should include grace period when implicitly setting window retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)