Posted to jira@kafka.apache.org by "Victoria Xia (Jira)" <ji...@apache.org> on 2021/09/14 16:59:00 UTC
[jira] [Assigned] (KAFKA-13261) KTable to KTable foreign key join
loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victoria Xia reassigned KAFKA-13261:
------------------------------------
Assignee: Victoria Xia
> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0, 2.7.1
> Reporter: Tomas Forsman
> Assignee: Victoria Xia
> Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams, A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems to have something to do with the foreign key join in combination with several partitions.
> One suspicion is that it is not possible to define which partitioner to use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     // Partition on the foreign key b only; floorMod stays non-negative even when hashCode() is Integer.MIN_VALUE.
>     return (topic, key, value, numPartitions) -> Math.floorMod(key.getKeyB().hashCode(), numPartitions);
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
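
The reporter's suspicion about partitioners can be illustrated without any Kafka classes. The sketch below is only an illustration under stated assumptions: the class `PartitionerMismatchSketch`, its `KeyA` type, and both helper methods are hypothetical names, and the whole-key hash merely stands in for any partitioner that hashes the full composite key [a, b] (Kafka's default partitioner hashes the serialized key bytes with murmur2, which is not reproduced here). It shows that a record routed by its foreign key b and the same record routed by its whole key can land in different partitions. It does not claim to reproduce the Kafka Streams internals or the root cause of this issue.

```java
import java.util.Objects;

// Hypothetical, Kafka-free sketch: compares two ways of choosing a partition
// for the same composite key [a, b].
final class PartitionerMismatchSketch {

    // Stand-in for the composite key of stream A (assumption, not the reporter's class).
    static final class KeyA {
        final String a;
        final String b;

        KeyA(String a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof KeyA)) {
                return false;
            }
            KeyA other = (KeyA) o;
            return a.equals(other.a) && b.equals(other.b);
        }

        @Override
        public int hashCode() {
            return Objects.hash(a, b);
        }
    }

    // Routes by the foreign key b only, like the custom partitioner in the report.
    static int partitionByForeignKey(KeyA key, int numPartitions) {
        return Math.floorMod(key.b.hashCode(), numPartitions);
    }

    // Routes by the whole composite key; stands in for a partitioner that
    // is unaware of the custom rule above.
    static int partitionByWholeKey(KeyA key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        KeyA key = new KeyA("a1", "b1");
        int partitions = 4;
        System.out.println("by foreign key: " + partitionByForeignKey(key, partitions)); // 3
        System.out.println("by whole key:   " + partitionByWholeKey(key, partitions));   // 0
    }
}
```

If one step of a join pipeline used the first rule and another step used the second, records for the same key would be looked up on different partitions, which would be consistent with the symptom that the problem disappears when every topic has a single partition.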
--
This message was sent by Atlassian Jira
(v8.3.4#803005)