Posted to user@flink.apache.org by Nelson Steven <Ne...@JohnDeere.com> on 2020/07/14 20:12:35 UTC

Backpressure on Window step

Hello!

We are experiencing occasional backpressure on a Window function in our pipeline. The window is on a KeyedStream and is triggered by an EventTimeSessionWindows.withGap(Time.seconds(30)). The prior step does a fanout, and we use the window to sort things into batches based on the key of the keyed stream. We aren't seeing an unreasonable number of records (500-600/s) at a parallelism of 32 (the prior step has a parallelism of 4). We are as interested in learning how to debug the issue as we are in fixing the actual problem. Any ideas?

-Steve

Re: Backpressure on Window step

Posted by David Anderson <da...@alpinegizmo.com>.
Steve,

Your approach to debugging this sounds reasonable, but keep in mind that
the backpressure detection built into the WebUI is not infallible. You
could have backpressure that it doesn't detect.

FWIW, keyBy isn't an operator -- it's a declaration of how the operators
before and after the keyBy are connected.
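
As a rough illustration of keyBy as a routing rule, here is a minimal
plain-Java sketch (not Flink code). The class name, method names and sample
keys are hypothetical, and the modulo hashing is a simplification: Flink's
real assignment goes through key groups. The point it shows is only that
every record with the same key lands on the same downstream subtask, so the
number of distinct keys bounds how many of the 32 subtasks can be busy.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified illustration of hash partitioning; NOT Flink's key-group algorithm. */
final class KeyRoutingSketch {

    /** Maps a key to one of `parallelism` downstream subtasks (plain modulo hashing). */
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many records each downstream subtask would receive. */
    static long[] recordsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical keys: "key-A" is a hot key that dominates the stream.
        List<String> keys = Arrays.asList(
                "key-A", "key-A", "key-A", "key-A", "key-B", "key-C");
        // At most three of the 32 slots in the printed array can be non-zero.
        System.out.println(Arrays.toString(recordsPerSubtask(keys, 32)));
    }
}
```

With three distinct keys, at most three subtasks receive anything, whatever
the parallelism of the window operator.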

Have you tried increasing the parallelism of the task(s) before the window
(where the parallelism is currently 4)? Given that your job is already
using 32 slots, you have little to lose by doing so. Perhaps the keyBy (and
associated change in parallelism) is the first point in your job where the
events are being serialized and sent over the network, and maybe 4
instances aren't enough to consistently provide the required throughput.

David

On Fri, Jul 17, 2020 at 11:17 PM Nelson Steven <Ne...@johndeere.com>
wrote:

> [...]

RE: Backpressure on Window step

Posted by Nelson Steven <Ne...@JohnDeere.com>.
First off, thanks for your reply!

I have an assumption that I should probably verify first:
When determining the source of the backpressure, we look (in the WebUI) for the first operator in our pipeline that is not showing backpressure. That’s what we consider to be the source of the backpressure.

In this case, the first operator in our graph that is not showing backpressure is our window operator (although the keyBy operation right before it doesn’t show up in the graph). The window function uses a custom aggregation function that builds up a hashmap and a custom process function that emits the hashmap and performs some metrics operations. I am not sure how this would generate backpressure since it doesn’t perform any IO, but again I might be drawing incorrect conclusions.

The window function has a parallelism of 32. Each of the subtasks has between 136 KB and 2.45 MB of state, with a checkpoint duration of 280 ms to 2 seconds. Each of the 32 subtasks appears to be handling 1,700-50,000 records an hour, with bytes received between 7 MB and 170 MB.
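
As a rough check on how uneven those per-subtask figures are, the following minimal Java sketch computes the ratio between the busiest and the least busy subtask. Only the two extremes (1,700 and 50,000 records an hour) come from the numbers above; the class and method names are hypothetical, and the counts for the other 30 subtasks are not known, so they are left out.

```java
import java.util.Arrays;

/** Hypothetical helper for quantifying load asymmetry across subtasks. */
final class SubtaskSkewSketch {

    /**
     * Ratio of the busiest subtask's record count to the least busy one.
     * Expects a non-empty array; a zero minimum yields Infinity.
     */
    static double maxToMinRatio(long[] recordsPerSubtask) {
        long max = Arrays.stream(recordsPerSubtask).max().getAsLong();
        long min = Arrays.stream(recordsPerSubtask).min().getAsLong();
        return (double) max / min;
    }

    public static void main(String[] args) {
        // Only the reported extremes, in records per hour.
        long[] reportedExtremes = {1_700L, 50_000L};
        System.out.println("busiest / least busy = " + maxToMinRatio(reportedExtremes));
    }
}
```

The ratio comes out at roughly 29, so the busiest subtask sees about 29 times the records of the least busy one.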

Am I barking up the wrong tree?

-Steve


From: David Anderson <da...@alpinegizmo.com>
Sent: Friday, July 17, 2020 6:54 AM
To: Nelson Steven <Ne...@JohnDeere.com>
Cc: user@flink.apache.org
Subject: Re: Backpressure on Window step

[...]

Re: Backpressure on Window step

Posted by David Anderson <da...@alpinegizmo.com>.
Backpressure is typically caused by something like one of these things:

* problems relating to i/o to external services (e.g., enrichment via an
API or database lookup, or a sink)
* data skew (e.g., a hot key)
* under-provisioning, or competition for resources
* spikes in traffic
* timer storms

I would start to debug this by looking for signs of significant
asymmetry in the metrics (across the various subtasks), or resource
exhaustion. Could be related to the network, GC, CPU, disk i/o, etc.
Flink's webUI will show you checkpoint size and timing information for each
sub-task; you can learn a lot from studying that data.

Relating to session windows -- could you occasionally have an unusually
long session, and might that cause problems?
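
To make the long-session question concrete, here is a minimal plain-Java
sketch (not Flink code) of gap-based sessions for a single key. The class
and method names are hypothetical, and it only models the basic idea that
an event arriving less than the gap after the previous one extends the
current session; how Flink treats a difference of exactly the gap is not
modelled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of session windows for ONE key; not Flink's implementation. */
final class SessionGapSketch {

    /**
     * Groups event timestamps (ms) into sessions.
     * Each result entry is {firstEventTs, lastEventTs} of one session.
     */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] < gapMs) {
                last[1] = ts;                      // within the gap: extend the session
            } else {
                result.add(new long[] {ts, ts});   // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gapMs = 30_000L;
        // Hypothetical key that emits one event every 10 seconds for an hour.
        long[] steady = new long[361];
        for (int i = 0; i < steady.length; i++) {
            steady[i] = i * 10_000L;
        }
        for (long[] s : sessions(steady, gapMs)) {
            System.out.println("session from " + s[0] + " ms to " + s[1] + " ms");
        }
    }
}
```

In this model, a key that never pauses for 30 seconds stays in one session
for the whole hour, so nothing is emitted for it until it goes quiet.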

Best,
David
