Posted to jira@kafka.apache.org by "Damian Guy (JIRA)" <ji...@apache.org> on 2017/07/05 16:02:00 UTC

[jira] [Commented] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in incorrect results

    [ https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075005#comment-16075005 ] 

Damian Guy commented on KAFKA-4609:
-----------------------------------

This was partially fixed by KIP-114: https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+state+stores+and+improved+semantics
If you use one of the join/leftJoin/outerJoin overloads that take either a {{StateStoreSupplier}} or a {{queryableName}} as a parameter, then it works. However, the basic join/leftJoin/outerJoin overloads still don't work. To make them work properly we would need to add another parameter, {{joinSerde}}, to these join methods so that we can construct the state store, etc.

This would require a KIP. However, as we are currently discussing DSL changes to remove overloads, I'd recommend we hold off until we know which direction we are going.

> KTable/KTable join followed by groupBy and aggregate/count can result in incorrect results
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4609
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4609
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1, 0.10.2.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>              Labels: architecture
>
> When caching is enabled, KTable/KTable joins can result in duplicate values being emitted. This will occur if there were updates to the same key in both tables. Each table is flushed independently, and each table will trigger the join, so you get two results for the same key. 
> If we subsequently perform a groupBy and then an aggregate operation, we will also process these duplicates, resulting in incorrect aggregated values. For example, count will be double the value it should be.
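
The double-counting described in the issue can be illustrated with a small toy model. This is only a sketch in plain Python and does not use Kafka Streams: the function names (join_emissions, naive_count, deduplicated_count) are hypothetical, and it assumes the downstream count treats every join emission as a new record, which is a simplification of how the real aggregation handles updates.

```python
from collections import defaultdict


def join_emissions(left, right, flush_order):
    """Toy model: every table flush re-triggers the join for the keys it holds."""
    emitted = []
    for table_name in flush_order:
        table = left if table_name == "left" else right
        for key in table:
            # A join result exists only when both sides have the key.
            if key in left and key in right:
                emitted.append((key, (left[key], right[key])))
    return emitted


def naive_count(emissions, group_of):
    """Count per group, treating each emission as a new record (assumption)."""
    counts = defaultdict(int)
    for _key, value in emissions:
        counts[group_of(value)] += 1
    return dict(counts)


def deduplicated_count(emissions, group_of):
    """Keep only the latest join result per key before counting."""
    latest = {}
    for key, value in emissions:
        latest[key] = value
    return naive_count(latest.items(), group_of)


# The same key was updated in both tables, and each table is flushed once.
left = {"k1": "a"}
right = {"k1": "x"}
emissions = join_emissions(left, right, ["left", "right"])

naive = naive_count(emissions, lambda v: v[0])
dedup = deduplicated_count(emissions, lambda v: v[0])
print(len(emissions), naive, dedup)
```

In this model the two independent flushes each emit the same join result for k1, so the naive count for group "a" is 2 while the count over the latest value per key is 1. Keeping one result per key is roughly the role a state store on the join result would play.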



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)