You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Andrew (JIRA)" <ji...@apache.org> on 2019/05/02 16:28:00 UTC

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation

    [ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831735#comment-16831735 ] 

Andrew edited comment on KAFKA-8315 at 5/2/19 4:27 PM:
-------------------------------------------------------

Ok, thanks.

However, I thought retention period was meant to be independent of grace period, like it is for grouped aggregations? I can see from the code though that the retention is explicitly set to the grace period.

{Code}
 @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String joinName,
                                                                                  final JoinWindows windows,
                                                                                  final Serde<K> keySerde,
                                                                                  final Serde<V> valueSerde)

{         return Stores.windowStoreBuilder(             Stores.persistentWindowStore(                 joinName + "-store",                 Duration.ofMillis(windows.size() + windows.gracePeriodMs()),  <------------------------                 Duration.ofMillis(windows.size()),                 true             ),             keySerde,             valueSerde         );     }

 

{Code}


 so it looks like the grace period defines the retention period...


was (Author: the4thamigo_uk):
Ok, thanks. 

However, I thought retention period was meant to be independent of grace period, like it is for grouped aggregations? I can see from the code though that the retention is explicitly set to the grace period.

   \{Code}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String joinName,
                                                                                 final JoinWindows windows,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde) {
        return Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                joinName + "-store",
                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),  <------------------------
                Duration.ofMillis(windows.size()),
                true
            ),
            keySerde,
            valueSerde
        );
    }
{Code}


Andy Smith   [1 minute ago]
so it looks like the grace period defines the retention period...

> Cannot pass Materialized into a join operation
> ----------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>
> The documentation says to use `Materialized` not `JoinWindows.until()` (https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-), but there is no where to pass a `Materialized` instance to the join operation, only to the group operation is supported it seems.
>  
> Slack conversation here : https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)