You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Victoria Xia (Jira)" <ji...@apache.org> on 2021/09/14 16:59:00 UTC

[jira] [Assigned] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

     [ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victoria Xia reassigned KAFKA-13261:
------------------------------------

    Assignee: Victoria Xia

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and steams A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put both A and B into KTable and do a foreign key join on from A to B
> When doing this, all messages does not end up in the output topic.
> Repartitioning both to only use 1 partition each solve the problem so it seem like it has something to do with the foreign key join in combination with several partitions. 
> One suspicion would be that it is not possible to define what partitioner to use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B",  stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>   return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<DriverPeriod, DriverCosts> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)