You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by zhijiang <wa...@aliyun.com> on 2019/03/01 02:55:41 UTC

Re: Flink performance drops when async checkpoint is slow

Hi Paul,

Thanks for your feedback. If the at-least-once mode still causes the problem, we can confirm it is not caused by blocking behavior in exactly-once-mode mentioned before.

For at-least once, the task would continue processing the buffers following with barriers during allignment. But for exactly-once, the task would block the channel after reading barrier during allignment. It is difficult to confirm based on the obsolute state size which is also related to network buffer setting and parallelism.

I think your problem is once activing checkpoint, the backpressure is also caused sometimes which results in low performance and cpu usages. 
Maybe we can analyze which vertex and subtask cause the backpressure and then traces its jstack to check which operations slow down it. The source vertex is blocked in `requestingBufferBuilder` once backpressure. But I am not sure it is caused by middle vertex or the last sink vertex. If the middle vertex is also blocked by requesting buffer as you mentioned in first email, then the backpressure should be caused by last sink vertex. But the jstack of last sink task should not be in `getNextBuffer` or you did not trace all the sink tasks parallelism.  

My suggestion is first to check which vertex causes the backpressure (middle or last sink vertex?). And then trace the jstack of proper parallelism task in this vertex, you can select the task with the largest inqueue length. I think we might find something if seeing which operation delays the task to cause the backpressure, and this operation might be involved with HDFS. :)

Best,
Zhijiang


------------------------------------------------------------------
From:Paul Lam <pa...@gmail.com>
Send Time:2019年2月28日(星期四) 19:17
To:zhijiang <wa...@aliyun.com>
Cc:user <us...@flink.apache.org>
Subject:Re: Flink performance drops when async checkpoint is slow

Hi Zhijiang,

Thanks a lot for your reasoning! 

