Posted to user@flink.apache.org by Ori Popowski <or...@gmail.com> on 2022/11/21 08:58:53 UTC

More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Hi,

We upgraded from Flink 1.10 to Flink 1.15. I turned on Buffer Debloat and
I'm seeing a significant rise in backpressure and Kafka lag. I am still not
sure whether it's caused by the version upgrade or by turning on Buffer
Debloat.

I am doing checkpoints once an hour with at-least-once semantics.
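
For context, this is roughly how the job is set up. The sketch below is
illustrative only: the buffer-debloat key and the exact values are not our
real production settings.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        // Buffer Debloat is turned on through configuration (key shown for illustration).
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Checkpoint once an hour with at-least-once semantics.
        env.enableCheckpointing(60 * 60 * 1000L, CheckpointingMode.AT_LEAST_ONCE);

        // ... Kafka source, transformations, sinks go here ...
        env.execute("example-job");
    }
}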

   1. Is it possible that Buffer Debloat affects job performance outside of
   checkpointing? We're seeing this rise even between checkpoints.
   2. Is it possible that there was a change in metrics reporting between
   versions 1.10 and 1.15?

Attached are before/after screenshots.



Thanks

Re: More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Posted by Ori Popowski <or...@gmail.com>.
I am measuring the lag the same way I measured before the upgrade - why
should there be a difference?

On Tue, Nov 22, 2022 at 1:27 PM Martijn Visser <ma...@apache.org>
wrote:


Re: More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Posted by Martijn Visser <ma...@apache.org>.
Hi,

If you're measuring lag by looking at the committed offsets and you have a
checkpoint once per hour, that could be a reason. As explained at
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing
Flink only commits offsets when a checkpoint is completed.
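
For illustration, here is a minimal KafkaSource sketch with hourly
checkpoints (the broker, topic and group id are placeholders). With such a
setup the committed offsets, and therefore any lag derived from them, only
move forward when a checkpoint completes:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60 * 60 * 1000L); // once per hour

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("my-topic")                // placeholder
                .setGroupId("my-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // default is already "true": offsets are committed to Kafka only when
                // a checkpoint completes, so lag observed from committed offsets can
                // grow for up to an hour between checkpoints
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("committed-offsets-example");
    }
}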

Best regards,

Martijn

On Mon, Nov 21, 2022 at 1:02 PM Tamir Sagi <Ta...@niceactimize.com>
wrote:


Re: More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Posted by Martijn Visser <ma...@apache.org>.
Since the KafkaSource is a rewrite of the FlinkKafkaConsumer, it is also
possible that your original method of lag measurement worked with the old
connector but not with the new one. That's also why this behaviour is
explicitly documented. The Flink Kafka connector works differently from a
plain Kafka consumer so that it integrates properly with Flink's
checkpointing mechanism / exactly-once guarantees. A checkpoint interval of
an hour is really long from a Flink perspective.

Do you have more parallelism than you have partitions? If so, your Kafka
Source won't automatically go into an idle state unless you define a
watermark strategy with idleness. Also see the documentation for this [1].
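
As a minimal sketch (the out-of-orderness and idleness durations are
placeholders, and it assumes a KafkaSource<String> built as discussed
earlier):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IdlenessExample {
    static void attachSource(StreamExecutionEnvironment env, KafkaSource<String> source) {
        WatermarkStrategy<String> strategy =
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        // subtasks that end up with no assigned partitions stop receiving
                        // records; withIdleness marks them idle so they do not hold back
                        // the watermark for the rest of the pipeline
                        .withIdleness(Duration.ofMinutes(1));

        env.fromSource(source, strategy, "Kafka Source");
    }
}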

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#idleness

On Mon, Nov 28, 2022 at 4:10 PM Ori Popowski <or...@gmail.com> wrote:


Re: More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Posted by Ori Popowski <or...@gmail.com>.
Tamir, we are using a Kafka Source without a Sink. We have enough
parallelism for all partitions.
After I observed this behavior I turned off Buffer Debloat, and we now have
normal backpressure and lag.
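
For reference, turning it off was just a matter of flipping the same
configuration key back (illustrative sketch, as before):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebloatOff {
    static StreamExecutionEnvironment createEnv() {
        Configuration conf = new Configuration();
        // same illustrative key as before, now disabled
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "false");
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}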

On Mon, Nov 21, 2022 at 2:01 PM Tamir Sagi <Ta...@niceactimize.com>
wrote:


Re: More backpressure and Kafka lag after upgrading to 1.15 and using Buffer Debloat

Posted by Tamir Sagi <Ta...@niceactimize.com>.
Hey Ori,


  1.  What is the parallelism configuration you are using?
  2.  How many partitions does the topic have?
  3.  Did you check the Flink UI to see where the backpressure is? Does the screenshot show Kafka backpressure?

We have encountered similar behavior with backpressure in a Kafka sink.
There were several reasons:

  1.  Sink operator parallelism < number of topic partitions.
  2.  The job had been restarted due to checkpoint failures without a tolerable
checkpoint failure number [1] configured. This is a numeric value saying how many
consecutive checkpoint failures are allowed before the job fails; if not set, the
default is 0 [2], which means the job is restarted for every failed checkpoint.
You can set it to a higher number [3] via
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(<value>) (see the
sketch below).
  3.  There was a keyBy just before the sink, which shifted most of the stream to
the same slot (1/12); the other instances were idle.
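
A minimal sketch of point 2 (the checkpoint interval and threshold below are
illustrative values, not a recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableFailuresExample {
    static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60 * 60 * 1000L); // checkpoint interval, illustrative
        // allow up to 3 consecutive checkpoint failures before the job fails;
        // the default of 0 restarts the job on the first failed checkpoint
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}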

Please provide more information if possible.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/

[2] https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L415-L420

[3] https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L427-L433

Tamir.