Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2018/01/18 21:17:01 UTC

[jira] [Resolved] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

     [ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-6398.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.0.1
                   1.1.0

> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug
>             Fix For: 1.1.0, 1.0.1
>
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has three code paths:
> 1. If the operator is a KTable source operator, it uses its materialized state store for the value getter (note that we currently always materialize KTable sources).
> 2. If the operator is an aggregation operator, its generated KTable should always be materialized, so we just use its materialized state store.
> 3. Otherwise, we treat the value getter on a per-operator basis.
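A minimal sketch of the three-way dispatch, in plain Java with no Kafka dependency. The {{Kind}} enum, the {{Operator}} class and the map-backed "store" are hypothetical stand-ins, not Kafka Streams classes; path 3 simply delegates to whatever getter the operator itself supplies.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the three valueGetterSupplier code paths.
// None of these types are Kafka Streams classes.
public class ValueGetterPaths {

    enum Kind { SOURCE, AGGREGATION, OTHER }

    static final class Operator {
        final Kind kind;
        final Map<String, String> store;               // materialized store, if any
        final Function<String, String> operatorGetter; // per-operator getter (path 3)

        Operator(Kind kind, Map<String, String> store,
                 Function<String, String> operatorGetter) {
            this.kind = kind;
            this.store = store;
            this.operatorGetter = operatorGetter;
        }
    }

    // Returns a key -> value lookup following the three code paths.
    static Function<String, String> valueGetter(Operator op) {
        switch (op.kind) {
            case SOURCE:       // 1. source KTables are always materialized
            case AGGREGATION:  // 2. aggregation results are always materialized
                return op.store::get;
            default:           // 3. the operator decides how to get the value
                return op.operatorGetter;
        }
    }

    public static void main(String[] args) {
        Operator source = new Operator(Kind.SOURCE, Map.of("a", "1"), null);
        Operator other = new Operator(Kind.OTHER, null, k -> "computed-" + k);
        System.out.println(valueGetter(source).apply("a")); // 1
        System.out.println(valueGetter(other).apply("a"));  // computed-a
    }
}
```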
> For case 3 above, what we SHOULD do is this: if the generated KTable is materialized, the value getter simply relies on its materialized state store to get the value; otherwise, the operator itself defines which parent's value getter to inherit and what computation to apply on the fly to get the value. For example, for {{KTable#filter()}} where {{Materialized}} is not specified, {{KTableFilterValueGetter}} gets the value from the parent's value getter and then applies the filter on the fly. In addition, downstream operators should be able to access the parent's materialized state store via {{connectProcessorAndStateStore}}.
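The intended per-operator behavior for {{KTable#filter()}} can be sketched as follows. This is a hypothetical, dependency-free model, not the real {{KTableFilterValueGetter}}: the {{filterValueGetter}} helper, the map-backed store, and the convention that a filtered-out key yields {{null}} are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical sketch: a filter's value getter reads its own store when the
// result is materialized, and otherwise filters the parent's value on the fly.
public class FilterValueGetterSketch {

    static Function<String, Integer> filterValueGetter(
            Map<String, Integer> ownStore,          // null if not materialized
            Function<String, Integer> parentGetter, // parent's value getter
            BiPredicate<String, Integer> predicate) {
        if (ownStore != null) {
            // Materialized result: rely on its own state store.
            return ownStore::get;
        }
        // Not materialized: get from the parent, then apply the filter.
        return key -> {
            Integer value = parentGetter.apply(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        Map<String, Integer> parentStore = new HashMap<>();
        parentStore.put("a", 5);
        parentStore.put("b", 50);
        Function<String, Integer> getter =
                filterValueGetter(null, parentStore::get, (k, v) -> v > 10);
        System.out.println(getter.apply("a")); // null (filtered out)
        System.out.println(getter.apply("b")); // 50
    }
}
```

Computing on the fly avoids a second store for the filtered table, at the cost of re-evaluating the predicate on every lookup.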
> However, the current code does not do this correctly: 1) it does not check whether the resulting KTable is materialized, but always tries to use its parent's value getter, and 2) it does not connect its parent's materialized store to downstream operators. As a result, operators such as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} can cause a TopologyException when the topology is built. The following is an example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
> 	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
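The missing store connection behind the errors above can be modeled as a recursive lookup of the stores a value getter actually reads from. In this hypothetical sketch, {{TableNode}} and {{storeNames()}} are illustrative names, not the Kafka Streams API: a materialized KTable contributes only its own store, while a non-materialized one inherits its parents' stores, so a downstream join would be connected to {{KTABLE-SOURCE-STATE-STORE-0000000000}} rather than to a {{null}} store name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of which state stores a downstream processor
// (e.g. a stream-table join) must be connected to.
public class StoreConnectionSketch {

    static final class TableNode {
        final String ownStore;         // null if this KTable is not materialized
        final List<TableNode> parents; // empty for a source

        TableNode(String ownStore, List<TableNode> parents) {
            this.ownStore = ownStore;
            this.parents = parents;
        }

        // Stores the value getter of this KTable actually reads from.
        List<String> storeNames() {
            if (ownStore != null) {
                return List.of(ownStore); // materialized: only its own store
            }
            List<String> names = new ArrayList<>();
            for (TableNode parent : parents) {
                names.addAll(parent.storeNames()); // inherit the parents' stores
            }
            return names;
        }
    }

    public static void main(String[] args) {
        TableNode source = new TableNode("KTABLE-SOURCE-STATE-STORE-0000000000", List.of());
        TableNode filtered = new TableNode(null, List.of(source));
        // A join on the non-materialized filter needs the source's store.
        System.out.println(filtered.storeNames()); // [KTABLE-SOURCE-STATE-STORE-0000000000]
    }
}
```

For a non-materialized {{KTable#join(KTable)}} result, the same recursion yields the stores of both inputs.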
> Note that there is another minor, orthogonal issue with {{KTable#filter}} itself: it does not include its parent's queryable store name when it is not materialized itself (see {{KTable#mapValues}} for reference).
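One possible reading of the {{KTable#mapValues}} behavior referenced above, as a hypothetical sketch: a derived KTable reports its own queryable store name when materialized and otherwise falls back to its parent's. The static {{queryableStoreName}} helper here is illustrative only, not the actual {{KTable#queryableStoreName()}} implementation.

```java
// Hypothetical helper: which store name a derived KTable exposes for
// interactive queries. Not the Kafka Streams implementation.
public class QueryableStoreNameSketch {

    static String queryableStoreName(String ownStoreName, String parentStoreName) {
        if (ownStoreName != null) {
            // Materialized result: expose its own store.
            return ownStoreName;
        }
        // Not materialized: fall back to the parent's queryable store (may be null).
        return parentStoreName;
    }

    public static void main(String[] args) {
        System.out.println(queryableStoreName(null, "table-store"));           // table-store
        System.out.println(queryableStoreName("filter-store", "table-store")); // filter-store
    }
}
```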



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)