Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2017/02/07 07:18:41 UTC

[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

    [ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:18 AM:
--------------------------------------------------------------------

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D).
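
To make the flatMap-like contract concrete, here is a minimal sketch of what a user implementation could look like. It is only an illustration: {{OutputCollector}} is a hypothetical stand-in defined here, {{ResultTypeQueryable}} is left out so the snippet compiles without Flink, and {{LineSplittingSchema}} is an invented example name.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the collector type named in the proposal.
interface OutputCollector<T> {
    void collect(T record);
}

// Simplified copy of the proposed interface. ResultTypeQueryable<T> is
// omitted (assumption) so that the sketch has no Flink dependency.
interface NewDeserializationSchema<T> extends Serializable {
    void deserialize(byte[] message, OutputCollector<T> collector);
}

// Invented example: one Kafka message may yield zero, one, or many records.
class LineSplittingSchema implements NewDeserializationSchema<String> {
    @Override
    public void deserialize(byte[] message, OutputCollector<String> collector) {
        String text = new String(message, StandardCharsets.UTF_8);
        for (String line : text.split("\n")) {
            // Empty lines produce no output, so an empty message emits nothing.
            if (!line.isEmpty()) {
                collector.collect(line);
            }
        }
    }
}
```

The point of the collector argument is that "skip this record" is simply "collect nothing", so no special return value or exception is needed for it.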

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. Checkpointing is synchronized on the same lock, so a checkpoint barrier can only arrive either before or after all the records produced from offset 32, never in between them.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user.
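
Roughly, the four steps above could be sketched as follows. This is not the actual fetcher code, just an illustration under assumptions: {{BufferingEmitter}} and its fields are hypothetical names, and {{java.util.function.Consumer}} / {{BiConsumer}} stand in for the collector and the schema's {{deserialize}} call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch of the consumer-side loop; not Flink's actual code.
class BufferingEmitter<T> {
    private final Object checkpointLock;
    // Stand-in for deserialization.deserialize(recordBytes, collector).
    private final BiConsumer<byte[], Consumer<T>> deserializer;
    private final Consumer<T> downstream;
    private final List<T> buffer = new ArrayList<>();
    // Offset state that a checkpoint would snapshot.
    private long lastEmittedOffset = -1L;

    BufferingEmitter(Object checkpointLock,
                     BiConsumer<byte[], Consumer<T>> deserializer,
                     Consumer<T> downstream) {
        this.checkpointLock = checkpointLock;
        this.deserializer = deserializer;
        this.downstream = downstream;
    }

    void process(byte[] recordBytes, long offset) {
        // Step 2: collect the zero or more outputs into the internal buffer.
        buffer.clear();
        deserializer.accept(recordBytes, buffer::add);

        // Step 3: flush the buffer and update the offset in one critical section.
        synchronized (checkpointLock) {
            for (T record : buffer) {
                downstream.accept(record);
            }
            lastEmittedOffset = offset;
        }
        buffer.clear();
    }

    // Step 4: a snapshot takes the same lock, so it sees either none or all
    // of the records produced for a given offset.
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return lastEmittedOffset;
        }
    }
}
```

Since the flush happens inside {{process}}, the user code never has to call anything like {{flush}} itself.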

For your use case, in which you want to write corrupt bytes to external storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that the write must be a blocking call. A blocking call might be OK here, depending on the frequency of corrupt messages. What do you think [~wheat9]?
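
A minimal sketch of that try-catch pattern, under assumptions: {{CorruptRecordStore}} is a hypothetical interface whose {{write}} blocks until the bytes are stored, {{SkippingIntSchema}} is an invented example that parses integers, and {{java.util.function.Consumer}} stands in for the collector.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical sink for bad records; write() is assumed to block until done.
interface CorruptRecordStore {
    void write(byte[] rawBytes);
}

// Invented example with the same shape as deserialize(bytes, collector).
class SkippingIntSchema {
    private final CorruptRecordStore store;

    SkippingIntSchema(CorruptRecordStore store) {
        this.store = store;
    }

    public void deserialize(byte[] message, Consumer<Integer> collector) {
        int value;
        try {
            value = Integer.parseInt(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            // Blocking write: the consumer thread waits here until the corrupt
            // bytes are stored, then the record is skipped by emitting nothing.
            store.write(message);
            return;
        }
        collector.accept(value);
    }
}
```

The cost of the blocking write is only paid on corrupt messages, which is why its impact depends on how often they occur.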



> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and exceptions in the Kafka consumer in order to build a robust application in production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The streaming pipeline might want to bail out (which is the current behavior) or to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like structure, as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?


