Posted to user@flink.apache.org by Hatem Mostafa <me...@hatem.co> on 2023/02/16 13:32:44 UTC

KafkaSink handling message size produce errors

Hello,

I am writing a Flink job that reads from and writes to Kafka. It uses a
window operator and eventually writes the result of each window into a
Kafka topic. The accumulated data can exceed the maximum message size after
compression at the producer level. I want to catch the exception coming
from the producer and skip that window. I could not find a way to do that
in KafkaSink
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink>;
is there a way to do so?

I have attached an example of an error that I would like to handle
gracefully.

[image attachment: example producer error]


This question is similar to one asked on Stack Overflow here
<https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink>,
but the answer applies to older versions of Flink.
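
For reference, the sink is wired up roughly like this (a minimal sketch
only; the topic, bootstrap servers, serializer, and compression setting
below are placeholders, not my actual configuration):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// windowedResults is the DataStream<String> produced by the window
// operator (source and windowing omitted).
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("windowed-results")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        // compression is applied by the producer, so the broker's size
        // check runs against the compressed record batch
        .setProperty("compression.type", "gzip")
        .build();

windowedResults.sinkTo(sink);

The error surfaces asynchronously in the producer's send callback, so a
try/catch around sinkTo() has nothing to catch; sinkTo() only builds the
job graph.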

Regards,
Hatem

Re: KafkaSink handling message size produce errors

Posted by Jing Ge via user <us...@flink.apache.org>.
Ticket created: https://issues.apache.org/jira/browse/FLINK-31121


Re: KafkaSink handling message size produce errors

Posted by Hatem Mostafa <me...@hatem.co>.
Thanks for adding this to your backlog; I think it's definitely a very
useful feature.

Can you provide an example of how to extend KafkaSink to add this error
handling? I tried to do so but did not find it straightforward, since
errors are thrown in the deliveryCallback of KafkaWriter, and KafkaSink is
not extensible: all its members are private and the constructor is
package-private.


Re: KafkaSink handling message size produce errors

Posted by Shammon FY <zj...@gmail.com>.
Hi Jing,

It sounds good to me; we can add an option for it.

Best,
Shammon



Re: KafkaSink handling message size produce errors

Posted by Jing Ge via user <us...@flink.apache.org>.
Hi,

It makes sense to offer this feature of catching and ignoring the
exception, with a config option to switch it on and off, when we put
ourselves in users' shoes. WDYT? I will create a ticket if most of you
consider it a good feature to help users.
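
From a user's perspective it could look something like this (the option
name below is purely hypothetical, to illustrate the idea; no such switch
exists in the current KafkaSink API):

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        // record serializer and other settings as usual
        // hypothetical switch, not a real KafkaSink option today:
        // log and drop records the producer rejects (e.g.
        // RecordTooLargeException) instead of failing the job
        .setIgnoreProducerErrors(true)
        .build();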

Best regards,
Jing


Re: KafkaSink handling message size produce errors

Posted by Shammon FY <zj...@gmail.com>.
Hi Hatem,

As mentioned above, you can extend the KafkaSink, or add a UDF that
processes the records before the sink.
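
A rough sketch of such a guard (assumptions: a 1 MB max.message.bytes
limit on the broker and UTF-8 string payloads; the broker checks the
compressed size, so this uncompressed check is only a conservative
pre-filter):

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.datastream.DataStream;

// Drop window results that would exceed the broker's record size limit
// even before compression, so oversized windows are skipped instead of
// failing the job. They could also be routed to a side output for logging.
final int maxRecordBytes = 1_000_000; // assumed broker max.message.bytes
DataStream<String> safeResults = windowedResults.filter(
        value -> value.getBytes(StandardCharsets.UTF_8).length <= maxRecordBytes);
safeResults.sinkTo(sink);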

Best,
Shammon


Re: KafkaSink handling message size produce errors

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Hatem.
I think there is no way to catch the exception and then ignore it in the
current implementation of KafkaSink. You may need to extend the KafkaSink.

Best regards,
Yuxia

