Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2018/05/22 22:26:00 UTC

[jira] [Issue Comment Deleted] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

     [ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6378:
-----------------------------------
    Comment: was deleted

(was: wicknicks opened a new pull request #5065: KAFKA-6378: Implement error handling for source and sink tasks
URL: https://github.com/apache/kafka/pull/5065
 
 
   This PR implements the features described in this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
   
   This PR changes the Connect framework so that it can automatically handle errors encountered while processing records in a connector. It introduces the following behavior changes:
   
   - **Retry on Failure**: Retry the failed operation a configurable number of times, with a backoff between retries.
   - **Task Tolerance Limits**: Tolerate a configurable number of failures in a task.
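The two behaviors above can be illustrated with a small plain-Java sketch. It is not the Connect framework's implementation: the class `RetryWithToleranceSketch`, its parameters (`maxRetries`, `initialBackoffMs`, `maxToleratedFailures`), the exponential backoff policy, and the use of `null` to signal a skipped record are all hypothetical choices made for the example.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch: retry an operation with exponential backoff and
 * tolerate a limited number of failed records before failing the task.
 * Names and policies are illustrative assumptions, not Kafka Connect APIs.
 */
final class RetryWithToleranceSketch {
    private final int maxRetries;            // retries after the first attempt
    private final long initialBackoffMs;     // delay before the first retry
    private final long maxToleratedFailures; // failed records a task may skip
    private long failedRecords = 0;

    RetryWithToleranceSketch(int maxRetries, long initialBackoffMs, long maxToleratedFailures) {
        this.maxRetries = maxRetries;
        this.initialBackoffMs = initialBackoffMs;
        this.maxToleratedFailures = maxToleratedFailures;
    }

    /**
     * Runs the operation, retrying on RuntimeException. Returns the result, or
     * null if every attempt failed and the failure is still within tolerance.
     * Throws IllegalStateException once the tolerance limit is exceeded.
     */
    <T> T execute(Supplier<T> operation) {
        long backoffMs = initialBackoffMs;
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastError = e;
                if (attempt < maxRetries) {
                    sleep(backoffMs);
                    backoffMs *= 2; // exponential backoff (an assumed policy)
                }
            }
        }
        failedRecords++; // all attempts failed: count against the tolerance limit
        if (failedRecords > maxToleratedFailures) {
            throw new IllegalStateException(
                    "Tolerance exceeded: " + failedRecords + " failed records", lastError);
        }
        return null; // tolerated: skip this record and keep the task running
    }

    long failedRecords() {
        return failedRecords;
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted during backoff", e);
        }
    }
}
```

Doubling the delay keeps a persistently failing dependency from being hammered; a real framework would more likely cap the delay and return an explicit status rather than `null`.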
   It also adds the following ways to report errors, each with enough context to simplify debugging:
   
   - **Log Error Context**: The error information and its processing context are written to the standard application logs.
   - **Dead Letter Queue**: The original message is produced to a Kafka topic (sink connectors only).
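The two reporting paths can be sketched as implementations of one reporter interface. Everything here is hypothetical: `ErrorContext`, `ErrorReporter`, `LogReporter`, and `InMemoryDeadLetterQueue` are illustrative names, `java.util.logging` stands in for the application logger, and an in-memory list stands in for the Kafka dead letter queue topic, which per the description above applies only to sink connectors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical error context: where the failure happened and the original record. */
final class ErrorContext {
    final String stage;         // processing stage that failed (illustrative label)
    final String topic;
    final int partition;
    final long offset;
    final byte[] originalValue; // raw bytes as consumed, before conversion
    final Throwable error;

    ErrorContext(String stage, String topic, int partition, long offset,
                 byte[] originalValue, Throwable error) {
        this.stage = stage;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.originalValue = originalValue;
        this.error = error;
    }
}

/** Hypothetical reporter interface; each reporter handles one failed record. */
interface ErrorReporter {
    void report(ErrorContext context);
}

/** Writes the error and its context to the application log. */
final class LogReporter implements ErrorReporter {
    private static final Logger LOG = Logger.getLogger(LogReporter.class.getName());

    @Override
    public void report(ErrorContext c) {
        LOG.warning(String.format("Failed in stage %s at %s-%d@%d: %s",
                c.stage, c.topic, c.partition, c.offset, c.error));
    }
}

/** Stands in for a dead letter queue topic: keeps the original, unconverted bytes. */
final class InMemoryDeadLetterQueue implements ErrorReporter {
    final List<byte[]> records = new ArrayList<>();

    @Override
    public void report(ErrorContext c) {
        records.add(c.originalValue);
    }
}
```

Keeping the original bytes, rather than a partially converted record, is what makes a dead letter queue useful for replaying records after the cause is fixed.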
   The logged information consists of the following bits:
   
   New **metrics** are added to monitor the number of failures and the behavior of the response handler.
   
   The changes proposed here **are backward compatible**. Connect currently kills the task on the first error in any stage, and this remains the default behavior unless the connector overrides one of the new configurations introduced by this feature.
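For illustration, the sketch below builds connector-level overrides using the `errors.*` property names that released Kafka Connect versions (2.0 and later) expose for this feature; the PR under review may use different names, so check them against the Connect documentation for your version. The defaults assumed by the helper (`errors.retry.timeout` = `0`, `errors.tolerance` = `none`) are likewise worth verifying, and `ErrorHandlingConfigExample` and `failsOnFirstError` are hypothetical names.

```java
import java.util.Properties;

/**
 * Connector-level error-handling overrides using the errors.* property names
 * from Kafka Connect 2.0+. The helper methods are hypothetical illustrations.
 */
final class ErrorHandlingConfigExample {

    /** Overrides for a sink connector that should skip bad records instead of dying. */
    static Properties sinkOverrides(String dlqTopic) {
        Properties props = new Properties();
        props.setProperty("errors.retry.timeout", "300000");      // retry for up to 5 minutes
        props.setProperty("errors.retry.delay.max.ms", "60000");  // cap the backoff at 1 minute
        props.setProperty("errors.tolerance", "all");             // skip records that still fail
        props.setProperty("errors.log.enable", "true");           // log the error context
        props.setProperty("errors.log.include.messages", "true"); // include the failed record in the log
        props.setProperty("errors.deadletterqueue.topic.name", dlqTopic); // sink connectors only
        return props;
    }

    /** True when the config keeps the fail-fast default: no retries and no tolerance. */
    static boolean failsOnFirstError(Properties props) {
        boolean noRetries = "0".equals(props.getProperty("errors.retry.timeout", "0"));
        boolean noTolerance = "none".equals(props.getProperty("errors.tolerance", "none"));
        return noRetries && noTolerance;
    }
}
```

An empty configuration falls through to the defaults and keeps the existing kill-on-first-error behavior, which is the backward-compatibility property described above.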
   
   Testing: added multiple unit tests covering the retry and tolerance logic.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

)

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>            Assignee: Andy Bryant
>            Priority: Major
>             Fix For: 1.1.0, 2.0.0, 1.0.1
>
>
> On a KStream-GlobalKTable leftJoin, if the KeyValueMapper returns null, the stream fails with a NullPointerException (see the stack trace below). On Kafka 0.11.0.0, the stream processes such a record successfully, calling the ValueJoiner with the table value set to null.
> The use case is joining a stream to a table of reference data where the stream's foreign key may be null. With Kafka 1.0.0 there is no straightforward workaround other than generating a key that will never match or branching the stream for records that lack the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
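To make the failure mode concrete, the following plain-Java model (deliberately not the Kafka Streams API) contrasts a join that always looks up the mapped key with one that treats a null mapped key as "no match". The class and method names are hypothetical, `java.util.function.BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`, and the null check in `lookup` imitates the `Objects.requireNonNull` call at the top of the stack trace. It sketches the behavior the report describes for 0.11.0.0; it is not the actual fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

/**
 * Plain-Java model (not the Kafka Streams API) of a stream-to-global-table
 * left join whose key mapper may return null. All types are Strings to keep
 * the model small.
 */
final class GlobalTableLeftJoinModel {
    private final Map<String, String> referenceData = new HashMap<>();

    void put(String key, String value) {
        referenceData.put(key, value);
    }

    /** Imitates the store lookup at the top of the stack trace: null keys are rejected. */
    private String lookup(String key) {
        Objects.requireNonNull(key, "key cannot be null");
        return referenceData.get(key);
    }

    /** Always looks up the mapped key, so a null mapped key throws NullPointerException. */
    String leftJoinUnguarded(String key, String value,
                             BiFunction<String, String, String> keyMapper,
                             BiFunction<String, String, String> joiner) {
        String mappedKey = keyMapper.apply(key, value);
        return joiner.apply(value, lookup(mappedKey));
    }

    /** Treats a null mapped key as "no match" and calls the joiner with a null table value. */
    String leftJoinNullSafe(String key, String value,
                            BiFunction<String, String, String> keyMapper,
                            BiFunction<String, String, String> joiner) {
        String mappedKey = keyMapper.apply(key, value);
        String tableValue = (mappedKey == null) ? null : lookup(mappedKey);
        return joiner.apply(value, tableValue);
    }
}
```

For a stream record whose foreign key is missing, `leftJoinUnguarded` fails the same way the 1.0.0 trace does, while `leftJoinNullSafe` emits the record with a null reference value, which is the left-join behavior the reporter expects.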



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)