You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Palle <pa...@sport.dk> on 2016/05/07 10:12:55 UTC

How to make Flink read all files in HDFS folder and do transformations on th e data

Hi there.

I've got a HDFS folder containing a lot of files. All files contains a lot of JSON objects, one for each line. I will have several TB in the HDFS folder.

My plan is to make Flink read all files and all JSON objects and then do some analysis on the data, actually very similar to the flatMap/groupBy/reduceGroup transformations that is done in the WordCount example.

But I am a bit stuck, because I cannot seem to find out how to make Flink read all files in a HDFS dir and then perform the transformations on the data. I have googled quite a bit and also looked in the Flink API and mail history.

Can anyone point me to an example where Flink is used to read all files in a HDFS folder and then do transformations on the data)?

- and a second question: Is there an elegant way to make Flink handle the JSON objects? - can they be converted to POJOs by something similar to the pojoType() method?

/Palle

Re: How to make Flink read all files in HDFS folder and do transformations on th e data

Posted by Flavio Pompermaier <po...@okkam.it>.
Sorry Palle,
I wrongly understood that you were trying to read a single json object per
file...the solution suggested by Fabian is definitely the right solution
for your specific use case!

Best,
Flavio
On 7 May 2016 12:52, "Fabian Hueske" <fh...@gmail.com> wrote:

> Hi Palle,
>
> you can recursively read all files in a folder as explained in the
> "Recursive Traversal of the Input Path Directory" section of the Data
> Source documentation [1].
>
> The easiest way to read line-wise JSON objects is to use
> ExecutionEnvironment.readTextFile() which reads text files linewise as
> strings and a subsequent mapper that uses a JSON parser (e.g., Jackson) to
> parse the JSON strings. You should use a RichMapFunction and create the
> parser in the open() method to avoid instantiating a new parser for each
> incoming line. After parsing, the RichMapFunction can emit POJOs.
>
> Cheers, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources
>
> 2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>
>> I had the same issue :)
>> I resolved it reading all file paths in a collection, then using this
>> code:
>>
>> env.fromCollection(filePaths).rebalance().map(file2pojo)
>>
>> You can have your dataset of Pojos!
>>
>> The rebalance() is necessary to exploit parallelism,otherwise the
>> pipeline will be executed with parallelism 1.
>>
>> Best,
>> Flavio
>> On 7 May 2016 12:13, "Palle" <pa...@sport.dk> wrote:
>>
>> Hi there.
>>
>> I've got a HDFS folder containing a lot of files. All files contains a
>> lot of JSON objects, one for each line. I will have several TB in the HDFS
>> folder.
>>
>> My plan is to make Flink read all files and all JSON objects and then do
>> some analysis on the data, actually very similar to the
>> flatMap/groupBy/reduceGroup transformations that is done in the WordCount
>> example.
>>
>> But I am a bit stuck, because I cannot seem to find out how to make Flink
>> read all files in a HDFS dir and then perform the transformations on the
>> data. I have googled quite a bit and also looked in the Flink API and mail
>> history.
>>
>> Can anyone point me to an example where Flink is used to read all files
>> in a HDFS folder and then do transformations on the data)?
>>
>> - and a second question: Is there an elegant way to make Flink handle the
>> JSON objects? - can they be converted to POJOs by something similar to the
>> pojoType() method?
>>
>> /Palle
>>
>>
>

Re: How to make Flink read all files in HDFS folder and do transformations on th e data

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Palle,

you can recursively read all files in a folder as explained in the
"Recursive Traversal of the Input Path Directory" section of the Data
Source documentation [1].

The easiest way to read line-wise JSON objects is to use
ExecutionEnvironment.readTextFile() which reads text files linewise as
strings and a subsequent mapper that uses a JSON parser (e.g., Jackson) to
parse the JSON strings. You should use a RichMapFunction and create the
parser in the open() method to avoid instantiating a new parser for each
incoming line. After parsing, the RichMapFunction can emit POJOs.

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources

2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> I had the same issue :)
> I resolved it reading all file paths in a collection, then using this code:
>
> env.fromCollection(filePaths).rebalance().map(file2pojo)
>
> You can have your dataset of Pojos!
>
> The rebalance() is necessary to exploit parallelism,otherwise the pipeline
> will be executed with parallelism 1.
>
> Best,
> Flavio
> On 7 May 2016 12:13, "Palle" <pa...@sport.dk> wrote:
>
> Hi there.
>
> I've got a HDFS folder containing a lot of files. All files contains a lot
> of JSON objects, one for each line. I will have several TB in the HDFS
> folder.
>
> My plan is to make Flink read all files and all JSON objects and then do
> some analysis on the data, actually very similar to the
> flatMap/groupBy/reduceGroup transformations that is done in the WordCount
> example.
>
> But I am a bit stuck, because I cannot seem to find out how to make Flink
> read all files in a HDFS dir and then perform the transformations on the
> data. I have googled quite a bit and also looked in the Flink API and mail
> history.
>
> Can anyone point me to an example where Flink is used to read all files in
> a HDFS folder and then do transformations on the data)?
>
> - and a second question: Is there an elegant way to make Flink handle the
> JSON objects? - can they be converted to POJOs by something similar to the
> pojoType() method?
>
> /Palle
>
>

Re: How to make Flink read all files in HDFS folder and do transformations on th e data

Posted by Flavio Pompermaier <po...@okkam.it>.
I had the same issue :)
I resolved it reading all file paths in a collection, then using this code:

env.fromCollection(filePaths).rebalance().map(file2pojo)

You can have your dataset of Pojos!

The rebalance() is necessary to exploit parallelism,otherwise the pipeline
will be executed with parallelism 1.

Best,
Flavio
On 7 May 2016 12:13, "Palle" <pa...@sport.dk> wrote:

Hi there.

I've got a HDFS folder containing a lot of files. All files contains a lot
of JSON objects, one for each line. I will have several TB in the HDFS
folder.

My plan is to make Flink read all files and all JSON objects and then do
some analysis on the data, actually very similar to the
flatMap/groupBy/reduceGroup transformations that is done in the WordCount
example.

But I am a bit stuck, because I cannot seem to find out how to make Flink
read all files in a HDFS dir and then perform the transformations on the
data. I have googled quite a bit and also looked in the Flink API and mail
history.

Can anyone point me to an example where Flink is used to read all files in
a HDFS folder and then do transformations on the data)?

- and a second question: Is there an elegant way to make Flink handle the
JSON objects? - can they be converted to POJOs by something similar to the
pojoType() method?

/Palle