You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Marc Rooding <ma...@webresource.nl> on 2019/03/28 13:03:42 UTC

Throttling/effective back-pressure on a Kafka sink

Hi

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the amount of messages going to Kafka, causing the following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has passed since batch creation

We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka cluster is running version 2.1.1. The Kafka producer uses all default settings except from:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips

I tried playing around with the buffer and batch settings, increasing timeouts, but none seem to be what we need. Increasing the delivery.timeout.ms and request.timeout.ms solves the initial error, but causes the Flink job to fail entirely due to:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

My assumption is that the Kafka producer will start blocking since it notices that it can't handle the batches, and Flink eventually runs out of buffers for the operator.

What really baffles me is that the backpressure tab shows that everything is OK. The entire job pipeline (which reads from 4 different topics, unions them all and sinks towards 1 topic) pushes all the messages through to the sink stage, resulting in 18 million incoming stage messages, even though Kafka is in no way possible to keep up with this.

I searched for others facing the same issue but can't find anything similar. I'm hoping that someone here could guide me in the right direction.

Thanks in advance


Re: Throttling/effective back-pressure on a Kafka sink

Posted by Derek VerLee <de...@gmail.com>.
Was any progress ever made on this? We have seen the same issue in the past.
What I do remember is, whatever I set max.block.ms to, is when the job
crashes.  
I am going to attempt to reproduce the issue again and will report back.  

  

On 3/28/19 3:27 PM, Konstantin Knauf wrote:  

> Hi Marc,  
>

>

>  
>

>

> the Kafka Producer should be able to create backpressure. Could you try to
increase [max.block.ms](http://max.block.ms) to Long.MAX_VALUE?  
>

>

>  
>

>

> The exceptions you shared for the failure case don't look like the root
causes of the problem. Could you share the full stacktraces or even full logs
for this time frame. Feel free to send these logs to me directly, if you don't
want to share them on the list.

>

>  
>

>

> Best,  
>

>

>  
>

>

> Konstantin  
>

>

>  
>

>

>  
>

>

>  
>  
>

>

> On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding
<[marc@webresource.nl](mailto:marc@webresource.nl)> wrote:  
>

>

>> Hi

>>

>>  
>

>>

>> We’ve got a job producing to a Kafka sink. The Kafka topics have a
retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t
able to back-pressure or throttle the amount of messages going to Kafka,
causing the following error:

>>

>>  
>

>>

>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has passed
since batch creation

>>

>>  
>

>>

>> We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka
cluster is running version 2.1.1. The Kafka producer uses all default settings
except from:

>>

>>  
>

>>

>> compression.type = snappy

>>

>> max.in.flight.requests.per.connection = 1

>>

>> acks = all

>>

>> client.dns.lookup = use_all_dns_ips

>>

>>  
>  I tried playing around with the buffer and batch settings, increasing
timeouts, but none seem to be what we need. Increasing the
[delivery.timeout.ms](http://delivery.timeout.ms) and
[request.timeout.ms](http://request.timeout.ms) solves the initial error, but
causes the Flink job to fail entirely due to:

>>

>>  
>

>>

>> Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

>>

>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

>>

>>  
>

>>

>> My assumption is that the Kafka producer will start blocking since it
notices that it can't handle the batches, and Flink eventually runs out of
buffers for the operator.

>>

>>  
>

>>

>> What really baffles me is that the backpressure tab shows that everything
is OK. The entire job pipeline (which reads from 4 different topics, unions
them all and sinks towards 1 topic) pushes all the messages through to the
sink stage, resulting in 18 million incoming stage messages, even though Kafka
is in no way possible to keep up with this.

>>

>>  
>

>>

>> I searched for others facing the same issue but can't find anything
similar. I'm hoping that someone here could guide me in the right direction.

>>

>>  
>

>>

>> Thanks in advance

>>

>>  
>

>

>  
>  
>  \--  
>

>

> Konstantin Knauf | Solutions Architect

>

> +49 160 91394525

>

>
[![](https://lh4.googleusercontent.com/1RRzA12SK12Xaowkag-W37QDs5LHrfw4R0tMwVNjKLDKoIu69ld1qtA2hSDn1LSJe9w2THG1A9igK_nXPrNeIqRF87FjbEQoBnZJJgyPXCkKPFYuYc_Vh419P9EOO36ERgdnX5wG)](https://www.ververica.com/)

>

>  
>

>

> Follow us @VervericaData

>

> \--

>

> Join [Flink Forward](https://flink-forward.org/) \- The Apache Flink
Conference

>

> Stream Processing | Event Driven | Real Time

>

> \--

>

> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

>

> \--

>

> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Throttling/effective back-pressure on a Kafka sink

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Marc,

the Kafka Producer should be able to create backpressure. Could you try to
increase max.block.ms to Long.MAX_VALUE?

The exceptions you shared for the failure case don't look like the root
causes of the problem. Could you share the full stacktraces or even full
logs for this time frame. Feel free to send these logs to me directly, if
you don't want to share them on the list.

Best,

Konstantin




On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding <ma...@webresource.nl> wrote:

> Hi
>
> We’ve got a job producing to a Kafka sink. The Kafka topics have a
> retention of 2 weeks. When doing a complete replay, it seems like Flink
> isn’t able to back-pressure or throttle the amount of messages going to
> Kafka, causing the following error:
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has
> passed since batch creation
>
> We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka
> cluster is running version 2.1.1. The Kafka producer uses all default
> settings except from:
>
> compression.type = snappy
> max.in.flight.requests.per.connection = 1
> acks = all
> client.dns.lookup = use_all_dns_ips
>
> I tried playing around with the buffer and batch settings, increasing
> timeouts, but none seem to be what we need. Increasing the
> delivery.timeout.ms and request.timeout.ms solves the initial error, but
> causes the Flink job to fail entirely due to:
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>
> My assumption is that the Kafka producer will start blocking since it
> notices that it can't handle the batches, and Flink eventually runs out of
> buffers for the operator.
>
> What really baffles me is that the backpressure tab shows that everything
> is OK. The entire job pipeline (which reads from 4 different topics, unions
> them all and sinks towards 1 topic) pushes all the messages through to the
> sink stage, resulting in 18 million incoming stage messages, even though
> Kafka is in no way possible to keep up with this.
>
> I searched for others facing the same issue but can't find anything
> similar. I'm hoping that someone here could guide me in the right direction.
>
> Thanks in advance
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen