Posted to user@flink.apache.org by Peter Schrott <pe...@bluerootlabs.io> on 2022/08/03 13:55:20 UTC

FlinkKinesisConsumer: Dropping records on deserialization problems

Hi Flink Ppl!

Working with Apache Flink v1.13.2 on AWS with Kinesis as the source.

I have the requirement to drop certain events before they enter the job's pipeline. There are mainly 2 reasons:
1) Problems when deserializing the incoming bytes
2) Problems with the event itself, e.g. a missing timestamp for event-time-based processing (actually also a deserialization problem, just a matter of strictness)

The FlinkKinesisConsumer is set up with a BoundedOutOfOrdernessTimestampExtractor to enable watermarking and event time processing.
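
For context, the setup currently looks roughly like this (a sketch; the stream name, consumerConfig and the MyEvent type are placeholders):

    // env is the StreamExecutionEnvironment; MyEventSchema is a placeholder
    // DeserializationSchema<MyEvent>.
    FlinkKinesisConsumer<MyEvent> consumer = new FlinkKinesisConsumer<>(
            "my-stream", new MyEventSchema(), consumerConfig);

    // Watermark assigner attached directly to the Kinesis source.
    consumer.setPeriodicWatermarkAssigner(
            new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(MyEvent event) {
                    // Runs inside the connector; a null or broken record
                    // reaching this call is exactly what blows up.
                    return event.getTimestamp();
                }
            });

    DataStream<MyEvent> events = env.addSource(consumer);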
 
Now I found this ticket: https://issues.apache.org/jira/browse/FLINK-3679 and the promising comment [1]: I hoped to find a solution by returning null from my implementation of org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(...).

Unfortunately, this is not (yet?) implemented for the FlinkKinesisConsumer (it is in the FlinkKafkaConsumer), or rather in its ShardConsumer. The consumer does not handle the returned null and fails with an NPE when trying to extract the timestamp [2].
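
For illustration, a lenient schema along those lines is what I had in mind (a sketch; MyEvent and the Jackson mapping are placeholders):

    import java.io.IOException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    public class LenientEventSchema extends AbstractDeserializationSchema<MyEvent> {

        private transient ObjectMapper mapper;

        @Override
        public MyEvent deserialize(byte[] bytes) throws IOException {
            if (mapper == null) {
                mapper = new ObjectMapper(); // lazy init, ObjectMapper is not serializable
            }
            try {
                MyEvent event = mapper.readValue(bytes, MyEvent.class);
                // Treat a missing timestamp as a bad record, too.
                return event.getTimestamp() == null ? null : event;
            } catch (IOException e) {
                // Returning null is meant to signal "skip this record": the
                // FlinkKafkaConsumer honors it, but the Kinesis ShardConsumer
                // passes it on and the timestamp extraction NPEs.
                return null;
            }
        }
    }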

I found out that the RecordEmitter further down the line actually checks for null events and drops them [3].

Is this a gap in the implementation, or does it work as designed?

Does someone have an idea how to drop events before they enter the pipeline?

I do have one workaround in mind: do not add the timestamps/watermarks at the source but at a later step in the stream, i.e. after cleaning out bad events. But this still does not overcome the problem of exceptions during event deserialization. That, in turn, could be solved by not deserializing events in the consumer, i.e. consuming raw bytes and adding the deserialization as a flat map function as the first step in the pipeline. But all of this does not sound good to me.
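
For completeness, the workaround would look roughly like this (a sketch; parse(), MyEvent, the stream name and consumerConfig are placeholders):

    // Consume raw bytes so the connector itself can never fail to deserialize.
    FlinkKinesisConsumer<byte[]> rawConsumer = new FlinkKinesisConsumer<>(
            "my-stream",
            new AbstractDeserializationSchema<byte[]>() {
                @Override
                public byte[] deserialize(byte[] bytes) {
                    return bytes; // pass-through
                }
            },
            consumerConfig);

    DataStream<MyEvent> events = env.addSource(rawConsumer)
            // First pipeline step: deserialize and drop bad records.
            .flatMap((byte[] bytes, Collector<MyEvent> out) -> {
                try {
                    out.collect(parse(bytes)); // parse() is a placeholder
                } catch (Exception e) {
                    // log and drop the bad record
                }
            })
            .returns(MyEvent.class)
            // Timestamps/watermarks only after bad events are cleaned out.
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                            .withTimestampAssigner((event, ts) -> event.getTimestamp()));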

Thanks & Best
Peter

[1] https://issues.apache.org/jira/browse/FLINK-3679?focusedCommentId=15456204&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15456204
[2] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L999
[3] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1027


Re: FlinkKinesisConsumer: Dropping records on deserialization problems

Posted by Martijn Visser <ma...@apache.org>.
Hi Peter,

It was definitely not a bad idea!

Good luck and if you need more help, feel free to reach out to the mailing
list once more.

Best regards,

Martijn

On Wed, 3 Aug 2022 at 16:24, Peter Schrott <pe...@bluerootlabs.io> wrote:

> Hi Martijn,
>
> Thanks a lot for that example. This looks a lot like my suggested
> workaround. Good to know that my idea was not too naive.
>
> All the best,
> Peter

Re: FlinkKinesisConsumer: Dropping records on deserialization problems

Posted by Peter Schrott <pe...@bluerootlabs.io>.
Hi Martijn,

Thanks a lot for that example. This looks a lot like my suggested workaround. Good to know that my idea was not too naive.

All the best,
Peter


Re: FlinkKinesisConsumer: Dropping records on deserialization problems

Posted by Martijn Visser <ma...@apache.org>.
Hi Peter,

You could consider the pattern that was used to create a Kafka Dead Letter
Queue. There's a recipe including source code available for that at
https://docs.immerok.cloud/docs/cookbook/creating-dead-letter-queues-from-and-to-apache-kafka-with-apache-flink/
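
The core of it is a ProcessFunction with a side output, roughly like this (just a sketch, not the recipe's actual code; parse(), MyEvent and deadLetterSink are placeholders):

    // Tag for records that failed to deserialize.
    final OutputTag<byte[]> deadLetters = new OutputTag<byte[]>("dead-letters") {};

    SingleOutputStreamOperator<MyEvent> parsed = rawBytes
            .process(new ProcessFunction<byte[], MyEvent>() {
                @Override
                public void processElement(byte[] value, Context ctx, Collector<MyEvent> out) {
                    try {
                        out.collect(parse(value)); // placeholder parser
                    } catch (Exception e) {
                        ctx.output(deadLetters, value); // route the bad record aside
                    }
                }
            });

    // Clean events continue downstream; failed records go to the dead letter sink.
    parsed.getSideOutput(deadLetters).addSink(deadLetterSink);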

Best regards,

Martijn

