Posted to issues@flink.apache.org by "Haohui Mai (JIRA)" <ji...@apache.org> on 2017/02/06 23:23:41 UTC

[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

    [ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854944#comment-15854944 ] 

Haohui Mai commented on FLINK-5583:
-----------------------------------

In general, (1) sounds good to me. On closer inspection, though, it seems that it might require a stateful API instead of the traditional {{Collector}} APIs.

We have a mission-critical use case that needs to write all corrupted messages to a persistent store so that these messages can be inspected and backfilled later. Ideally the {{DeserializationSchema}} could hold some state, which would probably need to be synchronized when checkpoints happen.

It might be more natural to deserialize messages as a subsequent stage of the consumer. Thoughts?
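
To make the "subsequent stage" idea concrete, here is a rough sketch in plain Java. It is not Flink API: {{DecodeStage}}, {{Out}}, {{Decoder}} and {{parseInt}} are hypothetical names made up for illustration, and the toy decoder parses an integer where a real one would decode Avro. The assumption is that the consumer emits raw bytes and a later stage decodes them, routing anything that fails to decode to a dead-letter output instead of failing the job.

```java
import java.nio.charset.StandardCharsets;

/** Sketch only: a decode stage that runs after the consumer. Not Flink API. */
public class DecodeStage {

    /** Hypothetical stand-in for a Collector-style output. */
    public interface Out<T> {
        void collect(T record);
    }

    /** Hypothetical decoder; a real one would decode Avro records. */
    public interface Decoder<T> {
        T decode(byte[] raw) throws Exception;
    }

    /**
     * Decodes one raw message. A record that fails to decode is handed,
     * as raw bytes, to deadLetters so it can be inspected and backfilled later.
     */
    public static <T> void process(byte[] raw, Decoder<T> decoder,
                                   Out<T> good, Out<byte[]> deadLetters) {
        try {
            good.collect(decoder.decode(raw));
        } catch (Exception e) {
            deadLetters.collect(raw);
        }
    }

    /** Toy decoder: parses a UTF-8 integer and throws on malformed input. */
    public static Integer parseInt(byte[] raw) {
        return Integer.parseInt(new String(raw, StandardCharsets.UTF_8));
    }
}
```

With this shape the dead-letter output is just another stream, so persisting it would be the job of an ordinary sink rather than of the schema itself.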

[~rmetzger] [~tzulitai]

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and exceptions in the Kafka consumer in order to build a robust application in production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The streaming pipeline might want to bail out (which is the current behavior) or to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like structure, as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)