Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2016/01/28 16:12:13 UTC

Writing Parquet files with Flink

Hi to all,

I was reading about optimal Parquet file size and HDFS block size.
The ideal situation for Parquet is when its block size (and thus the
maximum size of each row group) is equal to the HDFS block size. The
default behaviour of Flink is that the output file's size depends on the
output parallelism and thus I don't know how to achieve that.
Is that feasible?

Best,
Flavio

Re: Writing Parquet files with Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, make both block sizes the same and you're good.
I think you can neglect the overhead, unless we are talking about
thousands of small files (smaller than the block size).
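
A minimal sketch of what aligning the two block sizes could look like when
the Parquet files are written through a Hadoop Job configuration (the
128 MB value is only an assumption; use whatever block size your HDFS
cluster is configured with):

import org.apache.hadoop.mapreduce.Job;

public class BlockSizeConfig {
  public static Job parquetJob() throws Exception {
    Job job = Job.getInstance();
    // Assumed HDFS block size of 128 MB; adjust to the cluster's setting.
    long blockSize = 128L * 1024 * 1024;
    // Block size of the files that will be written to HDFS.
    job.getConfiguration().setLong("dfs.blocksize", blockSize);
    // Parquet row group ("block") size, kept equal to the HDFS block size.
    job.getConfiguration().setLong("parquet.block.size", blockSize);
    return job;
  }
}

The Job can then be handed to the Parquet output format (for example via
Flink's HadoopOutputFormat wrapper), so each file written by a sink task
consists of row groups of at most one HDFS block.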

2016-01-29 12:06 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> So, from the Flink point of view, there's no need to worry about the
> number or size of the Parquet files as long as I set the Parquet block
> size correctly (equal to the HDFS block size)...
> It only affects the Parquet file overhead (header and footer present in
> each file) and the HDFS resources required to handle them (one object for
> each HDFS file), right?
>
>
> On Fri, Jan 29, 2016 at 11:56 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> The number of input splits does not depend on the number of files but on
>> the number of HDFS blocks of all files.
>> Reading a single file with 100 HDFS blocks and reading 100 files with
>> 1 block each should both result in 100 input splits, which can be read by
>> 100 tasks concurrently (or by fewer tasks with lazy assignment).
>>
>> If you get fewer splits than HDFS blocks, you should check the
>> implementation of the createInputSplits() method in your InputFormat.
>>
>> Best, Fabian
>>
>> 2016-01-29 11:49 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for the response!
>>> From what I understand (correct me if I'm wrong), once I produce
>>> some Parquet dir that I want to read later, the number of files in the dir
>>> affects the initial parallelism of the next job, i.e.:
>>>  - If I have fewer files than available tasks, I will not fully exploit
>>> the parallelism
>>>  - If the number of Parquet files is greater than the number of tasks,
>>> the tasks will read the files as soon as possible (at the maximum
>>> parallelism, but depending on the speed of the pipeline)
>>>
>>> Having a single huge Parquet file could limit the performance of my
>>> Flink job because the default Hadoop InputFormat can't exploit the
>>> parallelism at the datasource (because it relies only on the number of
>>> files found). To avoid that, I should write a custom ParquetInputFormat
>>> able to preprocess the Parquet metadata in those files, extract the HDFS
>>> blocks to read, and then generate the InputSplits. Am I right? Or am I
>>> misunderstanding something?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Using a default FileOutputFormat, Flink writes one output file for each
>>>> data sink task, i.e., as many files as the defined parallelism.
>>>> The size of these files depends on the total output size and the data
>>>> distribution. If you write to HDFS, a file consists of one or more HDFS
>>>> blocks.
>>>> Parquet files are internally also organized in blocks. Each Parquet
>>>> block has a header with some meta information, and the data is organized
>>>> and compressed in a columnar fashion within a block. Due to this, the
>>>> ParquetInputFormat must always read a complete Parquet block.
>>>>
>>>> Flink's FileInputFormats split the input data along the HDFS blocks and
>>>> try to assign input splits such that blocks can be read locally. For best
>>>> performance, Parquet blocks should be aligned with HDFS blocks. It is not
>>>> a problem if a Parquet block is not completely filled.
>>>>
>>>> If you want to control the size of the parallel output files, you would
>>>> need to know the total output size and choose the parallelism accordingly.
>>>> Flink is not able to infer the output size (depends on input size, task
>>>> semantics, data distribution, etc.), so it is up to you to choose the right
>>>> parallelism.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> I was reading about optimal Parquet file size and HDFS block size.
>>>>> The ideal situation for Parquet is when its block size (and thus the
>>>>> maximum size of each row group) is equal to the HDFS block size. The
>>>>> default behaviour of Flink is that the output file's size depends on the
>>>>> output parallelism and thus I don't know how to achieve that.
>>>>> Is that feasible?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Writing Parquet files with Flink

Posted by Flavio Pompermaier <po...@okkam.it>.
So, from the Flink point of view, there's no need to worry about the number
or size of the Parquet files as long as I set the Parquet block size
correctly (equal to the HDFS block size)...
It only affects the Parquet file overhead (header and footer present in
each file) and the HDFS resources required to handle them (one object for
each HDFS file), right?

On Fri, Jan 29, 2016 at 11:56 AM, Fabian Hueske <fh...@gmail.com> wrote:

> The number of input splits does not depend on the number of files but on
> the number of HDFS blocks of all files.
> Reading a single file with 100 HDFS blocks and reading 100 files with 1
> block each should both result in 100 input splits, which can be read by 100
> tasks concurrently (or by fewer tasks with lazy assignment).
>
> If you get fewer splits than HDFS blocks, you should check the
> implementation of the createInputSplits() method in your InputFormat.
>
> Best, Fabian
>
> 2016-01-29 11:49 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>
>> Hi Fabian,
>> thanks for the response!
>> From what I understand (correct me if I'm wrong), once I produce
>> some Parquet dir that I want to read later, the number of files in the dir
>> affects the initial parallelism of the next job, i.e.:
>>  - If I have fewer files than available tasks, I will not fully exploit
>> the parallelism
>>  - If the number of Parquet files is greater than the number of tasks,
>> the tasks will read the files as soon as possible (at the maximum
>> parallelism, but depending on the speed of the pipeline)
>>
>> Having a single huge Parquet file could limit the performance of my Flink
>> job because the default Hadoop InputFormat can't exploit the parallelism at
>> the datasource (because it relies only on the number of files found). To
>> avoid that, I should write a custom ParquetInputFormat able to preprocess
>> the Parquet metadata in those files, extract the HDFS blocks to read, and
>> then generate the InputSplits. Am I right? Or am I misunderstanding
>> something?
>>
>> Best,
>> Flavio
>>
>> On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fh...@gmail.com>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> Using a default FileOutputFormat, Flink writes one output file for each
>>> data sink task, i.e., as many files as the defined parallelism.
>>> The size of these files depends on the total output size and the data
>>> distribution. If you write to HDFS, a file consists of one or more HDFS
>>> blocks.
>>> Parquet files are internally also organized in blocks. Each Parquet
>>> block has a header with some meta information, and the data is organized
>>> and compressed in a columnar fashion within a block. Due to this, the
>>> ParquetInputFormat must always read a complete Parquet block.
>>>
>>> Flink's FileInputFormats split the input data along the HDFS blocks and
>>> try to assign input splits such that blocks can be read locally. For best
>>> performance, Parquet blocks should be aligned with HDFS blocks. It is not
>>> a problem if a Parquet block is not completely filled.
>>>
>>> If you want to control the size of the parallel output files, you would
>>> need to know the total output size and choose the parallelism accordingly.
>>> Flink is not able to infer the output size (depends on input size, task
>>> semantics, data distribution, etc.), so it is up to you to choose the right
>>> parallelism.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> Hi to all,
>>>>
>>>> I was reading about optimal Parquet file size and HDFS block size.
>>>> The ideal situation for Parquet is when its block size (and thus the
>>>> maximum size of each row group) is equal to the HDFS block size. The
>>>> default behaviour of Flink is that the output file's size depends on the
>>>> output parallelism and thus I don't know how to achieve that.
>>>> Is that feasible?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>
>

Re: Writing Parquet files with Flink

Posted by Fabian Hueske <fh...@gmail.com>.
The number of input splits does not depend on the number of files but on
the number of HDFS blocks of all files.
Reading a single file with 100 HDFS blocks and reading 100 files with 1
block each should both result in 100 input splits, which can be read by 100
tasks concurrently (or by fewer tasks with lazy assignment).

If you get fewer splits than HDFS blocks, you should check the
implementation of the createInputSplits() method in your InputFormat.
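
For completeness, a minimal sketch of reading a Parquet directory through
Flink's Hadoop compatibility wrapper, where split generation is delegated
to the wrapped Hadoop InputFormat (the path is a placeholder, and this
assumes a parquet-avro version in which AvroParquetInputFormat is generic):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class ReadParquetSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Job job = Job.getInstance();
    FileInputFormat.addInputPath(job, new Path("hdfs:///path/to/parquet-dir"));

    // Split generation happens inside the wrapped Hadoop InputFormat;
    // Flink only assigns the resulting splits lazily to its source tasks.
    HadoopInputFormat<Void, GenericRecord> parquetInput =
        new HadoopInputFormat<Void, GenericRecord>(
            new AvroParquetInputFormat<GenericRecord>(),
            Void.class, GenericRecord.class, job);

    DataSet<Tuple2<Void, GenericRecord>> records = env.createInput(parquetInput);
    System.out.println("records: " + records.count()); // forces execution
  }
}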

Best, Fabian

2016-01-29 11:49 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Hi Fabian,
> thanks for the response!
> From what I understand (correct me if I'm wrong), once I produce
> some Parquet dir that I want to read later, the number of files in the dir
> affects the initial parallelism of the next job, i.e.:
>  - If I have fewer files than available tasks, I will not fully exploit
> the parallelism
>  - If the number of Parquet files is greater than the number of tasks,
> the tasks will read the files as soon as possible (at the maximum
> parallelism, but depending on the speed of the pipeline)
>
> Having a single huge Parquet file could limit the performance of my Flink
> job because the default Hadoop InputFormat can't exploit the parallelism at
> the datasource (because it relies only on the number of files found). To
> avoid that, I should write a custom ParquetInputFormat able to preprocess
> the Parquet metadata in those files, extract the HDFS blocks to read, and
> then generate the InputSplits. Am I right? Or am I misunderstanding
> something?
>
> Best,
> Flavio
>
> On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> Using a default FileOutputFormat, Flink writes one output file for each
>> data sink task, i.e., as many files as the defined parallelism.
>> The size of these files depends on the total output size and the data
>> distribution. If you write to HDFS, a file consists of one or more HDFS
>> blocks.
>> Parquet files are internally also organized in blocks. Each Parquet
>> block has a header with some meta information, and the data is organized
>> and compressed in a columnar fashion within a block. Due to this, the
>> ParquetInputFormat must always read a complete Parquet block.
>>
>> Flink's FileInputFormats split the input data along the HDFS blocks and
>> try to assign input splits such that blocks can be read locally. For best
>> performance, Parquet blocks should be aligned with HDFS blocks. It is not
>> a problem if a Parquet block is not completely filled.
>>
>> If you want to control the size of the parallel output files, you would
>> need to know the total output size and choose the parallelism accordingly.
>> Flink is not able to infer the output size (depends on input size, task
>> semantics, data distribution, etc.), so it is up to you to choose the right
>> parallelism.
>>
>> Best, Fabian
>>
>>
>> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Hi to all,
>>>
>>> I was reading about optimal Parquet file size and HDFS block size.
>>> The ideal situation for Parquet is when its block size (and thus the
>>> maximum size of each row group) is equal to the HDFS block size. The
>>> default behaviour of Flink is that the output file's size depends on the
>>> output parallelism and thus I don't know how to achieve that.
>>> Is that feasible?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>

Re: Writing Parquet files with Flink

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
thanks for the response!
From what I understand (correct me if I'm wrong), once I produce some
Parquet dir that I want to read later, the number of files in the dir
affects the initial parallelism of the next job, i.e.:
 - If I have fewer files than available tasks, I will not fully exploit
the parallelism
 - If the number of Parquet files is greater than the number of tasks,
the tasks will read the files as soon as possible (at the maximum
parallelism, but depending on the speed of the pipeline)

Having a single huge Parquet file could limit the performance of my Flink
job because the default Hadoop InputFormat can't exploit the parallelism
at the datasource (because it relies only on the number of files found).
To avoid that, I should write a custom ParquetInputFormat able to
preprocess the Parquet metadata in those files, extract the HDFS blocks
to read, and then generate the InputSplits. Am I right? Or am I
misunderstanding something?

Best,
Flavio

On Fri, Jan 29, 2016 at 11:14 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Flavio,
>
> Using a default FileOutputFormat, Flink writes one output file for each
> data sink task, i.e., as many files as the defined parallelism.
> The size of these files depends on the total output size and the data
> distribution. If you write to HDFS, a file consists of one or more HDFS
> blocks.
> Parquet files are internally also organized in blocks. Each Parquet
> block has a header with some meta information, and the data is organized
> and compressed in a columnar fashion within a block. Due to this, the
> ParquetInputFormat must always read a complete Parquet block.
>
> Flink's FileInputFormats split the input data along the HDFS blocks and
> try to assign input splits such that blocks can be read locally. For best
> performance, Parquet blocks should be aligned with HDFS blocks. It is not
> a problem if a Parquet block is not completely filled.
>
> If you want to control the size of the parallel output files, you would
> need to know the total output size and choose the parallelism accordingly.
> Flink is not able to infer the output size (depends on input size, task
> semantics, data distribution, etc.), so it is up to you to choose the right
> parallelism.
>
> Best, Fabian
>
>
> 2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>
>> Hi to all,
>>
>> I was reading about optimal Parquet file size and HDFS block size.
>> The ideal situation for Parquet is when its block size (and thus the
>> maximum size of each row group) is equal to the HDFS block size. The
>> default behaviour of Flink is that the output file's size depends on the
>> output parallelism and thus I don't know how to achieve that.
>> Is that feasible?
>>
>> Best,
>> Flavio
>>
>
>

Re: Writing Parquet files with Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

Using a default FileOutputFormat, Flink writes one output file for each
data sink task, i.e., as many files as the defined parallelism.
The size of these files depends on the total output size and the data
distribution. If you write to HDFS, a file consists of one or more HDFS
blocks.
Parquet files are internally also organized in blocks. Each Parquet block
has a header with some meta information, and the data is organized and
compressed in a columnar fashion within a block. Due to this, the
ParquetInputFormat must always read a complete Parquet block.

Flink's FileInputFormats split the input data along the HDFS blocks and
try to assign input splits such that blocks can be read locally. For best
performance, Parquet blocks should be aligned with HDFS blocks. It is not
a problem if a Parquet block is not completely filled.

If you want to control the size of the parallel output files, you would
need to know the total output size and choose the parallelism accordingly.
Flink is not able to infer the output size (depends on input size, task
semantics, data distribution, etc.), so it is up to you to choose the right
parallelism.
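
A minimal back-of-the-envelope sketch of that calculation (the class,
method, and parameter names are made up for illustration; the output size
estimate has to come from you, since Flink cannot infer it):

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;

public class SinkParallelismSketch {
  // Choose a sink parallelism so that each task writes a file of roughly
  // `blocksPerFile` HDFS blocks.
  public static <T> void writeWithSizedFiles(DataSet<T> result,
                                             OutputFormat<T> format,
                                             long estimatedOutputBytes,
                                             long hdfsBlockSize,
                                             int blocksPerFile) {
    int parallelism =
        (int) Math.max(1, estimatedOutputBytes / (blocksPerFile * hdfsBlockSize));
    // Flink writes one output file per sink task.
    result.output(format).setParallelism(parallelism);
  }
}

For example, with an estimated 50 GB of output, a 128 MB HDFS block size,
and 8 blocks per file, this would configure the sink with a parallelism of 50.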

Best, Fabian


2016-01-28 16:12 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Hi to all,
>
> I was reading about optimal Parquet file size and HDFS block size.
> The ideal situation for Parquet is when its block size (and thus the
> maximum size of each row group) is equal to the HDFS block size. The
> default behaviour of Flink is that the output file's size depends on the
> output parallelism and thus I don't know how to achieve that.
> Is that feasible?
>
> Best,
> Flavio
>