You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "A. Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2021/07/30 20:49:00 UTC
[jira] [Resolved] (KAFKA-10246) AbstractProcessorContext topic()
throws NullPointerException when modifying a state store within the DSL
from a punctuator
[ https://issues.apache.org/jira/browse/KAFKA-10246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman resolved KAFKA-10246.
--------------------------------------------
Resolution: Fixed
> AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10246
> URL: https://issues.apache.org/jira/browse/KAFKA-10246
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Environment: linux, windows, java 11
> Reporter: Peter Pringle
> Priority: Major
>
> NullPointerException seen when a KTable statestore is being modified by a punctuated method which is added to a topology via the DSL processor/ktable valueTransfomer methods.
> It seems valid for AbstractProcessorContext.topic() to return null; however the check below returns a NullPointerException before a null can be returned.
> {quote}if (topic.equals(NONEXIST_TOPIC)) {
> {quote}
> Made a local fix to reverse the ordering of the check (i.e. avoid the null) and this appears to fix the issue and sends the change to the state stores changelog topic.
> {quote}if (NONEXIST_TOPIC.equals(topic)) {
> {quote}
> Stacktrace below
> {{2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-5}}
> {{51a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:}}
> {{java.lang.NullPointerException: null}}
> \{{ at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
> \{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
> \{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
> \{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
> \{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
> \{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
> \{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
> \{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
> \{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}}
> \{{ at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}}
> \{{ at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}}
> \{{ at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}}
> \{{ at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
> \{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
> \{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
> \{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code}}
> \{{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) //iterates over the state store and cleans up old items}}
> \{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}}
> \{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}}
> \{{ at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}}
> \{{ at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}}
> \{{ at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}}
> \{{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}}
> {
> { at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)}
> }
--
This message was sent by Atlassian Jira
(v8.3.4#803005)