You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "GeordieMai (Jira)" <ji...@apache.org> on 2021/02/19 06:34:00 UTC

[jira] [Assigned] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter

     [ https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

GeordieMai reassigned KAFKA-12336:
----------------------------------

    Assignee: GeordieMai

> custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter 
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12336
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Ramil Israfilov
>            Assignee: GeordieMai
>            Priority: Minor
>              Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka Streams application nodes.
> We are using topicPattern for our stream source.
> Here is an API which I am calling:
>  
> {code:java}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
>     Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
>  Despite the fact that I am providing Consumed with custom name the topology describe still show "KSTREAM-SOURCE-0000000000" as name for our stream source.
> It is not a problem if I just use a name for topic. But our application needs to get messages from set of topics based on topicname pattern matching.
> After checking the kakfa code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamBuilder (on line 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> node name construction does not take into account the name of consumed parameter.
> For example code for another stream api call with topic name does it correctly:
> {code:java}
> final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)