You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Bill Bejeck (JIRA)" <ji...@apache.org> on 2017/02/26 21:58:45 UTC
[jira] [Work started] (KAFKA-4791) Kafka Streams - unable to add
state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on KAFKA-4791 started by Bill Bejeck.
------------------------------------------
> Kafka Streams - unable to add state stores when using wildcard topics on the source
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.1
> Environment: Java 8
> Reporter: Bart Vercammen
> Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following components :
> {code}
> new TopologyBuilder()
> .addSource("ingest", Pattern.compile( ... ))
> .addProcessor("myprocessor", ..., "ingest")
> .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it seems not possible to attach state stores when using wildcard topics on the sources.
> Inside {{addStateStore}}, the processor gets connected to the state store with {{connectProcessorAndStateStore}}, and there it will try to connect the state store with the source topics from the processor: {{connectStateStoreNameToSourceTopics}}
> Here lies the problem:
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}} will fail as there are no topics inside the {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on wildcard topics, but no combination of both ...
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)