You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Will Bartlett (Jira)" <ji...@apache.org> on 2021/06/15 08:55:00 UTC

[jira] [Commented] (KAFKA-12914) StreamSourceNode.toString() throws with StreamsBuilder.stream(Pattern) ctor

    [ https://issues.apache.org/jira/browse/KAFKA-12914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363489#comment-17363489 ] 

Will Bartlett commented on KAFKA-12914:
---------------------------------------

Thanks for the quick turnaround!

> StreamSourceNode.toString() throws with StreamsBuilder.stream(Pattern) ctor
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12914
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12914
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Will Bartlett
>            Assignee: Matthias J. Sax
>            Priority: Minor
>             Fix For: 3.0.0, 2.8.1
>
>
> Hi, 
> I came across what looks like a bug.
> h2. Repro
> {code:java}
> import org.apache.kafka.common.serialization.Serdes
> import org.apache.kafka.streams.KafkaStreams
> import org.apache.kafka.streams.StreamsBuilder
> import org.apache.kafka.streams.kstream.Consumed
> import java.util.*
> import java.util.regex.Pattern
> fun main() {
>     val builder = StreamsBuilder()
>     builder.stream(Pattern.compile("foo"), Consumed.with(Serdes.Long(), Serdes.Long()))
>     val streams = KafkaStreams(builder.build(), Properties())
>     streams.start()
> }
> {code}
> {code:bash}
> SLF4J: Failed toString() invocation on an object of type [java.util.LinkedHashSet]
> Reported exception:
> java.lang.NullPointerException
> 	at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
> 	at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
> 	at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
> 	at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
> 	at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
> 	at java.base/java.lang.String.valueOf(String.java:3352)
> 	at java.base/java.lang.StringBuilder.append(StringBuilder.java:166)
> 	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
> 	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
> 	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
> 	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
> 	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
> 	at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
> 	at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
> 	at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
> 	at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
> 	at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
> 	at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
> 	at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
> 	at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
> 	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
> 	at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
> 	at ch.qos.logback.classic.Logger.debug(Logger.java:490)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
> 	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
> 	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
> 	at ApplicationKt.main(Application.kt:11)
> 	at ApplicationKt.main(Application.kt)
> SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode]
> Reported exception:
> java.lang.NullPointerException
> 	at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
> 	at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
> 	at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
> 	at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
> 	at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
> 	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
> 	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
> 	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
> 	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
> 	at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
> 	at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
> 	at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
> 	at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
> 	at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
> 	at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
> 	at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
> 	at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
> 	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
> 	at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
> 	at ch.qos.logback.classic.Logger.debug(Logger.java:490)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
> 	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
> 	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
> 	at ApplicationKt.main(Application.kt:11)
> 	at ApplicationKt.main(Application.kt)
> {code}
> Kotlin but you get the idea.
> h2. Brief debug
> StreamSourceNode.toString() calls topicNames():
> [https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java#L64-L70]
> Which will be null if constructed with a Pattern rather than a String:
> [https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java#L49]
> Which causes UnmodifiableCollection to throw.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)