Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/05/16 20:31:00 UTC

[jira] [Created] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

Matthias J. Sax created KAFKA-8377:
--------------------------------------

             Summary: KTable#transformValue might lead to incorrect result in joins
                 Key: KAFKA-8377
                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.0.0
            Reporter: Matthias J. Sax


Kafka Streams uses an optimization that avoids materializing every result KTable. If a non-materialized KTable is an input to a join, a lookup into that table is performed as a lookup into its parent table plus a call to the operator that derived it. For example,
{code:java}
KTable nonMaterialized = materializedTable.filter(...);
KTable table2 = ...

table2.join(nonMaterialized,...){code}
When table2 receives an input record, the lookup into the other side is performed as a lookup into materializedTable followed by applying the filter() predicate to the result.
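To make the lookup path concrete, here is a minimal, stdlib-only Java model of the optimization described above. It is not Kafka Streams code: `ValueGetter`, `StoreGetter`, and `FilterGetter` are hypothetical stand-ins loosely modeled on the idea behind `KTableValueGetter`, and a `HashMap` stands in for the parent's state store. The filtered table keeps no store of its own; a lookup reads the parent value and re-applies the predicate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the idea behind KTableValueGetter:
// "give me the current value for this key".
interface ValueGetter<K, V> {
    V get(K key);
}

// Lookup into a materialized table: read the value from its store (a HashMap here).
class StoreGetter<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> store;

    StoreGetter(Map<K, V> store) {
        this.store = store;
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

// Lookup into a non-materialized filter() result: look up the parent, then
// re-apply the predicate. A value that fails the filter is treated as absent (null).
class FilterGetter<K, V> implements ValueGetter<K, V> {
    private final ValueGetter<K, V> parent;
    private final Predicate<V> predicate;

    FilterGetter(ValueGetter<K, V> parent, Predicate<V> predicate) {
        this.parent = parent;
        this.predicate = predicate;
    }

    @Override
    public V get(K key) {
        V value = parent.get(key);
        return (value != null && predicate.test(value)) ? value : null;
    }
}

class FilterLookupDemo {
    public static void main(String[] args) {
        Map<String, Long> parentStore = new HashMap<>(); // materializedTable's store
        parentStore.put("a", 5L);
        parentStore.put("b", -3L);

        ValueGetter<String, Long> materialized = new StoreGetter<>(parentStore);
        ValueGetter<String, Long> nonMaterialized = new FilterGetter<>(materialized, v -> v > 0);

        // What the join sees when a table2 record with key "a" or "b" arrives.
        System.out.println(nonMaterialized.get("a")); // 5
        System.out.println(nonMaterialized.get("b")); // null (filtered out)
    }
}
```

Because the predicate is a pure function of the parent value, recomputing it at lookup time always gives the same answer as computing it when the record was first processed.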

For a stateless operation like filter(), this is safe: re-applying the predicate to the parent value always yields the same result. However, #transformValues() might have an attached state store. When an input record r is processed by #transformValues() with current state S, it might produce an output record r' that is not materialized. When the join later looks up r in the parent table and re-applies #transformValues(), there is no guarantee that it produces r' again, because its state may have changed in the meantime.
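The hazard can be reproduced without Kafka at all. The sketch below is a hypothetical, stdlib-only model, not Kafka Streams code: `SequenceTransformer` stands in for a stateful #transformValues() operator whose "state store" is a single counter of processed records, and the join lookup recomputes the value from the parent table instead of reading a materialized result. Processing r with state S yields r', but a later lookup re-runs the transformer against different state and gets a different value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stateful transformer. Its "state store" is a counter of records
// processed so far; the output depends on that state, not just on the input.
class SequenceTransformer {
    private long processed = 0;

    String transform(String key, String value) {
        processed++;                    // state is updated on every call
        return value + "@" + processed; // output depends on current state
    }
}

class StatefulLookupDemo {
    // Returns {r' as emitted downstream, value recomputed by a later join lookup}.
    static String[] run() {
        Map<String, String> parentStore = new HashMap<>(); // materialized parent table
        SequenceTransformer transformer = new SequenceTransformer();

        // 1) r = ("k", "v") is processed with state S and produces r' (not materialized).
        parentStore.put("k", "v");
        String emitted = transformer.transform("k", "v");

        // 2) Another record advances the transformer's state.
        parentStore.put("other", "x");
        transformer.transform("other", "x");

        // 3) A table2 record with key "k" triggers a join lookup: read r from the
        //    parent table and re-apply the transformer, which now has different state.
        String recomputed = transformer.transform("k", parentStore.get("k"));

        return new String[] {emitted, recomputed};
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("emitted r'  = " + result[0]); // v@1
        System.out.println("join lookup = " + result[1]); // v@3, not r'
    }
}
```

In this model the recomputed value differs from r' even though the parent value for key "k" never changed, which is exactly the inconsistency the join would observe.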

Hence, it seems necessary to always materialize the result of a KTable#transformValues() operation if it has state. Note that if transformValues() were directly followed by a filter(), it would also be OK to materialize the filter() result instead. Furthermore, if there is no downstream join(), materialization is not required.
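Continuing the same kind of stdlib-only model, the sketch below shows why materialization removes the problem. All names are hypothetical and this is not how Kafka Streams implements it: `MaterializedTransformNode` writes each result into its own store at processing time, and the join lookup reads that store rather than re-running the stateful transformer. (In the real API, one way to request this is the `KTable#transformValues()` overload that accepts a `Materialized` argument.)

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stateful transformer: output depends on a running counter
// that changes on every call.
class CounterTransformer {
    private long processed = 0;

    String transform(String key, String value) {
        processed++;
        return value + "@" + processed;
    }
}

// Hypothetical materialized transformValues() node: the result computed while
// processing a record is written to its own store, and lookups read that store
// instead of re-running the transformer.
class MaterializedTransformNode {
    private final CounterTransformer transformer = new CounterTransformer();
    private final Map<String, String> resultStore = new HashMap<>();

    String process(String key, String value) {
        String result = transformer.transform(key, value);
        resultStore.put(key, result); // materialize r'
        return result;                // forward r' downstream
    }

    // What a downstream join uses for its lookup: no recomputation, no state access.
    String lookup(String key) {
        return resultStore.get(key);
    }
}

class MaterializedLookupDemo {
    static String[] run() {
        MaterializedTransformNode node = new MaterializedTransformNode();
        String emitted = node.process("k", "v"); // r' for r = ("k", "v")
        node.process("other", "x");              // advances the transformer's state
        String seenByJoin = node.lookup("k");    // later join lookup for key "k"
        return new String[] {emitted, seenByJoin};
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("emitted r'  = " + result[0]); // v@1
        System.out.println("join lookup = " + result[1]); // v@1
    }
}
```

The trade-off is an extra store (and, in Kafka Streams, a changelog topic) per stateful transformValues(), which is presumably why the optimization skips materialization where it is safe.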

In short, it seems unsafe to apply a `KTableValueGetter` to a stateful `#transformValues()` operator.


