Posted to dev@beam.apache.org by Antonio Si <an...@gmail.com> on 2021/03/04 17:21:12 UTC

Question regarding error raised in KafkaWriter processElement() and finishBundle()

Hi all,

I have a question about an error thrown in KafkaWriter.processElement(). Suppose my pipeline reaches KafkaWriter.processElement() twice. The first call succeeds, but the second fails for some reason and sets numSendFailures to 1.

After that, DoFnOperator.checkInvokeFinishBundleByTime kicks in and invokes KafkaWriter.finishBundle(), which throws an IOException because numSendFailures is not 0.
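
To make the scenario concrete, here is a minimal plain-Java sketch of the pattern as I understand it. It is not the actual KafkaWriter source: the class name, the sendSucceeds flag and the delivered list are made up for illustration, and a real send is asynchronous rather than a simple boolean.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the writer pattern described above (hypothetical, not Beam code). */
class BundleWriterSketch {
    private final List<String> delivered = new ArrayList<>();
    private int numSendFailures = 0;

    /** Per-element hook: a failed send is only counted here, it does not throw. */
    void processElement(String record, boolean sendSucceeds) {
        if (sendSucceeds) {
            delivered.add(record);
        } else {
            numSendFailures++;
        }
    }

    /** End-of-bundle hook: any counted failure surfaces as an IOException. */
    void finishBundle() throws IOException {
        if (numSendFailures > 0) {
            int failures = numSendFailures;
            numSendFailures = 0; // reset so the next bundle starts clean
            throw new IOException("failed to send " + failures + " record(s)");
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        BundleWriterSketch writer = new BundleWriterSketch();
        writer.processElement("first", true);   // succeeds
        writer.processElement("second", false); // fails, numSendFailures becomes 1
        try {
            writer.finishBundle();
            System.out.println("bundle committed");
        } catch (IOException e) {
            System.out.println("bundle failed: " + e.getMessage());
        }
    }
}
```

In this sketch the first record has already been written by the time finishBundle() throws, which is the situation I am asking about.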

Does this mean that some data will be lost and that my application needs to handle it somehow? What is the recommended way to handle errors in Beam pipelines?

Thanks in advance.

Antonio.




Re: Question regarding error raised in KafkaWriter processElement() and finishBundle()

Posted by Alexey Romanenko <ar...@gmail.com>.
In general, you should not lose data, because your pipeline will be restarted from its checkpoints. Are you seeing different behaviour?

To avoid duplicate written messages, you may want to use the “KafkaIO.Write.withEOS()” option, but it will slow down the writes.
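
As a rough illustration of why duplicates can appear without that option, here is a small plain-Java model of a bundle being replayed after a failed send. It is a simplified sketch under my own assumptions, not how the runner or KafkaIO is implemented: writeWithRetries, failOnAttempt and failAtIndex are hypothetical names, and the list stands in for the Kafka topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of at-least-once delivery: a failed bundle is replayed in full (hypothetical). */
class BundleRetrySketch {
    /** Replays the whole bundle until one attempt completes without a failed send. */
    static List<String> writeWithRetries(List<String> bundle, int failOnAttempt, int failAtIndex) {
        List<String> topic = new ArrayList<>(); // stand-in for the Kafka topic
        int attempt = 0;
        boolean committed = false;
        while (!committed) {
            attempt++;
            boolean failed = false;
            for (int i = 0; i < bundle.size(); i++) {
                if (attempt == failOnAttempt && i == failAtIndex) {
                    failed = true; // this send fails, so nothing is appended for it
                } else {
                    topic.add(bundle.get(i)); // earlier successful writes are not rolled back
                }
            }
            committed = !failed; // a failed attempt causes the bundle to be replayed
        }
        return topic;
    }

    public static void main(String[] args) {
        // The second element fails on the first attempt only.
        List<String> topic = writeWithRetries(Arrays.asList("first", "second"), 1, 1);
        System.out.println(topic); // prints [first, first, second]
    }
}
```

Nothing is lost in this model, but "first" is written twice, which is the kind of duplicate the EOS option is meant to prevent.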
