You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Peter Pringle (Jira)" <ji...@apache.org> on 2020/07/08 02:25:00 UTC

[jira] [Created] (KAFKA-10246) AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator

Peter Pringle created KAFKA-10246:
-------------------------------------

             Summary: AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator
                 Key: KAFKA-10246
                 URL: https://issues.apache.org/jira/browse/KAFKA-10246
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.0
         Environment: linux, windows, java 11
            Reporter: Peter Pringle


It seems valid for AbstractProcessorContext.topic() to return null; however the check below returns a NullPointerException before a null can be returned.
{quote}if (topic.equals(NONEXIST_TOPIC)) {
{quote}
Make a local fix to reverse the ordering of the check (i.e. avoid the null) and this appears to fix the issue and sends the change to the state stores change log topic.
{quote}if (NONEXIST_TOPIC.equals(topic)) {
{quote}
Stacktrace below seen when deleting from a previously declared ktable materialized state store which is being called from a punctuator added to the topology using either process/valueTransform within the init method.

 

 

{{2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-5}}
{{51a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:}}
{{java.lang.NullPointerException: null}}
{{ at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}}
{{ at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}}
{{ at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}}
{{ at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}}
{{ at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code}}
{{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) //iterates over the state store and cleans up old items}}
{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}}
{{ at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}}
{{ at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}}
{{ at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)