Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/02/18 17:48:00 UTC

[jira] [Commented] (KAFKA-9566) ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler

    [ https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039283#comment-17039283 ] 

Matthias J. Sax commented on KAFKA-9566:
----------------------------------------

Thanks for reporting this issue. We should update the JavaDocs accordingly. The idea behind passing in the context is to provide information about the record, such as topic name, partition, offset, and timestamp.

We should also throw a more descriptive exception than a NullPointerException, such as "UnsupportedOperationException".
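
To illustrate the intended behavior, here is a minimal, self-contained sketch. It does not use the Kafka Streams classes: RecordMetadataView, RestrictedContext, and HandlerSketch are hypothetical stand-ins invented for this example, and the exception message is only an assumption about what a clearer error could say. The point is that the metadata accessors stay usable from the handler while forward() fails fast with a descriptive exception.

```java
// Hypothetical stand-in for the read-only part of a processor context.
// This is NOT the Kafka Streams ProcessorContext interface.
interface RecordMetadataView {
    String topic();
    int partition();
    long offset();
    long timestamp();
    void forward(Object key, Object value);
}

// Hypothetical context handed to an error handler: metadata is readable,
// but forwarding is rejected explicitly instead of failing with an NPE.
final class RestrictedContext implements RecordMetadataView {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    RestrictedContext(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }

    @Override
    public void forward(Object key, Object value) {
        // Assumed wording; the real message would be decided in the fix.
        throw new UnsupportedOperationException(
            "forward() is not supported from a deserialization exception handler");
    }
}

class HandlerSketch {
    // What a handler can safely do: describe the failed record using metadata only.
    static String describe(RecordMetadataView context, Exception exception) {
        return String.format(
            "skipping record %s-%d at offset %d (timestamp %d): %s",
            context.topic(), context.partition(), context.offset(),
            context.timestamp(), exception.getMessage());
    }

    public static void main(String[] args) {
        RestrictedContext context = new RestrictedContext("input-topic", 0, 42L, 1582048080000L);
        System.out.println(describe(context, new IllegalStateException("bad bytes")));
        try {
            context.forward(null, "payload");
        } catch (UnsupportedOperationException e) {
            System.out.println("forward rejected: " + e.getMessage());
        }
    }
}
```

A handler written against such a context would log or count the failure using the metadata and return CONTINUE or FAIL, rather than trying to route the error downstream.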

> ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.0
>            Reporter: Tomas Mi
>            Priority: Major
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler that forwards an exception to downstream processor(s), but ProcessorContextImpl#forward throws a NullPointerException if invoked from this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>     }
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }
> {code}
> The handler is wired as the default deserialization exception handler:
> {code}
>     private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>         Topology topology = streamBuilder.build();
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName());
>         return new TopologyTestDriver(topology, props);
>     }
> {code}
>  
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70) ... 33 more
> {noformat}
> Neither the DeserializationExceptionHandler nor the ProcessorContext Javadocs mention that ProcessorContext#forward(...) must not be invoked from a DeserializationExceptionHandler, so I assume that this is a defect.


