Posted to user@flink.apache.org by Reinier Kip <rk...@bol.com> on 2018/03/12 12:45:47 UTC

HDFS data locality and distribution

Hey all,


I'm trying to batch-process 30-ish files from HDFS, but I see that data is distributed very badly across slots. 4 out of 32 slots get 4/5ths of the data, another 3 slots get about 1/5th and a last slot just a few records. This probably triggers disk spillover on these slots and slows down the job immensely. The data has many many unique keys and processing could be done in a highly parallel manner. From what I understand, HDFS data locality governs which splits are assigned to which subtask.


  *   I'm running a Beam on Flink on YARN pipeline.
  *   I'm reading 30-ish files, whose records are later grouped by their millions of unique keys.
  *   For now, I have 8 task managers with 4 slots each. Beam sets the parallelism of every operator to 32.
  *   Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task managers.


Does the statement of input split assignment ring true? Is the fact that data isn't redistributed an effort from Flink to have high data locality, even if this means disk spillover for a few slots/tms and idleness for others? Is there any use for parallelism if work isn't distributed anyway?


Thanks for your time, Reinier

Re: HDFS data locality and distribution

Posted by Reinier Kip <rk...@bol.com>.
Hi Chesnay,


Thanks for responding.


I managed to resolve the problem last Friday: I had a single datasource for each file, instead of one big datasource for all the files. The reading of the one or two HDFS blocks within each datasource was then distributed to only a small percentage of slots (let's say ~10%). This was knowledge specific to Beam's Flink runner that I did not yet have.
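
Editor's sketch of the reasoning above, not code from the thread. It assumes one input split per HDFS block and two blocks per file (both hypothetical simplifications), and it only models the upper bound on how many subtasks of a source can read. It does not model which physical slots those subtasks land on. The function name max_reading_subtasks is made up for illustration.

```python
PARALLELISM = 32  # 8 task managers x 4 slots, as described in the thread


def max_reading_subtasks(num_splits: int, parallelism: int = PARALLELISM) -> int:
    """Upper bound on busy readers: each split is read by exactly one subtask,
    so a source cannot keep more subtasks busy than it has splits."""
    return min(num_splits, parallelism)


# Hypothetical layout: 30 files, each stored as 2 HDFS blocks (one split per block).
blocks_per_file = [2] * 30

# One source per file: every source has 32 subtasks but only 2 splits to hand out.
per_file = [max_reading_subtasks(n) for n in blocks_per_file]

# One source for all files: 60 splits are shared among the same 32 subtasks.
combined = max_reading_subtasks(sum(blocks_per_file))

print(f"per-file source: at most {per_file[0]} of {PARALLELISM} subtasks read")
print(f"combined source: up to {combined} of {PARALLELISM} subtasks read")
```

Under these assumptions, each per-file source leaves most of its subtasks without any split, while a single source over all files has enough splits to occupy every subtask.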


> the function after the groupBy() should still make full use of the parallelism of the cluster
> Do note that data skew can affect how much data is distributed to each node


I do not remember seeing this behaviour; instead, I remember that data was redistributed only among the slots that did the reading, but I cannot verify this at this point. Also, I do not know exactly how Beam operators map to Flink's. There are millions of distinct keys, and their distribution is quite uniform.


Reinier



Re: HDFS data locality and distribution

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

You said that "data is distributed very badly across slots"; do you mean 
that only a small number of subtasks is reading from HDFS, or that the 
keyed data is only processed by a few subtasks?

Flink does prioritize data locality over data distribution when reading 
the files, but the function after the groupBy() should still make full 
use of the parallelism of the cluster. Do note that data skew can affect 
how much data is distributed to each node, i.e. if 80% of your data has 
the same key (or rather hash), they will all end up on the same node.
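
Editor's sketch of the skew effect described above, not code from the thread. It simulates hash partitioning of records across 32 subtasks. The MD5-based hash is an assumption chosen for a stable, well-mixed result; Flink uses its own hashing scheme, so the exact assignment of keys to subtasks will differ. All function names are made up for illustration.

```python
import hashlib
from collections import Counter

PARALLELISM = 32  # 8 task managers x 4 slots, as described in the thread


def subtask_for(key: str, parallelism: int = PARALLELISM) -> int:
    """Map a key to a subtask index using a stable hash (MD5 here, not Flink's)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % parallelism


def load_per_subtask(keys, parallelism: int = PARALLELISM) -> Counter:
    """Count how many records each subtask receives after hash partitioning."""
    return Counter(subtask_for(k, parallelism) for k in keys)


def max_share(load: Counter) -> float:
    """Fraction of all records that ends up on the busiest subtask."""
    return max(load.values()) / sum(load.values())


# Case 1: 100,000 records, each with its own key (uniform key distribution).
uniform_load = load_per_subtask(f"key-{i}" for i in range(100_000))

# Case 2: 80% of the records share one key, the rest have unique keys.
skewed_keys = ["hot-key"] * 80_000 + [f"key-{i}" for i in range(20_000)]
skewed_load = load_per_subtask(skewed_keys)

print(f"uniform: busiest subtask holds {max_share(uniform_load):.1%} of records")
print(f"skewed:  busiest subtask holds {max_share(skewed_load):.1%} of records")
```

With unique keys, every subtask receives a share close to 1/32 of the records. With one dominant key, the subtask that owns that key receives at least 80% regardless of the parallelism.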



Re: HDFS data locality and distribution

Posted by Reinier Kip <rk...@bol.com>.
Relevant versions: Beam 2.1, Flink 1.3.
