Posted to jira@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2018/11/05 23:44:00 UTC

[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

    [ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675891#comment-16675891 ] 

John Roesler commented on KAFKA-7595:
-------------------------------------

Not sure if I'm thinking about this properly, but...

For example, say you get an event <k, v1> on the left and an event <k, v2> on the right, and you want to produce <k, (v1, v2)>. Let's say the left event arrives first.
 # With *caching disabled*, *whether or not the join is materialized*, I'd expect to see <k, (v1, null)> followed by <k, (v1, v2)>. It's not clear to me if you call this a "duplicate".
 # With *caching enabled* and the *join not materialized*, I'd expect to see duplicates: <k, (v1, v2)> followed by <k, (v1, v2)>.
 ** This could happen if <k, v1> gets cached on the left and <k, v2> gets cached on the right. They only trigger the join upon flush, and they trigger it independently, so when the left trigger happens, the right value is already visible and vice-versa, hence the duplicates.
 ** Even though the caches try to de-duplicate events, they are mutually oblivious (they don't know their results will later be used in a join, and neither knows the other exists), so they can't cooperate to de-duplicate the results.
 # If the *join is materialized AND caching is enabled*, then there are still "technically" duplicate events, but they get de-duplicated by the join's cache.
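
The three cases above can be sketched with a toy model in plain Java. This is not Kafka Streams code and does not reflect its internals: the class JoinCacheSketch, its methods, and the single flush() step are all made up for illustration, and the no-cache case simply follows the <k, (v1, null)> expectation stated above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the three cases; NOT the Kafka Streams implementation.
final class JoinCacheSketch {
    private final boolean cachingEnabled;
    private final boolean joinMaterialized;

    // Latest value per key on each side; the join can always read these.
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();

    // Keys updated since the last flush, per side (only used when caching is on).
    private final Set<String> dirtyLeft = new LinkedHashSet<>();
    private final Set<String> dirtyRight = new LinkedHashSet<>();

    // Join results held by the join's own cache (caching on AND join materialized).
    private final Map<String, String> bufferedResults = new LinkedHashMap<>();

    // Records that reach the downstream consumer, in order.
    final List<String> output = new ArrayList<>();

    JoinCacheSketch(boolean cachingEnabled, boolean joinMaterialized) {
        this.cachingEnabled = cachingEnabled;
        this.joinMaterialized = joinMaterialized;
    }

    void onLeft(String key, String value) {
        left.put(key, value);
        if (cachingEnabled) dirtyLeft.add(key); else join(key);
    }

    void onRight(String key, String value) {
        right.put(key, value);
        if (cachingEnabled) dirtyRight.add(key); else join(key);
    }

    // Each side triggers the join independently; neither knows about the other.
    void flush() {
        for (String key : dirtyLeft) join(key);
        dirtyLeft.clear();
        for (String key : dirtyRight) join(key);
        dirtyRight.clear();
        // The join's cache, if any, forwards only the latest result per key.
        for (Map.Entry<String, String> e : bufferedResults.entrySet()) {
            output.add("<" + e.getKey() + ", " + e.getValue() + ">");
        }
        bufferedResults.clear();
    }

    private void join(String key) {
        String result = "(" + left.get(key) + ", " + right.get(key) + ")";
        if (cachingEnabled && joinMaterialized) {
            bufferedResults.put(key, result); // overwrites an earlier result for the key
        } else {
            output.add("<" + key + ", " + result + ">");
        }
    }

    // Feeds <k, v1> on the left, then <k, v2> on the right, then flushes once.
    static List<String> simulate(boolean cachingEnabled, boolean joinMaterialized) {
        JoinCacheSketch sketch = new JoinCacheSketch(cachingEnabled, joinMaterialized);
        sketch.onLeft("k", "v1");
        sketch.onRight("k", "v2");
        sketch.flush();
        return sketch.output;
    }
}
```

In this model, JoinCacheSketch.simulate(true, false) returns the same <k, (v1, v2)> record twice, while simulate(true, true) returns it once, because the buffered join result is overwritten before it is forwarded.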

*So with this in mind, can you clarify which of these you see contradicted?*

*In particular, I wasn't sure if you are calling the sequence "<k, (v1, null)>, <k, (v1, v2)>" a "duplicate" or not. I _think_ this result would be expected for your cases 1 and 2.*

 

And just a plug for this new feature, if you'd like to achieve de-duplication of either sequence "<k, (v1, null)>, <k, (v1, v2)>" or "<k, (v1, v2)>, <k, (v1, v2)>" without having to materialize the join, as of 2.1, you can use the new "suppress" operator: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

> Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-7595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7595
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Vik Gamov
>            Priority: Major
>
> When performing a KTable-to-KTable join after aggregation, there are duplicates in the resulting KTable.
> 1. caching disabled, not materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);}}
> {{KTable<Long, Long> ratingCounts = ratingsById.count();}}
> {{KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>  
> Demo app [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] 


