Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2022/04/13 08:29:00 UTC

[jira] [Comment Edited] (FLINK-24578) Unexpected erratic load shape for channel skew load profile and ~10% performance loss with enabled debloating

    [ https://issues.apache.org/jira/browse/FLINK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521539#comment-17521539 ] 

Piotr Nowojski edited comment on FLINK-24578 at 4/13/22 8:28 AM:
-----------------------------------------------------------------

I've spent a couple of days thinking about this problem while running a simple benchmark with an artificial data source and 5 keyBy's, without any data skew, and these are my thoughts:

Here is a plausible explanation for the ~10% performance drop when buffer debloating is enabled.
A couple of facts:
# With buffer debloating ("w bd"), some throttling sink subtasks randomly end up idling and wasting cycles (slow subtasks), while others are 100% busy (fast subtasks). This happens less frequently without buffer debloating ("w/o bd").
# Slow subtasks and fast subtasks stay in that state for the whole duration of the job.
# Some metrics correlate with whether a subtask is slow or fast.
a) Slow subtasks have a slightly higher numRecordsIn: they stay a couple of tens of thousands of records ahead of the fast subtasks. The difference in numRecordsIn between slow and fast subtasks stays more or less constant for the whole duration of the job.
b) Slow subtasks have fewer enqueued input buffers/buffers in use, for example ~40 vs ~160 for fast subtasks.
c) Slow subtasks have 80-140ms of idle time, while fast subtasks have 0ms idle time most of the time.
d) The debloated buffer size is 32kB for slow subtasks and ~6kB for fast subtasks (a result of very similar throughput but a different buffersInUse count).
e) Slow subtasks have a very small value of the inputQueueSize metric, almost zero, so their input buffers are mostly empty.
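
To illustrate point d), here is a minimal sketch of how a per-buffer size could follow from throughput and the number of buffers in use. It is a simplified model, not Flink's actual implementation: the function name, the 1 s target, and the 256 B / 32 kB bounds are assumptions made for the example, the throughput figure is invented, and any smoothing the real mechanism applies is left out.

```python
def debloated_buffer_size(throughput_bytes_per_s, buffers_in_use,
                          target_ms=1000, min_size=256, max_size=32 * 1024):
    """Simplified model: spread `target_ms` worth of in-flight data
    evenly over the buffers currently in use, then clamp the result.
    All parameter names and defaults are assumptions for illustration."""
    # Total amount of in-flight data that takes `target_ms` to consume.
    desired_total = throughput_bytes_per_s * target_ms / 1000
    # Share of that total per buffer; guard against zero buffers in use.
    per_buffer = desired_total / max(1, buffers_in_use)
    return int(min(max_size, max(min_size, per_buffer)))


# Same (made-up) throughput, different buffersInUse counts as in point b).
throughput = 1_000_000  # bytes per second, hypothetical
print("slow subtask (~40 buffers):", debloated_buffer_size(throughput, 40))
print("fast subtask (~160 buffers):", debloated_buffer_size(throughput, 160))
```

With equal throughput, a subtask with four times as many buffers in use ends up with roughly a quarter of the buffer size in this model, which is the direction of the 32kB vs ~6kB difference reported above.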

What I think is happening is as follows:
* During startup, some subtasks start a bit sooner than others. If we limit ourselves to parallelism 2, the job will be fully backpressured until both sink subtasks are running. Once the first sink boots up, it will consume all of the buffered in-flight data addressed to it. However, the job will still be fully backpressured, because the upstream subtasks will be backpressured on the channels/subpartitions that are waiting for the second sink to boot up.
* I think this causes the initial differentiation between slow and fast subtasks. “Slow” subtasks are those that actually boot up first, “fast” are those that boot up last. “Slow” subtasks will be slightly ahead of “fast” subtasks in the consumption of records, hence the difference in the “numRecordsIn” metric.
* Once “fast” subtasks finally boot up, they have tons of buffered in-flight data to consume on all input channels, while “slow” subtasks have mostly empty input buffers. As a result, the buffer debloating mechanism assigns larger desired buffer sizes to “slow” subtasks, while “fast” subtasks get smaller buffer sizes.
* This state is stable and self-perpetuating. Look at the upstream subtasks: they are still backpressured on the subpartitions leading to “fast” sinks. The sinks perpetuate this state, because the subpartitions on which upstream subtasks are backpressured are forced to limit their buffer size, while non-backpressured subpartitions are granted even larger buffer sizes. Basically, subpartitions with larger buffer sizes will never be backpressured.

However, all in all, I think this is not The Problem.

Now think about a scenario where we have a perfect record distribution. I think we would end up in a state in which records from all subpartitions are consumed equally quickly, with exactly the same numRecordsInRate, but “slow” subtasks are slightly ahead of “fast” subtasks. Since the subpartitions leading to “fast” subtasks have smaller buffer sizes, they can fit fewer records, so they will always be the ones causing the backpressure. But with a perfect record distribution I think this wouldn’t result in wasted resources on the downstream nodes.

I think The Problem happens because of slight, random, intermittent hiccups/data skews. If anything like that happens on the “slow”/“large” subpartitions, it’s unlikely to affect the job as a whole, since upstream subtasks have some available space in the “slow” subpartitions to smooth out those hiccups/skews (remember that “slow” subpartitions end up with larger buffer sizes). However, if a hiccup/data skew happens for a brief moment on a “fast” subtask, it will backpressure and stall the upstream subtask, which in turn causes a stall and idle time on the “fast” sinks, since they don’t have much (if any) buffered data to smooth out the hiccups.

What’s worse is that it might not be just a “buffer debloating” problem. It looks like it is simply a question of whether hiccups/intermittent data skew can be smoothed out or not. I have seen exactly the same throughput with buffer debloating disabled but the buffer size manually reduced to 8kB as with buffer debloating enabled and “fast” subpartitions that have a ~6kB buffer size.
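
The effect described above can be sketched with a toy model. This is not Flink's network stack: `simulate`, its parameters, and the tick-based timing are all hypothetical. One upstream task emits up to two records per tick to two sinks and stalls completely whenever the target sink's queue is full, while each sink consumes one record per tick. `burst` controls how many consecutive records go to the same sink (intermittent skew), and `capacity` stands in for how much in-flight data fits in the buffers.

```python
def simulate(capacity, burst, ticks=1000):
    """Return the fraction of ticks each of the two sinks spent idle.
    Toy model under stated assumptions, not a Flink benchmark."""
    fill = [0, 0]   # records currently queued in front of each sink
    idle = [0, 0]   # ticks in which a sink had nothing to consume
    produced = 0    # index of the next record the upstream emits
    for _tick in range(ticks):
        # Upstream: tries to emit 2 records per tick, in bursts per sink.
        for _slot in range(2):
            target = (produced // burst) % 2
            if fill[target] >= capacity:
                break  # backpressured: upstream stalls for all sinks
            fill[target] += 1
            produced += 1
        # Sinks: each consumes at most 1 record per tick.
        for sink in range(2):
            if fill[sink] > 0:
                fill[sink] -= 1
            else:
                idle[sink] += 1
    return [count / ticks for count in idle]


print("perfect distribution, tiny buffers:", simulate(capacity=1, burst=1))
print("bursty input, small buffers:       ", simulate(capacity=2, burst=8))
print("bursty input, large buffers:       ", simulate(capacity=16, burst=8))
```

In this model a perfectly alternating input keeps both sinks busy even with a capacity of one record, whereas bursty input combined with a small capacity leaves each sink idle for a noticeable share of the time. The same bursty input with a larger capacity only causes idle time during the initial warm-up. That matches the argument that the throughput loss comes from the inability to absorb short hiccups rather than from buffer debloating as such.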


> Unexpected erratic load shape for channel skew load profile and ~10% performance loss with enabled debloating
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24578
>                 URL: https://issues.apache.org/jira/browse/FLINK-24578
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Anton Kalashnikov
>            Priority: Major
>         Attachments: antiphaseBufferSize.png, erraticBufferSize1.png, erraticBufferSize2.png
>
>
> given:
> A job with 5 maps (with keyBy).
> All channels are remote. Parallelism is 80.
> The first task produces only two keys - `indexOfThisSubtask` and `indexOfThisSubtask + 1`. So every subtask has a constant number of active channels (depending on the hash rebalance).
> Every record has an equal size and is processed for an equal time.
>  
> when: 
> The buffer debloat is enabled with the default configuration.
>  
> then:
> The buffer size synchronizes across all subtasks of the first map for some reason. The synchronization can be strong, as shown in the erraticBufferSize1 picture, but usually it is less explicit, as in erraticBufferSize2.
> !erraticBufferSize1.png!
> !erraticBufferSize2.png!  
>  
> Expected:
> After the stabilization period, the buffer size should be mostly constant with small fluctuations, or the different tasks should be in antiphase to each other (when one subtask has a small buffer size, another should have a big buffer size). For example, see the picture antiphaseBufferSize.
> !antiphaseBufferSize.png!
>  
> Unfortunately, it is not reproduced every time, which means that this problem may be connected to the environment. But at least it makes sense to try to understand why we get such a strange load shape when only several input channels are active.
>  


