Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/09/24 15:44:00 UTC

[jira] [Commented] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

    [ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201603#comment-17201603 ] 

John Roesler commented on KAFKA-10515:
--------------------------------------

Thanks for this report, [~thorsten.hake]!

I've just taken a look at the code and the stacktrace, and I see that your explanation is spot on.

I've gone ahead and marked the ticket as a blocker for the next release of each reported version, although the release managers may push back, since this is not a regression. Still, I think we should get it fixed ASAP.

Would you like to submit a patch for this? No pressure; I just wanted to check if you were planning on claiming this one.

Thanks,

-John

> NPE: Foreign key join serde may not be initialized with default serde if application is distributed
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Blocker
>             Fix For: 2.7.0, 2.5.2, 2.6.1
>
>
> The fix for KAFKA-9517 corrected the initialization of the foreign key join serdes for Kafka Streams applications that run on a single instance.
> However, if an application is distributed over multiple instances, the foreign key join serdes may still be left uninitialized, leading to the following NPE:
> {noformat}
> Encountered the following error during processing: java.lang.NullPointerException: null
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
> 	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
> 	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
> 	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
> 	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
> 	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
> 	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> This happens because the processors of a foreign key join are distributed across multiple tasks. The serde is initialized with the default serde only during the initialization of the task containing the sink node ("subscription-registration-sink"). If the task containing the SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to the same instance as the task containing the sink node, an NPE is thrown, because the serde of the state store used within the SubscriptionStoreReceiveProcessor has never been initialized.
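
For anyone following along, the failure mode described in the report can be reproduced in miniature with a toy model. This is only a sketch under stated assumptions: the names FkJoinSerdeDemo, WrapperSerializer, Instance, initSinkTask and runReceiveTask are hypothetical stand-ins and are not Kafka Streams classes or APIs. It assumes that each application instance holds its own serializer object, and that only the sink task's initialization fills in the default.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Toy model only: every class here is a hypothetical stand-in, not Kafka Streams code.
public class FkJoinSerdeDemo {

    /** Wraps an inner key serializer that may be left null when the topology is built. */
    static final class WrapperSerializer {
        private Function<String, byte[]> inner; // null means "fall back to the default later"

        WrapperSerializer(Function<String, byte[]> inner) {
            this.inner = inner;
        }

        /** Fills in the default only if no explicit serializer was supplied. */
        void setIfUnset(Function<String, byte[]> defaultSerializer) {
            if (inner == null) {
                inner = defaultSerializer;
            }
        }

        byte[] serialize(String key) {
            // Throws NullPointerException if setIfUnset was never called.
            return inner.apply(key);
        }
    }

    /** Stand-in for the configured default serde. */
    static final Function<String, byte[]> DEFAULT =
        s -> s.getBytes(StandardCharsets.UTF_8);

    /** Models one application instance, which owns its own copy of the serializer. */
    static final class Instance {
        final WrapperSerializer serializer = new WrapperSerializer(null);

        // Assumption: only the sink task's init touches the serializer.
        void initSinkTask() {
            serializer.setIfUnset(DEFAULT);
        }

        // The receive task uses the serializer when writing to its state store.
        byte[] runReceiveTask(String key) {
            return serializer.serialize(key);
        }
    }

    /** Returns true if the receive task hits an NPE on this instance. */
    static boolean receiveFails(boolean sinkTaskOnSameInstance) {
        Instance instance = new Instance();
        if (sinkTaskOnSameInstance) {
            instance.initSinkTask();
        }
        try {
            instance.runReceiveTask("some-key");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance fails: " + receiveFails(true));
        System.out.println("different instance fails: " + receiveFails(false));
    }
}
```

In this model the receive task succeeds when the sink task was initialized on the same instance and fails with a NullPointerException otherwise, which mirrors why the problem only shows up once the tasks are spread over several instances.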



--
This message was sent by Atlassian Jira
(v8.3.4#803005)