You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2022/11/07 16:58:00 UTC

[jira] [Created] (KAFKA-14364) Support evolving serde with Foreign Key Join

John Roesler created KAFKA-14364:
------------------------------------

             Summary: Support evolving serde with Foreign Key Join
                 Key: KAFKA-14364
                 URL: https://issues.apache.org/jira/browse/KAFKA-14364
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: John Roesler


The current implementation of Foreign-Key join uses a hash comparison to determine whether it should emit join results or not. See [https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110]

As specified in KIP-213 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] ), we must do a comparison of this nature in order to get correct results when the foreign-key reference changes, as the old reference might emit delayed results after the new instance generates its updated results, leading to an incorrect final join state.

The hash comparison prevents this race condition by ensuring that any emitted results correspond to the _current_ version of the left-hand-side record (and therefore that the foreign-key reference itself has not changed).

An undesired side-effect of this is that if users update their serdes (in a compatible way), for example to add a new optional field to the record, then the resulting hash will change for existing records. This will cause Streams to stop emitting results for those records until a new left-hand-side update comes in, recording a new hash for those records.

It should be possible to provide a fix. Some ideas:
 * only consider the foreign-key references itself in the hash function (this was the original proposal, but we opted to hash the entire record as an optimization to suppress unnecessary updates).
 * provide a user-overridable hash function. This would be more flexible, but also pushes a lot of complexity onto users, and opens up the possibility to completely break semantics.

We will need to design the solution carefully so that we can preserve the desired correctness guarantee.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)