Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2019/06/18 09:06:22 UTC

[DISCUSS] Connectors and NULL handling

Hi All,

Thanks to Gary, I recently came upon an interesting cluster of issues:
 - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer to skip corrupted messages
 - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible error handling in the Kafka consumer
 - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema handle message record which value is null

In light of the last one, I’d like to look again at the first two. They introduced the behaviour that when the deserialisation schema returns NULL, the Kafka consumer (and maybe also the Kinesis consumer) silently drops the record. In Kafka, NULL values have semantic meaning: they usually encode a DELETE for the key of the message. If SimpleStringSchema returned that null, our consumer would silently drop it and we would lose that DELETE message. That doesn’t seem right.

I think the right solution for null handling is to introduce a custom record type that encodes both Kafka NULL values and the possibility of a corrupt message that cannot be deserialised. Something like an Either type. It’s then up to the application to handle those cases. 

Concretely, I want to discuss whether we should change our consumers to not silently drop null records, but instead treat them as errors. For FLINK-11820, the solution is for users to write their own custom schema that handles null values and returns a user-defined type that signals null values.
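
As a rough illustration of such a record type, here is a minimal sketch. The names Deserialized and NullAwareStringSchema are hypothetical (they are not existing Flink classes), and the sketch deliberately does not implement Flink's DeserializationSchema interface, so it only shows the shape of the idea: the schema never returns null, and the three outcomes (value, Kafka NULL, corrupt bytes) stay distinguishable for the application.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical three-way result: a value, a Kafka-style NULL, or a corrupt record. */
final class Deserialized<T> {
    enum Kind { VALUE, TOMBSTONE, CORRUPT }

    final Kind kind;
    final T value;          // set only when kind == VALUE
    final Exception error;  // set only when kind == CORRUPT

    private Deserialized(Kind kind, T value, Exception error) {
        this.kind = kind;
        this.value = value;
        this.error = error;
    }

    static <T> Deserialized<T> value(T v) {
        return new Deserialized<>(Kind.VALUE, v, null);
    }

    static <T> Deserialized<T> tombstone() {
        return new Deserialized<>(Kind.TOMBSTONE, null, null);
    }

    static <T> Deserialized<T> corrupt(Exception e) {
        return new Deserialized<>(Kind.CORRUPT, null, e);
    }
}

/** Hypothetical user-defined schema that signals NULL and corruption explicitly. */
final class NullAwareStringSchema {
    static Deserialized<String> deserialize(byte[] message) {
        if (message == null) {
            // A NULL payload usually encodes a DELETE for the key; keep it visible.
            return Deserialized.tombstone();
        }
        try {
            // Strict decoder: malformed UTF-8 raises an exception instead of
            // being replaced silently.
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT);
            return Deserialized.value(decoder.decode(ByteBuffer.wrap(message)).toString());
        } catch (CharacterCodingException e) {
            return Deserialized.corrupt(e);
        }
    }
}
```

With a type like this, downstream operators decide what a DELETE or a corrupt record means for the application, and the consumer no longer has to guess.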

What do you think?

Aljoscha


Re: [DISCUSS] Connectors and NULL handling

Posted by Aljoscha Krettek <al...@apache.org>.
I actually think most of Flink was not designed to handle NULL values and, as far as I remember, some people think that Flink shouldn’t handle NULL values. The fact that some parts support NULL values is more by accident than by conscious planning.

Aljoscha

> On 24. Jun 2019, at 10:07, Becket Qin <be...@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> Thanks for raising the issue. It seems there are two issues here: 1) null
> value handling, and 2) error handling.
>
> For null value handling, my understanding is the following:
>  - Null values can have a real meaning in some systems, so Flink needs to
> support them.
>  - By design, the records passed between Flink operators already support
> null values. They are wrapped in StreamRecord.
>  - Some user-facing APIs, however, do not seem to fully support null
> values, e.g. the Collector.
>  - The connector code is sort of "user code" from Flink's perspective, so
> each connector should decide how null values are treated.
> If we want to support null values everywhere in Flink, we may need to look
> into those user-facing APIs that do not accept null values. Wrapping the
> user-returned value looks reasonable. Ideally the wrapper class should also
> be StreamRecord, so that it is consistent with what we have for the records
> passed between operators.
>
> WRT error handling, I agree with Xiaowei that the error handling mechanism
> should be generic to the entire project instead of just for connectors.
> This reminds me of another discussion thread, which proposes to add a
> pluggable mechanism to categorize and report exceptions causing job
> failure [1]. It might be worth thinking about whether it makes sense to
> design error handling and reporting as a whole.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> [1]
> https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing


Re: [DISCUSS] Connectors and NULL handling

Posted by Becket Qin <be...@gmail.com>.
Hi Aljoscha,

Thanks for raising the issue. It seems there are two issues here: 1) null
value handling, and 2) error handling.

For null value handling, my understanding is the following:
  - Null values can have a real meaning in some systems, so Flink needs to
support them.
  - By design, the records passed between Flink operators already support
null values. They are wrapped in StreamRecord.
  - Some user-facing APIs, however, do not seem to fully support null
values, e.g. the Collector.
  - The connector code is sort of "user code" from Flink's perspective, so
each connector should decide how null values are treated.
If we want to support null values everywhere in Flink, we may need to look
into those user-facing APIs that do not accept null values. Wrapping the
user-returned value looks reasonable. Ideally the wrapper class should also
be StreamRecord, so that it is consistent with what we have for the records
passed between operators.

WRT error handling, I agree with Xiaowei that the error handling mechanism
should be generic to the entire project instead of just for connectors.
This reminds me of another discussion thread, which proposes to add a
pluggable mechanism to categorize and report exceptions causing job
failure [1]. It might be worth thinking about whether it makes sense to
design error handling and reporting as a whole.

Thanks,

Jiangjie (Becket) Qin

[1]
https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing




On Sun, Jun 23, 2019 at 9:40 PM Xiaowei Jiang <xi...@gmail.com> wrote:

> Error handling policy for streaming jobs goes beyond potentially corrupted
> messages in the source. Users may have subtle bugs in the code that
> processes some messages, which may cause the streaming job to fail. Even
> though this can be considered a bug in the user's code, users may prefer to
> skip such messages (or log them) and let the job continue in some cases.
> This may be an opportunity to take such cases into consideration as well.
>
> Xiaowei

Re: [DISCUSS] Connectors and NULL handling

Posted by Xiaowei Jiang <xi...@gmail.com>.
Error handling policy for streaming jobs goes beyond potentially corrupted
messages in the source. Users may have subtle bugs in the code that
processes some messages, which may cause the streaming job to fail. Even
though this can be considered a bug in the user's code, users may prefer to
skip such messages (or log them) and let the job continue in some cases.
This may be an opportunity to take such cases into consideration as well.
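
To make the skip-or-log idea concrete, here is a minimal sketch in plain Java. FailurePolicy and GuardedProcessor are hypothetical names used only for illustration; nothing here is an existing Flink API, and the in-memory list stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical policy for records whose processing throws. */
enum FailurePolicy { FAIL, SKIP, LOG_AND_SKIP }

/** Hypothetical wrapper that applies a user function under a failure policy. */
final class GuardedProcessor<I, O> {
    private final Function<I, O> userFunction;
    private final FailurePolicy policy;
    final List<String> log = new ArrayList<>(); // stand-in for a real logger

    GuardedProcessor(Function<I, O> userFunction, FailurePolicy policy) {
        this.userFunction = userFunction;
        this.policy = policy;
    }

    /** Returns the outputs of records that processed; failures follow the policy. */
    List<O> processAll(List<I> records) {
        List<O> out = new ArrayList<>();
        for (I record : records) {
            try {
                out.add(userFunction.apply(record));
            } catch (RuntimeException e) {
                if (policy == FailurePolicy.FAIL) {
                    throw e; // fail the whole job on the first bad record
                }
                if (policy == FailurePolicy.LOG_AND_SKIP) {
                    log.add("skipped " + record + ": " + e);
                }
                // SKIP and LOG_AND_SKIP both continue with the next record
            }
        }
        return out;
    }
}
```

For example, wrapping Integer::parseInt with LOG_AND_SKIP over the inputs "1", "x", "3" keeps the two parsable records and logs the third, while FAIL rethrows the NumberFormatException.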

Xiaowei

On Fri, Jun 21, 2019 at 11:43 PM Rong Rong <wa...@gmail.com> wrote:

> Hi Aljoscha,
>
> Sorry for the late reply. I think the solution makes sense. Using a NULL
> return value to mark a message as corrupted is not a valid approach, since
> a NULL value has semantic meaning not just in Kafka but also in a lot of
> other contexts.
>
> I was wondering if we can have a more meaningful interface for dealing with
> corrupted messages. I am thinking of 2 options off the top of my head:
> 1. Create some special deserializer attribute (or a special record) to
> indicate corrupted messages, like you suggested; this way we can not only
> encode the deserialization error but also allow users to encode any
> corruption information for downstream processing.
> 2. Create a standard fetch error handling API on AbstractFetcher (for
> Kafka) and DataFetcher (for Kinesis); this way we can also handle errors
> other than deserialization problems, for example even lower-level
> exceptions like a CRC check failure.
>
> I think either way will work. Also, as long as there's a way for end users
> to extend the error handling for message corruption, it will not
> reintroduce the problems the 2 original JIRAs were trying to address.
>
> --
> Rong

Re: [DISCUSS] Connectors and NULL handling

Posted by Rong Rong <wa...@gmail.com>.
Hi Aljoscha,

Sorry for the late reply. I think the solution makes sense. Using a NULL
return value to mark a message as corrupted is not a valid approach, since
a NULL value has semantic meaning not just in Kafka but also in a lot of
other contexts.

I was wondering if we can have a more meaningful interface for dealing with
corrupted messages. I am thinking of 2 options off the top of my head:
1. Create some special deserializer attribute (or a special record) to
indicate corrupted messages, like you suggested; this way we can not only
encode the deserialization error but also allow users to encode any
corruption information for downstream processing.
2. Create a standard fetch error handling API on AbstractFetcher (for
Kafka) and DataFetcher (for Kinesis); this way we can also handle errors
other than deserialization problems, for example even lower-level
exceptions like a CRC check failure.
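
Option 2 could look roughly like the sketch below. This is only an assumption about the shape of such an API: FetchErrorHandler and FetchLoopSketch are hypothetical names, the code does not touch AbstractFetcher or DataFetcher, and the CRC comparison merely simulates a low-level check that fails before deserialization is even attempted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical callback; not an existing Flink interface. */
interface FetchErrorHandler {
    enum Action { SKIP, FAIL }

    /** Decides what the fetch loop does with a record that failed a check. */
    Action onError(byte[] payload, Exception cause);
}

/** Hypothetical fetch loop that consults the handler on CRC mismatches. */
final class FetchLoopSketch {
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    /** Returns the payloads whose CRC matches; mismatches go to the handler. */
    static List<byte[]> fetch(List<byte[]> payloads, List<Long> expectedCrcs,
                              FetchErrorHandler handler) {
        List<byte[]> accepted = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            byte[] payload = payloads.get(i);
            if (crcOf(payload) == expectedCrcs.get(i)) {
                accepted.add(payload);
                continue;
            }
            IllegalStateException cause =
                    new IllegalStateException("CRC mismatch at index " + i);
            if (handler.onError(payload, cause) == FetchErrorHandler.Action.FAIL) {
                throw cause; // the user chose to fail the job
            }
            // SKIP: drop the record, but only because the user asked for it
        }
        return accepted;
    }
}
```

The point of the callback is that dropping a record becomes an explicit user decision rather than a side effect of returning null.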

I think either way will work. Also, as long as there's a way for end users
to extend the error handling for message corruption, it will not
reintroduce the problems the 2 original JIRAs were trying to address.

--
Rong
