Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/02/18 17:50:00 UTC

[jira] [Updated] (KAFKA-9566) ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler

     [ https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-9566:
-----------------------------------
    Affects Version/s:     (was: 2.2.0)
                       1.0.0

> ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Tomas Mi
>            Priority: Major
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler that forwards the exception to downstream processor(s), but ProcessorContextImpl#forward throws a NullPointerException when invoked from this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
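> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.To;
>
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>         // No configuration needed.
>     }
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
>         // Forward the exception (with a null key) to the child named "error-processor"; this call throws the NPE.
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }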
> {code}
> The handler is wired as the default deserialization exception handler:
> {code}
>     private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>         Topology topology = streamBuilder.build();
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName());
>         return new TopologyTestDriver(topology, props);
>     }
> {code}
>  
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
>     ... 33 more
> {noformat}
> Neither the DeserializationExceptionHandler nor the ProcessorContext javadocs mention that ProcessorContext#forward(...) must not be invoked from a DeserializationExceptionHandler, so I assume that this is a defect.
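
One plausible reading of the stack trace, which the report itself does not confirm, is that the context has no "current node" while the handler runs, because deserialization happens before any record is handed to a processor. The sketch below is a toy model of that failure mode using only the JDK. ToyNode, ToyContext, forwardChecked and every other name in it are hypothetical stand-ins, not Kafka Streams classes or APIs. It only illustrates how an unguarded forward fails with a bare NullPointerException and how an explicit state check would give a clearer error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Demo entry point for the toy model below (all names are hypothetical).
final class ForwardOutsideProcessingDemo {
    public static void main(String[] args) {
        ToyNode source = new ToyNode("source");
        ToyNode errors = new ToyNode("error-processor");
        source.addChild(errors);

        ToyContext context = new ToyContext();

        // No current node is set yet, mimicking a call made before processing starts.
        try {
            context.forward("bad record", "error-processor");
        } catch (NullPointerException e) {
            System.out.println("unguarded forward failed with NullPointerException");
        }

        try {
            context.forwardChecked("bad record", "error-processor");
        } catch (IllegalStateException e) {
            System.out.println("guarded forward failed with: " + e.getMessage());
        }

        // Once a current node is set, forwarding to a named child works.
        context.setCurrentNode(source);
        context.forwardChecked("bad record", "error-processor");
        System.out.println("values received by error-processor: " + errors.received.size());
    }
}

// Hypothetical stand-in for a processor node; not a Kafka Streams class.
final class ToyNode {
    final String name;
    final Map<String, ToyNode> children = new HashMap<>();
    final List<Object> received = new ArrayList<>();

    ToyNode(String name) {
        this.name = name;
    }

    void addChild(ToyNode child) {
        children.put(child.name, child);
    }
}

// Hypothetical stand-in for a processor context; not ProcessorContextImpl.
final class ToyContext {
    // Assumption: only set while a record is being processed, null otherwise.
    private ToyNode currentNode;

    void setCurrentNode(ToyNode node) {
        this.currentNode = node;
    }

    // Unguarded: dereferences currentNode, so it throws NullPointerException when no node is set.
    void forward(Object value, String childName) {
        ToyNode child = currentNode.children.get(childName);
        child.received.add(value);
    }

    // Guarded: reports the invalid state (or unknown child) with a descriptive exception.
    void forwardChecked(Object value, String childName) {
        if (currentNode == null) {
            throw new IllegalStateException("forward() called outside of record processing");
        }
        ToyNode child = currentNode.children.get(childName);
        if (child == null) {
            throw new IllegalArgumentException("unknown child: " + childName);
        }
        child.received.add(value);
    }
}
```

If this reading is right, either a guard of this kind or a javadoc note that forward must not be called from the handler would make the behavior explicit. Which of the two fits Kafka Streams is left to the maintainers.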



--
This message was sent by Atlassian Jira
(v8.3.4#803005)