Posted to user@flink.apache.org by ShB <sh...@gmail.com> on 2017/08/30 22:24:06 UTC

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Hi Fabian,

Thank you so much for your quick response, I appreciate it. 

Since I'm working with a very large number of files of small sizes, I don't
necessarily need to read each file in parallel. 

I need to read my large list of files in parallel - that is, split up my
list of files into smaller subsets and have each task manager read a subset
of them.

I implemented it like this:
env.fromCollection(fileList).rebalance().flatMap(new ReadFiles());
where ReadFiles is a flatMap function that reads each of the files from S3 using
the AWS S3 Java SDK, then parses and emits each of the protobufs.
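
To make the distribution step concrete, here is a minimal plain-Java model of
how a round-robin hand-off deals a file list out to parallel subtasks. It is
not Flink code: the class RoundRobinSketch and the method assign() are
hypothetical names for illustration, and the actual channel order used by
rebalance() at runtime may differ.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    // Hypothetical helper (not a Flink API): deals files out in turn, so
    // subtask k receives files k, k + parallelism, k + 2 * parallelism, ...
    static List<List<String>> assign(List<String> files, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int k = 0; k < parallelism; k++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            perSubtask.get(i % parallelism).add(files.get(i));
        }
        return perSubtask;
    }
}
```

Every subtask ends up with the same number of files (give or take one),
regardless of how large each file is.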

Is this implementation an efficient way of solving this problem? 

Is there a more performant way of reading a large number of files from S3 in
a distributed manner, with perhaps env.readFile()?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

readFile() expects a FileInputFormat, i.e., your custom InputFormat would
need to extend FileInputFormat.
In general, an InputFormat decides what to read when it generates its
InputSplits. In your case, the createInputSplits() method should return one
InputSplit for each file you want to read.
By default, FileInputFormat creates one or more input splits for each file
in a directory. If you only want to read a subset of files (or have a list
of files to read), you should override the method and return exactly one
input split for each file to read (because an individual file of yours
cannot be split and read in parallel).
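
As a rough illustration of the "exactly one input split per file" idea, the
following plain-Java sketch builds one split object for each entry of a file
list. FileSplit and createSplits() are hypothetical stand-ins and not Flink
classes; in a real job this logic would sit in the overridden
createInputSplits() of the custom format and return Flink's own split type.

```java
import java.util.List;

class OneSplitPerFileSketch {
    // Hypothetical stand-in for an input split: one whole file, never a part of it.
    static final class FileSplit {
        final int splitNumber;
        final String path;

        FileSplit(int splitNumber, String path) {
            this.splitNumber = splitNumber;
            this.path = path;
        }
    }

    // One split per listed file, numbered in list order. Files that are not
    // in the list get no split and are therefore never read.
    static FileSplit[] createSplits(List<String> filesToRead) {
        FileSplit[] splits = new FileSplit[filesToRead.size()];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new FileSplit(i, filesToRead.get(i));
        }
        return splits;
    }
}
```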

If your InputFormat does not extend FileInputFormat, you can use
createInput() instead of readFile().

Best, Fabian

2017-08-31 21:24 GMT+02:00 ShB <sh...@gmail.com>:

> Hi Fabian,
>
> Thanks for your response.
>
> If I implemented my own InputFormat, how would I read a specific list of
> files from S3?
>
> Assuming I need to use readFile(), below would read all of the files from
> the specified S3 bucket or path:
> env.readFile(MyInputFormat, "s3://my-bucket/")
>
> Is there a way for me to read only a specific list/subset of files(say
> fileList) from a S3 bucket, in parallel using readFile?

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Posted by ShB <sh...@gmail.com>.
Hi Fabian,

Thanks for your response. 

If I implemented my own InputFormat, how would I read a specific list of
files from S3?

Assuming I need to use readFile(), below would read all of the files from
the specified S3 bucket or path:
env.readFile(MyInputFormat, "s3://my-bucket/")

Is there a way for me to read only a specific list/subset of files (say
fileList) from an S3 bucket, in parallel using readFile()?




Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

This is a valid approach.
It might suffer from unbalanced load if the reader tasks process the files
at different speeds (or the files vary in size), because each task has to
process the same number of files.

An alternative would be to implement your own InputFormat.
The input format would create an InputSplit for each file to read.
At runtime, the JobManager (JM) fetches a list of all InputSplits and lazily
distributes them to running source tasks.
This automatically balances the load because faster source tasks will
process more splits than slower ones.
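
The difference between the two strategies can be shown with a small
plain-Java simulation. It is only a model under simplifying assumptions:
each file has a fixed processing cost, all workers are equally fast, and the
class and method names are hypothetical. It compares the finish time of the
slowest worker when files are dealt out round-robin up front versus when each
idle worker pulls the next file on demand.

```java
import java.util.PriorityQueue;

class SplitAssignmentSketch {
    // Static assignment: file i always goes to worker i % workers.
    // Returns the time at which the busiest worker finishes.
    static long roundRobinMakespan(long[] fileCosts, int workers) {
        long[] busy = new long[workers];
        for (int i = 0; i < fileCosts.length; i++) {
            busy[i % workers] += fileCosts[i];
        }
        long max = 0;
        for (long b : busy) {
            max = Math.max(max, b);
        }
        return max;
    }

    // Lazy assignment: the worker that becomes idle first takes the next file.
    // The priority queue holds the time at which each worker is free again.
    static long lazyMakespan(long[] fileCosts, int workers) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int w = 0; w < workers; w++) {
            freeAt.add(0L);
        }
        long max = 0;
        for (long cost : fileCosts) {
            long done = freeAt.poll() + cost;
            freeAt.add(done);
            max = Math.max(max, done);
        }
        return max;
    }
}
```

With file costs {10, 1, 10, 1, 10, 1} and two workers, the round-robin
variant gives one worker all three expensive files and finishes at time 30,
while the lazy variant finishes at time 21.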

Best, Fabian
