You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/01/13 19:00:00 UTC

[jira] [Updated] (KAFKA-8377) KTable#transformValue might lead to incorrect result in joins

     [ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8377:
-----------------------------------
    Description: 
Kafka Streams uses an optimization to not materialize every result KTable. If a non-materialized KTable is input to a join, the lookup into the table results in a lookup of the parents table plus a call to the operator. For example,
{code:java}
KTable nonMaterialized = materializedTable.filter(...);
KTable table2 = ...

table2.join(nonMaterialized,...){code}
If there is a table2 input record, the lookup to the other side is performed as a lookup into materializedTable plus applying the filter().

For stateless operation like filter, this is safe. However, #transformValues() might have an attached state store. Hence, when an input record r is processed by #transformValues() with current state S, it might produce an output record r' (that is not materialized). When the join later does a lookup to get r from the parent table, there is no guarantee that #transformValues() again produces r' because its state might not be the same any longer. A similar issue applies to stateless #transformValue() the accessed the `ProcessorContext` – when the `ProcessorContext` is accessed a second time (when processing the data from the upstream lookup, to recompute the store content) the `ProcessorContext` would return different data (ie, now the data of the currently processed record)

Hence, it seems to be required, to always materialize the result of a KTable#transformValues() operation if there is state or if `ProcessorContext` is used – one issue is, that we don't know upfront if `ProcessorContext` is used and thus might be conservative and always materialize the result (maybe be this some what to optimize operations like `filter` though). Note, that if there would be a consecutive filter() after tranformValue(), it would also be ok to materialize the filter() result. Furthermore, if there is no downstream join(), materialization is also not required.

Basically, it seems to be unsafe to apply `KTableValueGetter` on a #transformValues()` operator.

  was:
Kafka Streams uses an optimization to not materialize every result KTable. If a non-materialized KTable is input to a join, the lookup into the table results in a lookup of the parents table plus a call to the operator. For example,
{code:java}
KTable nonMaterialized = materializedTable.filter(...);
KTable table2 = ...

table2.join(nonMaterialized,...){code}
If there is a table2 input record, the lookup to the other side is performed as a lookup into materializedTable plus applying the filter().

For stateless operation like filter, this is safe. However, #transformValues() might have an attached state store. Hence, when an input record r is processed by #transformValues() with current state S, it might produce an output record r' (that is not materialized). When the join later does a lookup to get r from the parent table, there is no guarantee that #transformValues() again produces r' because its state might not be the same any longer.

Hence, it seems to be required, to always materialize the result of a KTable#transformValues() operation if there is state. Note, that if there would be a consecutive filter() after tranformValue(), it would also be ok to materialize the filter() result. Furthermore, if there is no downstream join(), materialization is also not required.

Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful #transformValues()` operator.


> KTable#transformValue might lead to incorrect result in joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-8377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Aishwarya Pradeep Kumar
>            Priority: Major
>              Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If a non-materialized KTable is input to a join, the lookup into the table results in a lookup of the parents table plus a call to the operator. For example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed as a lookup into materializedTable plus applying the filter().
> For stateless operation like filter, this is safe. However, #transformValues() might have an attached state store. Hence, when an input record r is processed by #transformValues() with current state S, it might produce an output record r' (that is not materialized). When the join later does a lookup to get r from the parent table, there is no guarantee that #transformValues() again produces r' because its state might not be the same any longer. A similar issue applies to stateless #transformValue() the accessed the `ProcessorContext` – when the `ProcessorContext` is accessed a second time (when processing the data from the upstream lookup, to recompute the store content) the `ProcessorContext` would return different data (ie, now the data of the currently processed record)
> Hence, it seems to be required, to always materialize the result of a KTable#transformValues() operation if there is state or if `ProcessorContext` is used – one issue is, that we don't know upfront if `ProcessorContext` is used and thus might be conservative and always materialize the result (maybe be this some what to optimize operations like `filter` though). Note, that if there would be a consecutive filter() after tranformValue(), it would also be ok to materialize the filter() result. Furthermore, if there is no downstream join(), materialization is also not required.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a #transformValues()` operator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)