Posted to jira@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2019/05/02 20:20:00 UTC

[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence can't set retention period independent of grace

    [ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831927#comment-16831927 ] 

John Roesler commented on KAFKA-8315:
-------------------------------------

Interesting. Thanks for the context. For some reason, the Slack link doesn't take me to the thread.

Your understanding is almost spot-on.

The grace period defines how long after a window ends it continues to accept late-arriving records and update its results. The retention time defines how long we keep the window's state in storage. Clearly, the retention time must be at least long enough to support the updates that may happen during the grace period, but it could be much longer, to support Interactive Queries even after the window is closed to updates.
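
To make the distinction concrete, here is a minimal sketch of the two knobs as a plain-Java model. `WindowPolicy` and its methods are hypothetical names for illustration, not Kafka Streams classes; in the real DSL the knobs live on `TimeWindows.grace(...)` and `Materialized.withRetention(...)`. The model counts retention from the window start and treats the boundaries as exclusive, which is a simplification.

```java
/**
 * Hypothetical model (not a Kafka Streams class) of a windowed aggregation's
 * two timing knobs: grace bounds updates, retention bounds storage.
 */
public class WindowPolicyDemo {
    static final class WindowPolicy {
        final long sizeMs;
        final long graceMs;
        final long retentionMs;

        WindowPolicy(long sizeMs, long graceMs, long retentionMs) {
            // Retention is counted from the window start in this model, so it must
            // cover the window itself plus the grace period; otherwise state could be
            // dropped while the window is still accepting late records.
            if (retentionMs < sizeMs + graceMs) {
                throw new IllegalArgumentException("retention must be >= size + grace");
            }
            this.sizeMs = sizeMs;
            this.graceMs = graceMs;
            this.retentionMs = retentionMs;
        }

        /** True while late records may still update the window [start, start + size). */
        boolean acceptsUpdate(long windowStartMs, long streamTimeMs) {
            return streamTimeMs < windowStartMs + sizeMs + graceMs;
        }

        /** True while the window's result is still in the store, e.g. for Interactive Queries. */
        boolean isRetained(long windowStartMs, long streamTimeMs) {
            return streamTimeMs < windowStartMs + retentionMs;
        }
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // 5-minute windows, 10 minutes of grace, kept for a day.
        WindowPolicy p = new WindowPolicy(5 * min, 10 * min, 24 * 60 * min);
        // At stream time 20 min, the window starting at 0 is closed to updates but still queryable.
        System.out.println(p.acceptsUpdate(0, 20 * min) + " " + p.isRetained(0, 20 * min)); // false true
    }
}
```

The gap between `size + grace` and `retention` is exactly the period in which a window is read-only but still queryable.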

Join windows are a little different, because they are not queryable. Therefore, there is no reason to retain their state beyond the grace period. This is also why there's no `Materialized` parameter: the state for the join is purely bookkeeping, not a "materialized view" in the data processing sense. Since you mention the apparent similarity with grouped aggregations: the facts that JoinWindows shares a class hierarchy with Windows and that it uses a "normal" WindowStore internally were mostly for implementation convenience. It's actually a little semantically abusive if you really get into it, and I've heard a few times that people would like to break joins out and clean the whole situation up.
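
Why the join state is "purely bookkeeping" can be seen in a toy model of a windowed stream-stream inner join. This is a simplified, hypothetical sketch, not Kafka's actual join processor: it assumes a symmetric time difference (like `JoinWindows.of(d)`), drops records that arrive more than `graceMs` behind stream time, and evicts buffered records once they are older than the window span plus grace. The only reader of each buffer is the other side's lookup, so keeping records past that horizon would serve nobody.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a windowed stream-stream inner join. */
public class JoinBufferSketch {
    static final class Rec {
        final String key; final long ts; final String value;
        Rec(String key, long ts, String value) { this.key = key; this.ts = ts; this.value = value; }
    }

    private final long timeDifferenceMs; // records join if |leftTs - rightTs| <= timeDifferenceMs
    private final long graceMs;
    private final List<Rec> left = new ArrayList<>();
    private final List<Rec> right = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    JoinBufferSketch(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    /** Window span (before + after) plus grace: a conservative bookkeeping horizon. */
    long retentionMs() { return 2 * timeDifferenceMs + graceMs; }

    List<String> onLeft(String key, long ts, String value) { return process(left, right, key, ts, value, true); }
    List<String> onRight(String key, long ts, String value) { return process(right, left, key, ts, value, false); }

    private List<String> process(List<Rec> own, List<Rec> other, String key, long ts,
                                 String value, boolean isLeft) {
        List<String> results = new ArrayList<>();
        streamTime = Math.max(streamTime, ts);
        // Simplified grace rule: a record too far behind stream time is ignored.
        if (ts < streamTime - graceMs) return results;
        own.add(new Rec(key, ts, value));
        // The buffered other side is only ever read here, to find join partners.
        for (Rec r : other) {
            if (r.key.equals(key) && Math.abs(r.ts - ts) <= timeDifferenceMs) {
                results.add(isLeft ? value + "+" + r.value : r.value + "+" + value);
            }
        }
        // Evict anything older than the horizon; no future accepted record could match it.
        long horizon = streamTime - retentionMs();
        left.removeIf(rec -> rec.ts < horizon);
        right.removeIf(rec -> rec.ts < horizon);
        return results;
    }

    int buffered() { return left.size() + right.size(); }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        JoinBufferSketch join = new JoinBufferSketch(2 * day, 2 * day);
        join.onLeft("k", 0, "a");
        System.out.println(join.onRight("k", day, "b")); // [a+b]
    }
}
```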

Things get really complicated when we need to compute defaults, though. The concepts of "grace" and "retention" used to be coupled into just "retention" (aka "until" aka "maintainMs"), and the default was 24h. So, if we picked any default grace period shorter than 24h, some apps might start dropping late data that they used to process. Also, the "retention" configuration in Windows is only deprecated, not removed, so someone may set a retention time via the deprecated methods but not a grace period, and we also need to do the "right thing" in that case. I mention all this only to explain why the code is so complicated. Hopefully, we can drop the deprecated methods soonish and clean the whole thing up.
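
The backward-compatibility concern can be sketched as a fallback rule. This is a hypothetical illustration of the reasoning above, not Kafka Streams' actual code, and the names `effectiveGraceMs` and `LEGACY_DEFAULT_RETENTION_MS` are made up. Under the old coupled scheme a window stopped accepting updates once its retention (counted from the window start) ran out, i.e. `retention - size` after the window ended, so using that as the default grace keeps old apps from suddenly dropping late records.

```java
/** Hypothetical sketch of a backward-compatible grace default; not Kafka's code. */
public class GraceDefaults {
    // The old default for the combined "retention" (until / maintainMs): 24 hours.
    static final long LEGACY_DEFAULT_RETENTION_MS = 24L * 60 * 60 * 1000;

    /**
     * @param explicitGraceMs       grace set via the new API, or null if unset
     * @param deprecatedRetentionMs retention set via the deprecated until(), or null if unset
     * @param sizeMs                window size
     */
    static long effectiveGraceMs(Long explicitGraceMs, Long deprecatedRetentionMs, long sizeMs) {
        if (explicitGraceMs != null) {
            return explicitGraceMs; // an explicitly configured grace period always wins
        }
        long retention = deprecatedRetentionMs != null ? deprecatedRetentionMs : LEGACY_DEFAULT_RETENTION_MS;
        // Accept late data for as long as the old retention did (until window start + retention).
        return Math.max(0L, retention - sizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5L * 60 * 1000;
        System.out.println(effectiveGraceMs(null, null, fiveMinutes)); // 86100000
    }
}
```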

So, back to the behavior you're seeing: you *should* see that the window stores for join windows use `Duration.ofMillis(windows.size() + windows.gracePeriodMs())` as their retention, as you pointed out above. The deprecated `until` should be ignored. It's possible that the topic retention doesn't get updated when you change your configs, which would be a bug.

One thing I didn't understand is the arithmetic from your conversation. I'll take a shot and maybe you can set me straight...
You want to join 2 years of historical data.
For each join candidate, you only want to look back 2 days, so you set the join window to size=2 days.
You want to emit updated join results in the case of time-disordered records, but not indefinitely. Specifically, you only want to emit updated results up to 2 days after the fact, so you set the grace period to 2 days as well.
With these configurations, you should see the retention time on the topic set to at least 4 days. 120 hours is 5 days, so this seems about right to me (there might be some fudge factor, I'm not sure).
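
The arithmetic can be checked with `java.time.Duration`. One plausible source of the extra day is the changelog topic's additional retention, `windowstore.changelog.additional.retention.ms`, which defaults to one day; treating that as the fudge factor is an assumption here, and the helper names below are hypothetical.

```java
import java.time.Duration;

public class JoinRetentionArithmetic {
    // Join window store retention, per the formula above: size + grace.
    static Duration storeRetention(Duration size, Duration grace) {
        return size.plus(grace);
    }

    // ASSUMPTION: the changelog topic keeps data for the store retention plus
    // windowstore.changelog.additional.retention.ms (default: one day).
    static Duration changelogRetention(Duration storeRetention, Duration additional) {
        return storeRetention.plus(additional);
    }

    public static void main(String[] args) {
        Duration store = storeRetention(Duration.ofDays(2), Duration.ofDays(2));
        Duration topic = changelogRetention(store, Duration.ofDays(1));
        System.out.println(store.toHours() + "h store, " + topic.toHours() + "h topic"); // 96h store, 120h topic
    }
}
```

One caveat worth checking: for a symmetric `JoinWindows.of(d)`, `size()` reports the before and after spans combined (2·d), so printing `windows.size()` in the app may explain the exact numbers.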

I guess the big problem is that your data fails to join. I'd start by identifying a pair of records that you think *should* join, and then working out why they don't (the join window could be too small, or the grace period could be too small).
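
That triage can be written down as a small checklist function. This is a hypothetical sketch under a simplified model: a symmetric join window, and a grace rule that ignores a record whose timestamp is more than `graceMs` behind the stream time seen when it arrives. Under those assumptions, if both checks pass, the earlier record is still inside the size-plus-grace horizon, so the model predicts a join and the cause lies elsewhere.

```java
/** Hypothetical triage for a pair of records that was expected to join. */
public class MissedJoinDiagnosis {
    static String diagnose(long firstTs, long secondTs, long streamTimeAtSecondArrival,
                           long timeDifferenceMs, long graceMs) {
        // 1. Timestamps too far apart: no grace setting can fix this.
        if (Math.abs(firstTs - secondTs) > timeDifferenceMs) {
            return "join window too small";
        }
        // 2. The second record arrived after stream time had moved past its
        //    timestamp by more than the grace period, so it was discarded.
        if (secondTs < streamTimeAtSecondArrival - graceMs) {
            return "grace period too small";
        }
        // Given 1 and 2, the first record is still buffered in this model,
        // so look elsewhere (e.g. mismatched keys).
        return "should join";
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        System.out.println(diagnose(0, day, 10 * day, 2 * day, 2 * day)); // grace period too small
    }
}
```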

> Cannot pass Materialized into a join operation - hence can't set retention period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()` ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), but there is nowhere to pass a `Materialized` instance to the join operation; it seems to be supported only for the group operation.
>  
> Slack conversation here : [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the grace period, so I think this is more than a documentation fix (see comments below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)