You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Ramil Israfilov (Jira)" <ji...@apache.org> on 2021/02/18 13:38:00 UTC

[jira] [Created] (KAFKA-12336) custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter

Ramil Israfilov created KAFKA-12336:
---------------------------------------

             Summary: custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter 
                 Key: KAFKA-12336
                 URL: https://issues.apache.org/jira/browse/KAFKA-12336
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.7.0
            Reporter: Ramil Israfilov


In our Scala application I am trying to implement custom naming for Kafka Streams application nodes.

We are using topicPattern for our stream source.

Here is an API which I am calling:

 
{code:java}
val topicsPattern="t-[A-Za-z0-9-].suffix"
val operations: KStream[MyKey, MyValue] =
  builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
    Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
  )
{code}
 Despite the fact that I am providing Consumed with custom name the topology describe still show "KSTREAM-SOURCE-0000000000" as name for our stream source.

It is not a problem if I just use a name for topic. But our application needs to get messages from set of topics based on topicname pattern matching.

After checking the kakfa code I see that

org.apache.kafka.streams.kstream.internals.InternalStreamBuilder (on line 103) has a bug:
{code:java}
public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                   final ConsumedInternal<K, V> consumed) {
    final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
    final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
{code}
node name construction does not take into account the name of consumed parameter.

For example code for another stream api call with topic name does it correctly:
{code:java}
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)