Posted to user@flink.apache.org by "Hailu, Andreas [Engineering]" <An...@gs.com> on 2021/02/16 21:58:36 UTC

Understanding blocking behavior

Hi folks, I'm trying to get a better understanding of what operations result in blocked partitions. I've got a batch-processing job that reads from 2 sources, and then performs a series of Maps/Filters/CoGroups all with the same parallelism to create a final DataSet to be written to two different Sinks.

The kind of Sink a record in the DataSet is written to is dependent on the record's properties, so we use a Map + Filter operation to just pull the desired records for the Sink. The latter portion of the graph looks like this:

DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X)
DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism P-X)

Parallelisms for the output into SinkA and SinkB differ from the parallelism used in the Map + Filter operation in order to control the total number of output files. What I observe is that all of the records must first be sent to the Map + Filter operators, and only after all records have been received do the Sinks begin to output records. This shows in the Flink Dashboard as the Sinks remaining in the 'CREATED' state while the Map + Filter operators are 'RUNNING'. At scale, where the DataSet may contain billions of records, this ends up taking hours. Ideally, the records would be streamed through to the Sinks as they pass through the Map + Filter.

Is this blocking behavior due to the fact that the Map + Filter operators must redistribute the records as they move to an operator with a lower parallelism?

____________

Andreas Hailu
Data Lake Engineering | Goldman Sachs



Re: Understanding blocking behavior

Posted by Arvid Heise <ar...@apache.org>.
Hi Andreas,

Julian already offered a good explanation, so here is one possible
solution: you could try to run the whole first subpipeline with parallelism
X and the second with P-X. However, most likely you need to run with P>X to
finish in time.
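
To illustrate why matching parallelisms helps, here is a toy model in plain Python (not Flink code). The `connect` rule and the values P=8 and X=3 are assumptions made up for this sketch: with equal parallelism each producer subtask feeds exactly one consumer subtask, which is the precondition for chaining, while differing parallelism forces every producer to spread its records across all consumers.

```python
from typing import Dict, List


def connect(producers: int, consumers: int) -> Dict[int, List[int]]:
    """Return, for each producer subtask, the consumer subtasks it sends to.

    Toy rule (an assumption of this sketch): equal parallelism gives a
    one-to-one "forward" connection; otherwise every producer spreads its
    records over all consumers, i.e. a redistribution.
    """
    if producers == consumers:
        return {p: [p] for p in range(producers)}
    return {p: list(range(consumers)) for p in range(producers)}


def is_forward(wiring: Dict[int, List[int]]) -> bool:
    """True if every producer talks only to the consumer with the same index."""
    return all(targets == [p] for p, targets in wiring.items())


P, X = 8, 3  # hypothetical values, chosen only for illustration

# Original layout: Map + Filter at P, SinkA at X -> redistribution needed.
original = connect(P, X)

# Suggested layout: the whole first subpipeline at X, the second at P - X.
first_subpipeline = connect(X, X)
second_subpipeline = connect(P - X, P - X)

print(is_forward(original), is_forward(first_subpipeline), is_forward(second_subpipeline))
```

In this model only the suggested layout yields one-to-one connections, at the cost of running the Map + Filter work with fewer subtasks.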

Another way is to use the DataStream API (your program is not doing any
aggregation/join, so streaming is indeed a good fit) in STREAMING mode
(the only execution mode for DataStream in Flink <1.12). There, all tasks
are active all the time and records are streamed through as you expect.
Since we plan to phase out DataSet API eventually, it's also the more
future-proof solution.

Best,

Arvid

On Tue, Feb 16, 2021 at 11:37 PM Jaffe, Julian <Ju...@activision.com>
wrote:

> Hey Andreas,
>
>
>
> Have a read through
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle
> and in particular the BATCH Execution Mode section. Your intuition is
> mostly correct – because your operators can’t be chained due to the
> rebalancing, if you execute your pipeline in batch mode downstream tasks
> will not begin processing data until the upstream tasks have finished all
> of their processing. If you can forgo the higher resiliency and lower
> resource requirements of executing in batch mode, you could try running
> your pipeline in streaming mode over bounded data.
>
>
>
> Julian

Re: Understanding blocking behavior

Posted by "Jaffe, Julian" <Ju...@activision.com>.
Hey Andreas,

Have a read through https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle and in particular the BATCH Execution Mode section. Your intuition is mostly correct – because your operators can’t be chained due to the rebalancing, if you execute your pipeline in batch mode downstream tasks will not begin processing data until the upstream tasks have finished all of their processing. If you can forgo the higher resiliency and lower resource requirements of executing in batch mode, you could try running your pipeline in streaming mode over bounded data.
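
As a rough illustration of the difference, here is a toy model in plain Python (not Flink code). The function names and the even-number filter are assumptions invented for this sketch. It contrasts a blocking exchange, where the upstream result is fully materialized before the sink starts, with a pipelined exchange, where the sink consumes records as they are produced.

```python
from typing import Iterable, Iterator, List


def map_filter(records: Iterable[int], log: List[str]) -> Iterator[int]:
    """Upstream stage: a stand-in for Map + Filter that keeps even numbers."""
    for r in records:
        log.append(f"upstream processed {r}")
        if r % 2 == 0:
            yield r


def sink(records: Iterable[int], log: List[str]) -> None:
    """Downstream stage: a stand-in for the Sink."""
    for r in records:
        log.append(f"sink wrote {r}")


def run_blocking(records: List[int]) -> List[str]:
    """Blocking exchange: materialize the full upstream result first."""
    log: List[str] = []
    intermediate = list(map_filter(records, log))  # upstream runs to completion
    sink(intermediate, log)                        # only now does the sink start
    return log


def run_pipelined(records: List[int]) -> List[str]:
    """Pipelined exchange: the sink pulls records as they are produced."""
    log: List[str] = []
    sink(map_filter(records, log), log)
    return log


def first_sink_event(log: List[str]) -> int:
    """Index of the first sink event in the log."""
    return next(i for i, e in enumerate(log) if e.startswith("sink"))


blocking_log = run_blocking([1, 2, 3, 4])
pipelined_log = run_pipelined([1, 2, 3, 4])
print(first_sink_event(blocking_log), first_sink_event(pipelined_log))
```

In the blocking run the first sink event comes only after every upstream record has been processed, which mirrors the Sinks sitting in 'CREATED' while Map + Filter is 'RUNNING'. In the pipelined run the sink writes its first record as soon as one passes the filter.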

Julian
