Posted to user@beam.apache.org by Vilhelm von Ehrenheim <vo...@gmail.com> on 2019/01/22 10:06:17 UTC

KafkaIO error

Hi!
I sometimes get the following error in one of my streaming pipelines that
use KafkaIO as sink:

java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
        org.apache.beam.sdk.io.kafka.KafkaWriter.checkForFailures(KafkaWriter.java:120)
        org.apache.beam.sdk.io.kafka.KafkaWriter.finishBundle(KafkaWriter.java:74)
Caused by: org.apache.kafka.common.errors.KafkaStorageException: Disk
error when trying to access log file on the disk.

My Kafka setup is not super beefy and as I understand it this seems to
happen when it is under heavy load from my Dataflow job.

What I am wondering is essentially whether this means I am losing data, or
whether the send will be retried by the sink. Also, if it does mean losing
the record, what is the best way to configure the KafkaIO sink to be less
aggressive?

I am still on Beam 2.8 in this pipeline.

Regards,
Vilhelm von Ehrenheim

Re: KafkaIO error

Posted by Jeff Klukas <jk...@mozilla.com>.
 > this means that I am losing data or if this will be retried by the sink?

I don't have direct experience with KafkaIO, but noting that this exception
happened in the finishBundle method, Beam will not have committed the
bundle.

More specifically, looking at the KafkaWriter code, I see that finishBundle
calls producer.flush(), which blocks until all messages have reached Kafka
and the SendCallbacks have completed, and then checks whether any of the
callbacks failed. In this case, one of the callbacks recorded the above
exception, so checkForFailures rethrows it, which should stop pipeline
execution and prevent the bundle from being committed.
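
For reference, the flow looks roughly like this (a simplified sketch of the
behavior, not the verbatim KafkaWriter source):

    // Simplified sketch of KafkaWriter's bundle finalization.
    @FinishBundle
    public void finishBundle() throws IOException {
      producer.flush();     // block until every in-flight send's callback has run
      checkForFailures();   // if any SendCallback captured an exception, wrap it
                            // in an IOException and rethrow; the runner then
                            // retries the bundle instead of committing it
    }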

> what is the best way to configure the KafkaIO sink to be less aggressive?

I expect you may be able to avoid hitting these exceptions entirely by
increasing the producer config settings [0] retries and retry.backoff.ms.
These can be set via KafkaIO.write().updateProducerProperties().

[0] https://kafka.apache.org/documentation/#producerconfigs
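
Something along these lines (a sketch only; the bootstrap servers, topic,
serializers, and the retry/backoff values are placeholders you would tune
for your own cluster):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import com.google.common.collect.ImmutableMap;

    // Placeholder broker/topic; retries and backoff chosen only as an example.
    records.apply(
        KafkaIO.<String, String>write()
            .withBootstrapServers("broker-1:9092")
            .withTopic("my-topic")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .updateProducerProperties(ImmutableMap.<String, Object>of(
                ProducerConfig.RETRIES_CONFIG, 10,
                ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000)));

Raising retries and retry.backoff.ms gives the producer more room to ride
out transient broker errors before the failure surfaces in finishBundle.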


