Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2020/09/23 11:12:29 UTC

Ignoring invalid values in KafkaSerializationSchema

Hi,

I'm using a custom KafkaSerializationSchema to write records to Kafka using
FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL
API.

In some cases, when trying to convert the Row object to a byte[],
serialization will fail due to malformed values. In such cases, I would
like the custom serialization schema to drop the bad records and not send
them through.

From the API, it is unclear how such failures should be handled. Given the
following signature:

 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

From reading the code, there's no exception handling or null checking,
which means that:

- If an exception is thrown, it will cause the entire job to fail (this has
happened to me in production)
- If null is returned, a null value will be pushed down to
kafkaProducer.send, which is undesirable.

What are the options here?



-- 
Best Regards,
Yuval Itzchakov.

Re: Ignoring invalid values in KafkaSerializationSchema

Posted by Alexey Trenikhun <ye...@msn.com>.
You could forward tombstone records to a “graveyard” topic; this way they will not confuse anyone reading from the regular topic.
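
A rough sketch of the graveyard-topic idea, in plain Java with no Flink or Kafka dependencies. `Routed` is a hypothetical stand-in for ProducerRecord, and `route`, the topic names and the diagnostic payload format are assumptions made for illustration only. In a real KafkaSerializationSchema the same decision would presumably be made inside serialize, assuming the producer honours the topic set on each returned record.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class GraveyardRouting {
    // Hypothetical stand-in for Kafka's ProducerRecord; only topic and value are modelled.
    static final class Routed {
        final String topic;
        final byte[] value;

        Routed(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Serializes the element for mainTopic; on failure, builds a diagnostic
    // record addressed to graveyardTopic instead of letting the exception escape.
    static <T> Routed route(T element, Function<T, byte[]> serializer,
                            String mainTopic, String graveyardTopic) {
        try {
            return new Routed(mainTopic, serializer.apply(element));
        } catch (RuntimeException e) {
            // Assumption: the element's string form can be produced even when
            // serialization fails.
            String diagnostic = String.valueOf(element) + " | " + e.getMessage();
            return new Routed(graveyardTopic, diagnostic.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        // Hypothetical serializer that rejects empty strings.
        Function<String, byte[]> strict = s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty value");
            }
            return s.getBytes(StandardCharsets.UTF_8);
        };
        System.out.println(route("ok", strict, "events", "events-graveyard").topic);
        System.out.println(route("", strict, "events", "events-graveyard").topic);
    }
}
```

Readers of the regular topic never see the failed records, and the graveyard topic keeps enough context to investigate them later.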



Re: Ignoring invalid values in KafkaSerializationSchema

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Arvid,

Thanks for the response:

The topic is not log compacted and these invalid values are not actually
tombstones; I wouldn't want anyone to misinterpret them as such.

Regarding filtering the rows in a separate flatMap, that's a great idea.
The only problem is that the rows are opaque from the perspective of the
stream (they could literally be any SQL table), so traversing them via the
RowTypeInfo can end up being expensive.

Wrapping the KafkaProducer is the path I thought about. Ideally though, I
think exception handling should be part of the SerializationSchema contract,
as things may fail eventually, and having a clear way to opt out of sending
such values would be great.




Re: Ignoring invalid values in KafkaSerializationSchema

Posted by Arvid Heise <ar...@ververica.com>.
Hi Yuval,

Here are some workarounds.

One option is to use a tombstone record (0 byte payload) and filter it
downstream. If it's log-compacted, Kafka would filter them on compaction.
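
A minimal sketch of the marker-and-filter idea in plain Java, with no Flink or Kafka dependencies. The class and method names are hypothetical. It only models the zero-length payload described above; it makes no assumption about how Kafka's log compaction treats such a payload (compaction is documented in terms of records with a null value).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class EmptyPayloadMarker {
    // Producer side: a failed serialization becomes a zero-length payload
    // instead of an exception that would fail the job.
    static <T> byte[] serializeOrEmpty(T element, Function<T, byte[]> serializer) {
        try {
            return serializer.apply(element);
        } catch (RuntimeException e) {
            return new byte[0];
        }
    }

    // Consumer side: skip the zero-length markers.
    // Caveat: a legitimately empty payload would be dropped as well.
    static List<byte[]> withoutMarkers(List<byte[]> payloads) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] payload : payloads) {
            if (payload != null && payload.length > 0) {
                kept.add(payload);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical serializer: fails on anything that is not an integer.
        Function<String, byte[]> intByte = s -> new byte[] {(byte) Integer.parseInt(s)};
        List<byte[]> produced = new ArrayList<>();
        for (String s : new String[] {"1", "oops", "3"}) {
            produced.add(serializeOrEmpty(s, intByte));
        }
        System.out.println(produced.size() + " produced, "
                + withoutMarkers(produced).size() + " kept");
    }
}
```

The cost of this approach is that every consumer of the topic has to know about the marker convention.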

A second option is to translate the Row to a byte[] in a separate flatMap
(emitting no records on error) and then simply write the byte[] directly to
Kafka in the schema.
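
The flatMap approach could look roughly like this. It is a sketch in plain Java: `Consumer<byte[]>` stands in for Flink's Collector, and the class and method names are hypothetical. In an actual job the body of `flatMap` would sit in a FlatMapFunction that calls Collector.collect only on success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class SerializeBeforeSink {
    // Mirrors the shape of a flatMap: one output on success, none on failure.
    static <T> void flatMap(T element, Function<T, byte[]> serializer, Consumer<byte[]> out) {
        byte[] bytes;
        try {
            bytes = serializer.apply(element);
        } catch (RuntimeException e) {
            // Malformed element: emit nothing. A real job might count or log it here.
            return;
        }
        out.accept(bytes);
    }

    public static void main(String[] args) {
        // Hypothetical serializer: fails on anything that is not an integer.
        Function<String, byte[]> intByte = s -> new byte[] {(byte) Integer.parseInt(s)};
        List<byte[]> emitted = new ArrayList<>();
        for (String s : new String[] {"1", "oops", "3"}) {
            flatMap(s, intByte, emitted::add);
        }
        System.out.println(emitted.size() + " records emitted");
    }
}
```

With the conversion done upstream, the serialization schema only has to wrap an already valid byte[] and has nothing left that can fail on malformed data.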

A third option is to wrap the sink or KafkaProducer and catch the exception
(possibly using a custom exception for clarity).
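
A sketch of the wrapping idea, again in plain Java. `Consumer<T>` stands in for the sink, and `InvalidRecordException` and `SkippingSink` are hypothetical names. How such a wrapper would be hooked into FlinkKafkaProducer is not shown here and likely depends on the Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SkipInvalid {
    // Hypothetical marker exception, thrown by the serializer for malformed input only.
    static final class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String message) {
            super(message);
        }
    }

    // Wraps a sink and swallows only the marker exception; any other failure
    // (for example a broker problem) still propagates and fails the caller.
    static final class SkippingSink<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private long skipped;

        SkippingSink(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T element) {
            try {
                delegate.accept(element);
            } catch (InvalidRecordException e) {
                skipped++;
            }
        }

        long skippedCount() {
            return skipped;
        }
    }

    public static void main(String[] args) {
        List<Integer> written = new ArrayList<>();
        // Hypothetical sink that rejects negative values as malformed.
        SkippingSink<Integer> sink = new SkippingSink<>(n -> {
            if (n < 0) {
                throw new InvalidRecordException("negative value: " + n);
            }
            written.add(n);
        });
        sink.accept(1);
        sink.accept(-2);
        sink.accept(3);
        System.out.println(written + " skipped=" + sink.skippedCount());
    }
}
```

Catching a dedicated exception type rather than every exception keeps genuine infrastructure failures visible.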


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Ignoring invalid values in KafkaSerializationSchema

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Yuval,
thanks for bringing this issue up. You're right: There is no error handling
currently implemented for SerializationSchema. FLIP-124 [1] addressed this
for the DeserializationSchema, though. I created FLINK-19397 [2] to cover
this feature.

In the meantime, I cannot think of any other solution than filtering those
rows out in a step before emitting the data to Kafka.

Best,
Matthias

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397
