You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2018/01/03 20:11:00 UTC

[jira] [Assigned] (KAFKA-6398) Stream-Table join fails, if table is not materialized

     [ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-6398:
------------------------------------

    Assignee: Guozhang Wang

> Stream-Table join fails, if table is not materialized
> -----------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as workaround but fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
> 	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can workaround by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)