Posted to dev@kafka.apache.org by "Bill Bejeck (JIRA)" <ji...@apache.org> on 2017/02/23 18:56:44 UTC

[jira] [Comment Edited] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

    [ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881018#comment-15881018 ] 

Bill Bejeck edited comment on KAFKA-4791 at 2/23/17 6:56 PM:
-------------------------------------------------------------

Fair enough.  I did not look into the issue at all; at first blush it seemed like a bug that needed to be fixed ASAP.  But considering your comments and the forthcoming changes with KIP-120, I'll hold off.


was (Author: bbejeck):
Fair enough.  I did not look into the issue at all, at first blush it seemed like a big that needed to be fixed asap.  But considering your comments and the forthcoming changes with KIP-120, I'll hold off.

> Kafka Streams - unable to add state stores when using wildcard topics on the source
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-4791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4791
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Java 8
>            Reporter: Bart Vercammen
>            Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with the following components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, everything works fine, but it does not seem possible to attach state stores when the sources use wildcard topics.
> Inside {{addStateStore}}, the processor gets connected to the state store with {{connectProcessorAndStateStore}}, which in turn tries to connect the state store with the processor's source topics via {{connectStateStoreNameToSourceTopics}}.
> Here lies the problem: 
> {code}
>     private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
>         final Set<String> sourceTopics = new HashSet<>();
>         for (String parent : parents) {
>             NodeFactory nodeFactory = nodeFactories.get(parent);
>             if (nodeFactory instanceof SourceNodeFactory) {
>                 sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
>             } else if (nodeFactory instanceof ProcessorNodeFactory) {
>                 sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
>             }
>         }
>         return sourceTopics;
>     }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}} will fail because the {{SourceNodeFactory}} object holds no topics, only a pattern ({{getTopics()}} returns null).
> I also searched the Kafka Streams project for unit tests that cover this scenario, but alas, I was not able to find any.
> There are tests on state stores with exact topic names and tests on wildcard topics, but none that combine the two.
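
The failure mode described in the report can be reproduced outside Kafka with a minimal sketch. Everything below is a hypothetical stand-in: {{WildcardSourceSketch}}, {{SourceNode}}, {{collectUnguarded}} and {{collectGuarded}} are illustrative names, not Kafka classes or methods. It assumes, as the report states, that a source registered with a pattern exposes a null topic array. The guarded variant only shows how the exception can be avoided; it is not the fix applied in Kafka, and simply skipping a pattern source would still leave the state store without any associated topics.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

class WildcardSourceSketch {

    // Hypothetical stand-in for a source node: it carries either explicit
    // topic names or a pattern, never both.
    static final class SourceNode {
        final String[] topics;  // null when the source was registered with a pattern
        final Pattern pattern;  // null when the source was registered with topic names

        SourceNode(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    // Mirrors the unguarded call from the quoted method: Arrays.asList
    // rejects a null array with a NullPointerException.
    static Set<String> collectUnguarded(SourceNode node) {
        Set<String> sourceTopics = new HashSet<>();
        sourceTopics.addAll(Arrays.asList(node.topics));
        return sourceTopics;
    }

    // Guarded variant (an assumption for illustration): sources without
    // explicit topic names contribute nothing instead of throwing.
    static Set<String> collectGuarded(SourceNode node) {
        Set<String> sourceTopics = new HashSet<>();
        if (node.topics != null) {
            sourceTopics.addAll(Arrays.asList(node.topics));
        }
        return sourceTopics;
    }

    public static void main(String[] args) {
        SourceNode exact = new SourceNode(new String[] {"ingest-a", "ingest-b"}, null);
        SourceNode wildcard = new SourceNode(null, Pattern.compile("ingest-.*"));

        System.out.println("exact topics: " + collectUnguarded(exact));
        try {
            collectUnguarded(wildcard);
        } catch (NullPointerException e) {
            System.out.println("wildcard source failed with NullPointerException");
        }
        System.out.println("guarded wildcard topics: " + collectGuarded(wildcard));
    }
}
```

The guard keeps topology construction from throwing, but a state store attached to a wildcard source would still need some way to learn which topics the pattern matches, which this sketch does not attempt.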


