Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2023/04/24 19:44:00 UTC
[jira] [Updated] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions
[ https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-14862:
------------------------------------
Affects Version/s: 3.1.0
> Outer stream-stream join does not output all results with multiple input partitions
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-14862
> URL: https://issues.apache.org/jira/browse/KAFKA-14862
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0
> Reporter: Bruno Cadonna
> Assignee: Matthias J. Sax
> Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> If I run the following Streams app once with two input topics that each have 1 partition, and then again with input topics that each have 2 partitions, I get different results.
>
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
>     rightSide,
>     (leftValue, rightValue) ->
>         (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
>     JoinWindows.ofTimeDifferenceAndGrace(
>         Duration.ofSeconds(20),
>         Duration.ofSeconds(10)),
>     StreamJoined.with(
>         Serdes.String(), /* key */
>         Serdes.String(), /* left value */
>         Serdes.String()  /* right value */
>     ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce the following batch of records twice, waiting longer than window + grace period (i.e. > 30 seconds) between the two batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
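The 30-second figure follows from the join window settings in the app above (20 seconds time difference plus 10 seconds grace). As a minimal sketch using only java.time, the lower bound for the pause between the two batches could be computed as below. The class and method names are hypothetical and not part of Kafka Streams.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams.
class BatchGap {
    // Returns the pause that must be exceeded between the two batches:
    // the join window's time difference plus its grace period.
    static Duration minimumGap(Duration timeDifference, Duration grace) {
        return timeDifference.plus(grace);
    }

    public static void main(String[] args) {
        // Values taken from JoinWindows.ofTimeDifferenceAndGrace(...) in the report.
        Duration gap = minimumGap(Duration.ofSeconds(20), Duration.ofSeconds(10));
        System.out.println("Wait longer than " + gap.getSeconds() + " seconds between batches");
    }
}
```

With the values from the report this prints `Wait longer than 30 seconds between batches`.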
> With input topics with 1 partition I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> With input topics with 2 partitions I get:
> {code}
> [KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, maybe in a different order due to the partitioning.
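To make the discrepancy explicit, the two outputs quoted above can be compared as sets of keys, ignoring order. The following is only a sketch: the class name is hypothetical, the key lists are transcribed by hand from the output in this report, and nothing here talks to Kafka.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for comparing the two runs; not part of Kafka Streams.
class MissingKeys {
    // Returns the keys present in `expected` but absent from `observed`,
    // sorted, so that output order across partitions does not matter.
    static Set<String> missing(List<String> expected, List<String> observed) {
        Set<String> result = new TreeSet<>(expected);
        result.removeAll(observed);
        return result;
    }

    public static void main(String[] args) {
        // Keys printed with 1 partition per input topic (from the report).
        List<String> onePartition = List.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        // Keys printed with 2 partitions per input topic (from the report).
        List<String> twoPartitions = List.of("1", "3", "4", "7", "8", "9");
        System.out.println("Missing keys: " + missing(onePartition, twoPartitions));
    }
}
```

For the outputs in the report this prints `Missing keys: [0, 2, 5, 6]`, i.e. four of the ten expected left-side results are not emitted in the two-partition run.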