Posted to dev@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2020/04/23 08:13:26 UTC

Re: Reading a single input file in parallel?

Hi,

I wanted to spend some time on this idea I asked about a year ago.

So I had a look at the actual code and found the place where the file splits
are calculated:


https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601

What I realized is that the current system for determining the
FileInputSplits apparently works like this:
1) List the collection of input files.
2) If any file is NOT splittable, set a global flag to false.
3) ONLY if all of them are splittable will they be split.

So (if I understand correctly) if I have an input directory with a 100GB
uncompressed text file and a 1 KB gzipped file then NEITHER will be split.
Why is it done this way?
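
To make the difference concrete, here is a minimal sketch in Python (not
Flink's Java code) that contrasts the global-flag behaviour described above
with a per-file decision. The names InputFile, plan_global_flag and
plan_per_file are hypothetical, and the fixed 128 MB split size is an
assumption chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InputFile:
    name: str
    size: int         # size in bytes
    splittable: bool  # hypothetical per-file flag


def chunks(size, split_size):
    """Return (offset, length) pairs that cover `size` bytes."""
    return [(start, min(split_size, size - start))
            for start in range(0, size, split_size)]


def plan_global_flag(files, split_size):
    # Behaviour as described in the mail: a single unsplittable file
    # switches splitting off for every file in the input.
    all_splittable = all(f.splittable for f in files)
    return {f.name: chunks(f.size, split_size) if all_splittable
            else [(0, f.size)]
            for f in files}


def plan_per_file(files, split_size):
    # Alternative: each file is judged on its own.
    return {f.name: chunks(f.size, split_size) if f.splittable
            else [(0, f.size)]
            for f in files}


GB = 1024 ** 3
files = [InputFile("big.txt", 100 * GB, True),
         InputFile("tiny.txt.gz", 1024, False)]
split_size = 128 * 1024 ** 2

print(len(plan_global_flag(files, split_size)["big.txt"]))  # 1
print(len(plan_per_file(files, split_size)["big.txt"]))     # 800
```

With the global flag the 100GB file ends up in a single split; with the
per-file decision only the gzipped file stays whole.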

I also found that whether a file is NOT splittable depends entirely on
whether its decompression is provided by an InflaterInputStreamFactory.
Which is funny, because BZIP2 as a compression format IS splittable, but
according to this implementation it isn't.

I also noticed that the Hadoop split calculation is controlled by a split
size, whereas the Flink implementation is controlled by the number of splits.
This seems logical to me, as in Hadoop (MapReduce) the number of running
tasks is dynamic, in contrast with Flink, where the number of parallel
tasks is a given setting.

Looking at how Hadoop does this, I see that the FileInputFormat has a
method isSplitable
(yes, with a typo, and with a terribly dangerous default value: MAPREDUCE-2094)
which gives the answer on a per-file basis.
Some file formats (like TextInputFormat) look at the codec that was found
to see if it implements the SplittableCompressionCodec interface to
determine if the file is splittable.
Other file formats (like Avro and Parquet) use something else to determine
this.
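
The per-file check that Hadoop's TextInputFormat performs can be sketched
roughly as follows. This is a Python model of the idea, not Hadoop code: the
class names and the extension table are hypothetical stand-ins for Hadoop's
codec lookup and its SplittableCompressionCodec marker interface.

```python
import os


class Codec:
    """Hypothetical base class for a compression codec."""


class SplittableCodec(Codec):
    """Hypothetical marker: streams of this codec can be read from mid-file."""


class GzipCodec(Codec):
    pass


class Bzip2Codec(SplittableCodec):
    pass


# Illustrative extension table; a real system would make this configurable.
CODECS_BY_EXTENSION = {".gz": GzipCodec(), ".bz2": Bzip2Codec()}


def find_codec(path):
    """Return the codec registered for the file extension, or None."""
    _, extension = os.path.splitext(path)
    return CODECS_BY_EXTENSION.get(extension.lower())


def text_is_splittable(path):
    codec = find_codec(path)
    if codec is None:
        # Uncompressed text can be split on line boundaries.
        return True
    # Compressed text is splittable only if the codec carries the marker.
    return isinstance(codec, SplittableCodec)


print(text_is_splittable("access.log"))      # True
print(text_is_splittable("access.log.gz"))   # False
print(text_is_splittable("access.log.bz2"))  # True
```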

Overall I currently think the way this has been implemented in Flink is not
very good.

However, bridging the gap to make it similar to what Hadoop has
seems like a huge step.

I expect that many classes and interfaces will need to change dramatically
to make this happen.

My main question to you guys: Do you think it is worth it?

Niels Basjes


On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Niels,
>
> Jörn is right, although offering different methods, Flink's InputFormat is
> very similar to Hadoop's InputFormat interface.
> The InputFormat.createInputSplits() method generates splits that can be
> read in parallel.
> The FileInputFormat splits files by fixed boundaries (usually HDFS
> blocksize) and expects the InputFormat to find the right place to start and
> end.
> For files read line-wise (TextInputFormat) or files with a record delimiter
> (DelimitedInputFormat), the formats start reading at the first record after
> the first delimiter in their split and stop at the first delimiter after
> the split boundary.
> The BinaryInputFormat extends FileInputFormat but overrides the
> createInputSplits method.
>
> So, how exactly a file is read in parallel depends on the
> createInputSplits() method of the InputFormat.
>
> Hope this helps,
> Fabian
>
>
> 2018-02-18 13:36 GMT+01:00 Jörn Franke <jo...@gmail.com>:
>
> > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore, for
> > custom FileInputFormats you can set the attribute unsplittable = true if
> > your file format cannot be split.
> >
> > > On 18. Feb 2018, at 13:28, Niels Basjes <Ni...@basjes.nl> wrote:
> > >
> > > Hi,
> > >
> > > In Hadoop MapReduce there is the notion of "splittable" in the
> > > FileInputFormat. This has the effect that a single input file can be fed
> > > into multiple separate instances of the mapper that read the data.
> > > A lot has been documented (e.g. text is splittable per line, gzipped text
> > > is not splittable) and designed into the various file formats (like Avro
> > > and Parquet) to allow splittability.
> > >
> > > The goal is that reading and parsing files can be done by multiple
> > > cpus/systems in parallel.
> > >
> > > How is this handled in Flink?
> > > Can Flink read a single file in parallel?
> > > How does Flink administrate/handle the possibilities regarding the various
> > > file formats?
> > >
> > >
> > > The reason I ask is because I want to see if I can port this (now
> > > Hadoop-specific) hobby project of mine to work with Flink:
> > > https://github.com/nielsbasjes/splittablegzip
> > >
> > > Thanks.
> > >
> > > --
> > > Best regards / Met vriendelijke groeten,
> > >
> > > Niels Basjes
> >
>
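
Fabian's description above of how delimited formats handle split boundaries
can be sketched like this. It is a minimal Python model under stated
assumptions, not Flink's implementation: the whole file is held in memory as
bytes, the delimiter is a single byte, and read_split is a hypothetical name.

```python
def read_split(data, start, length, delimiter=b"\n"):
    """Return the records owned by the split [start, start + length)."""
    end = start + length
    pos = start
    if start > 0:
        # Skip ahead to just after the first delimiter in this split; the
        # record cut by the split start is read by the previous split.
        idx = data.find(delimiter, start)
        if idx == -1:
            return []
        pos = idx + len(delimiter)
    records = []
    # A record belongs to this split if it starts at or before `end`, so the
    # last record may run past the split boundary up to its delimiter.
    while pos <= end and pos < len(data):
        idx = data.find(delimiter, pos)
        if idx == -1:
            records.append(data[pos:])  # final record without a delimiter
            break
        records.append(data[pos:idx])
        pos = idx + len(delimiter)
    return records


data = b"alpha\nbeta\ngamma\ndelta\n"
split_size = 8
for start in range(0, len(data), split_size):
    length = min(split_size, len(data) - start)
    print(start, read_split(data, start, length))
# 0 [b'alpha', b'beta']
# 8 [b'gamma']
# 16 [b'delta']
```

Every record is read exactly once, no matter where the fixed boundaries
fall, which is why the splits can be handed to parallel readers.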


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Reading a single input file in parallel?

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

Yes, you are correct. Flink does use a minSplitSize to determine the
splits. I missed that part.
Also this is the part I do not intend to change.
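
The interplay between split size, block size and number of splits can be
sketched as below. This is a Python model, not the Java source. The Hadoop
rule follows computeSplitSize in the MapReduce FileInputFormat (the small
slop factor Hadoop applies to the last split is left out). The Flink rule is
an assumption based on a reading of the linked createInputSplits code and
should be checked against the source; all function and parameter names are
illustrative.

```python
MB = 1024 ** 2


def hadoop_split_size(block_size, min_size, max_size):
    # The split size is configured directly; the number of splits
    # follows from the file length.
    return max(min_size, min(max_size, block_size))


def flink_split_size(total_length, min_num_splits, block_size, min_split_size):
    # The requested number of splits gives an upper bound per split
    # (ceiling division), which is then clamped by the block size and
    # the minimum split size.
    max_split_size = -(-total_length // min_num_splits)
    return max(min_split_size, min(max_split_size, block_size))


# 1 GB input with 128 MB blocks:
print(flink_split_size(1024 * MB, 4, 128 * MB, 0) // MB)   # 128
print(flink_split_size(1024 * MB, 16, 128 * MB, 0) // MB)  # 64
print(hadoop_split_size(128 * MB, 1, 256 * MB) // MB)      # 128
```

Under this reading, asking for only 4 splits still yields block-sized
splits, so the block size matters as well as the requested number of splits.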

In this I would focus on essentially:
- "is splittable" is decided per file --> Similar to Hadoop: both a codec
and a file format get an "isSplittable" method/marker that lets this be
decided per file at runtime. This should be the 'same' construct for all
file formats (so also for Text, Avro, Parquet, etc.). Also, if I create a
new file format for something that does not support splitting (like
some specific data structures using, for example, XML) then that should be
cleanly possible.
- The codecs must be overridable. For
https://github.com/nielsbasjes/splittablegzip to work, the default gzip
decompressor must be disabled. The current setup seems to make this
possible, but I'm not sure:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118
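
The two points above can be sketched together as follows. This is a
hypothetical Python model of the proposal, not an existing Flink API:
Decompressor, CodecRegistry and is_splittable are illustrative names. The
file format can veto splitting, the codec can veto splitting, and a later
registration for the same extension replaces the default decompressor.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Decompressor:
    name: str
    splittable: bool


class CodecRegistry:
    def __init__(self) -> None:
        self._by_extension: Dict[str, Decompressor] = {}

    def register(self, extension: str, decompressor: Decompressor) -> None:
        # A later registration for the same extension replaces the earlier
        # one; this is what makes the default codec overridable.
        self._by_extension[extension.lower()] = decompressor

    def lookup(self, path: str) -> Optional[Decompressor]:
        lowered = path.lower()
        for extension, decompressor in self._by_extension.items():
            if lowered.endswith(extension):
                return decompressor
        return None


def is_splittable(path: str, format_supports_splitting: bool,
                  registry: CodecRegistry) -> bool:
    # Decided per file: the format can veto, and so can the codec.
    if not format_supports_splitting:
        return False
    codec = registry.lookup(path)
    return codec is None or codec.splittable


registry = CodecRegistry()
registry.register(".gz", Decompressor("default-gzip", splittable=False))
before = is_splittable("events.log.gz", True, registry)

# Override the default gzip decompressor with a splittable one.
registry.register(".gz", Decompressor("splittable-gzip", splittable=True))
after = is_splittable("events.log.gz", True, registry)

# A format that cannot be split stays unsplittable regardless of the codec.
xml = is_splittable("records.xml", False, registry)

print(before, after, xml)  # False True False
```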


I intend to leave the way the splits are calculated as-is.

Niels



On Thu, Apr 23, 2020 at 11:33 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi Niels,
>
> Thanks for starting this discussion. Here are some thoughts on your
> questions/proposals.
>
> > Judge whether splittable for each individual file
> Looks good to me.
>
> > BZIP2 support splittable
> Looks good to me.
>
> > the Flink implementation is controlled by the number of splits
> Can you check again? I think Flink is also affected by block size [1].
>
> > Looking at how Hadoop does this I see that the FileInputFormat has a
> > method isSplitable
> Flink currently does this in FileInputFormat, but it could also do it the
> way Hadoop does.
>
> I can see that the split strategies in Hadoop ORC and Parquet are quite
> complex. I have not researched them in depth, but I think these
> implementations are meaningful.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644
>
> Best,
> Jingsong Lee
>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Reading a single input file in parallel?

Posted by Jingsong Li <ji...@gmail.com>.
Hi Niels,

Thanks for starting this discussion. Here are some thoughts on your
questions/proposals.

> Judge whether splittable for each individual file
Looks good to me.

> BZIP2 support splittable
Looks good to me.

> the Flink implementation is controlled by the number of splits
Can you check again? I think Flink is also affected by block size [1].

> Looking at how Hadoop does this I see that the FileInputFormat has a
> method isSplitable
Flink currently does this in FileInputFormat, but it could also do it the
way Hadoop does.

I can see that the split strategies in Hadoop ORC and Parquet are quite
complex. I have not researched them in depth, but I think these
implementations are meaningful.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644

Best,
Jingsong Lee


-- 
Best, Jingsong Lee