I tried to set the checkpoint to at-leaset-once as you suggested, but unluckily the problem remains the same :(

IMHO, if it’s caused by barrier alignment, the state size (mainly buffers during alignment) would be big, right? But actually it’s not, so we didn’t think that way before.

Best,
Paul Lam

在 2019年2月28日,16:12,zhijiang <wa...@aliyun.com> 写道:
Hi Paul,

I am not sure whether task thread is involverd in some works during snapshoting states for FsStateBackend. But I have another experience which might also cause your problem.
From your descriptions below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent` that means the middle task does not have any outpus or the middle operator does not process records.
The backpressure is high between source and middle task which results in blocking the source task in `requestBufferBuilder`.

Based on above two points, I guess the middle task is waiting for barrier from some source tasks. For the input channels which already receives the barriers, the middle task would not process the following data buffers and just cache them, so it would result in backpressure the corresponding source based on credit-based flow control.  For the input channels without barriers, if there are also no data buffers, then the middle task would not have any outputs. So I think one hint is to trace why some source task emits barrier delay.

In order to double check the above analysis, you can change the checkpoint mode from `exactly-once` to `at-least once`, if the cpu usages and task TPS are not decreased for a period as before, I think we could confirm the above analysis. :)

Best,
Zhijiang
------------------------------------------------------------------
From:Paul Lam <pa...@gmail.com>
Send Time:2019年2月28日(星期四) 15:17
To:user <us...@flink.apache.org>
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from Kafka topic, does some transformations and aggregates, and write to two Kafka topics respectively. Meanwhile, there’s a custom source that pulls configurations for the transformations periodically. The generic job graph is as below.

<屏幕快照 2019-02-25 11.24.54.png>

The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is unstable, and sometimes HDFS client reports slow read and slow waitForAckedSeqno during checkpoints. When that happens, the Flink job consume rate drops significantly, and some taskmanager’ cpu usage drops from about 140% to 1%, all the task threads on that taskmanager are blocked. This situation lasts from seconds to a minute. We started a parallel job with everything the same except checkpointing disabled, and it runs very steady.
But I think as the checkpointing is async, it should not affect the task threads.

There are some additional information that we observed:

-  When the performance drops, jstack shows that Kafka source and the task right after it is blocked at requesting memory buffer (with back pressure close to 1), and the last task is blocked at  `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffer during alignment is less than 10 MB, even when back pressure is high.

We’ve been struggling with this problem for weeks, and any help is appreciated. Thanks a lot!

Best,
Paul Lam




Re: Flink performance drops when async checkpoint is slow

Posted by Stephan Ewen <se...@apache.org>.
Hi Paul!

One issue could be that state in Flnk 1.5.x state is asynchronous, but
timers are synchronous. Timers are asynchronous starting from Flink 1.6.

Best,
Stephan


On Fri, Mar 1, 2019 at 4:03 AM zhijiang <wa...@aliyun.com> wrote:

> Hi Paul,
>
> Thanks for your feedback. If the at-least-once mode still causes the
> problem, we can confirm it is not caused by blocking behavior in
> exactly-once-mode mentioned before.
>
> For at-least once, the task would continue processing the buffers
> following with barriers during allignment. But for exactly-once, the task
> would block the channel after reading barrier during allignment. It is
> difficult to confirm based on the obsolute state size which is also related
> to network buffer setting and parallelism.
>
> I think your problem is once activing checkpoint, the backpressure is also
> caused sometimes which results in low performance and cpu usages.
> Maybe we can analyze which vertex and subtask cause the backpressure and
> then traces its jstack to check which operations slow down it. The source
> vertex is blocked in `requestingBufferBuilder` once backpressure. But I am
> not sure it is caused by middle vertex or the last sink vertex. If the
> middle vertex is also blocked by requesting buffer as you mentioned in
> first email, then the backpressure should be caused by last sink vertex.
> But the jstack of last sink task should not be in `getNextBuffer` or you
> did not trace all the sink tasks parallelism.
>
> My suggestion is first to check which vertex causes the backpressure
> (middle or last sink vertex?). And then trace the jstack of proper
> parallelism task in this vertex, you can select the task with the largest
> inqueue length. I think we might find something if seeing which operation
> delays the task to cause the backpressure, and this operation might be
> involved with HDFS. :)
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From:Paul Lam <pa...@gmail.com>
> Send Time:2019年2月28日(星期四) 19:17
> To:zhijiang <wa...@aliyun.com>
> Cc:user <us...@flink.apache.org>
> Subject:Re: Flink performance drops when async checkpoint is slow
>
> Hi Zhijiang,
>
> Thanks a lot for your reasoning!
>
> I tried to set the checkpoint to at-leaset-once as you suggested, but
> unluckily the problem remains the same :(
>
> IMHO, if it’s caused by barrier alignment, the state size (mainly buffers
> during alignment) would be big, right? But actually it’s not, so we
> didn’t think that way before.
>
> Best,
> Paul Lam
>
> 在 2019年2月28日,16:12,zhijiang <wa...@aliyun.com> 写道:
>
> Hi Paul,
>
> I am not sure whether task thread is involverd in some works during
> snapshoting states for FsStateBackend. But I have another experience
> which might also cause your problem.
> From your descriptions below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent`
> that means the middle task does not have any outpus or the middle operator
> does not process records.
> The backpressure is high between source and middle task which results in
> blocking the source task in `requestBufferBuilder`.
>
> Based on above two points, I guess the middle task is waiting for barrier
> from some source tasks. For the input channels which already receives the
> barriers, the middle task would not process the following data buffers and
> just cache them, so it would result in backpressure the corresponding
> source based on credit-based flow control.  For the input channels without
> barriers, if there are also no data buffers, then the middle task would not
> have any outputs. So I think one hint is to trace why some source task
> emits barrier delay.
>
> In order to double check the above analysis, you can change the checkpoint
> mode from `exactly-once` to `at-least once`, if the cpu usages and task TPS
> are not decreased for a period as before, I think we could confirm the
> above analysis. :)
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From:Paul Lam <pa...@gmail.com>
> Send Time:2019年2月28日(星期四) 15:17
> To:user <us...@flink.apache.org>
> Subject:Flink performance drops when async checkpoint is slow
>
> Hi,
>
> I have a Flink job (version 1.5.3) that consumes from Kafka topic, does
> some transformations and aggregates, and write to two Kafka topics
> respectively. Meanwhile, there’s a custom source that pulls configurations
> for the transformations periodically. The generic job graph is as below.
>
> <屏幕快照 2019-02-25 11.24.54.png>
>
> The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is
> unstable, and sometimes HDFS client reports slow read and slow
> waitForAckedSeqno during checkpoints. When that happens, the Flink job
> consume rate drops significantly, and some taskmanager’ cpu usage drops
> from about 140% to 1%, all the task threads on that taskmanager are
> blocked. This situation lasts from seconds to a minute. We started a
> parallel job with everything the same except checkpointing disabled, and it
> runs very steady.
> But I think as the checkpointing is async, it should not affect the task
> threads.
>
> There are some additional information that we observed:
>
> -  When the performance drops, jstack shows that Kafka source and the task
> right after it is blocked at requesting memory buffer (with back pressure
> close to 1), and the last task is blocked at  `
> SingleInputGate.getNextBufferOrEvent`.
> - The dashboard shows that the buffer during alignment is less than 10 MB,
> even when back pressure is high.
>
> We’ve been struggling with this problem for weeks, and any help is
> appreciated. Thanks a lot!
>
> Best,
> Paul Lam
>
>
>
>
>