Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2023/04/24 19:44:00 UTC

[jira] [Updated] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

     [ https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-14862:
------------------------------------
    Affects Version/s: 3.1.0

> Outer stream-stream join does not output all results with multiple input partitions
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14862
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14862
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0
>            Reporter: Bruno Cadonna
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
>
> If I run the following Streams app first with two input topics that each have 1 partition, and then with two input topics that each have 2 partitions, I get different results.
>
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
>     rightSide,
>     (leftValue, rightValue) ->
>         (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
>     JoinWindows.ofTimeDifferenceAndGrace(
>         Duration.ofSeconds(20), 
>         Duration.ofSeconds(10)),
>     StreamJoined.with(
>         Serdes.String(), /* key */
>         Serdes.String(), /* left value */
>         Serdes.String()  /* right value */
>     ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce the following batch of records twice, waiting longer than window + grace period (i.e. > 30 seconds) between the two batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
> With 1 partition per input topic, I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> With 2 partitions per input topic, I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, maybe in a different order due to the partitioning.
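
For reference, the difference between the two outputs quoted above can be worked out with a small set comparison. This is only a sketch for reading the report, not part of the reproduction itself: the class name MissingJoinResults and the helper missingKeys are hypothetical, and the key lists are copied by hand from the two outputs in the description.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissingJoinResults {

    // Hypothetical helper: returns the keys that appear in the expected
    // output but not in the actual output, in sorted order.
    static List<String> missingKeys(List<String> expected, List<String> actual) {
        Set<String> missing = new TreeSet<>(expected);
        missing.removeAll(actual);
        return List.copyOf(missing);
    }

    public static void main(String[] args) {
        // Keys printed with 1 partition per input topic (copied from the report).
        List<String> onePartition =
            List.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        // Keys printed with 2 partitions per input topic (copied from the report).
        List<String> twoPartitions = List.of("1", "3", "4", "7", "8", "9");

        // Prints: Missing keys: [0, 2, 5, 6]
        System.out.println("Missing keys: " + missingKeys(onePartition, twoPartitions));
    }
}
```

So with 2 partitions, the left-side results for keys 0, 2, 5 and 6 are the ones that are not emitted in the quoted output.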



--
This message was sent by Atlassian Jira
(v8.20.10#820010)