Posted to user@flink.apache.org by Nikola Hrusov <n....@gmail.com> on 2020/05/28 14:52:32 UTC

Streaming multiple csv files

Hello,

I have multiple CSV files (file1, file2, file3), each with different
columns and data. The set of possible column headers is finite and we know
their formats. I would like to read the files and parse each one based on
its column structure. I already have the parsers.

e.g.:

file1 has columns (id, firstname, lastname)
file2 has columns (id, name)
file3 has columns (id, name_1, name_2, name_3, name_4)

I would like to take all those files, read them, parse them and output
objects to a sink as Person { id, fullName }

Example files would be:

file1:
------
id, firstname, lastname
33, John, Smith
55, Labe, Soni

file2:
------
id, name
5, Mitr Kompi
99, Squi Masw

file3:
------
id, name_1, name_2, name_3, name_4
1, Peter, Hov, Risti, Pena
2, Rii, Koni, Ques,

Expected output of my program would be:

Person { 33, John Smith }
Person { 55, Labe Soni }
Person { 5, Mitr Kompi }
Person { 99, Squi Masw }
Person { 1, Peter Hov Risti Pena }
Person { 2, Rii Koni Ques }



What I do now is:

My code (very simplified) is:

env.readFile().flatMap(new MyParser()).addSink(new MySink())

MyParser receives the rows one by one as strings. This means that when I
run with parallelism > 1, I receive rows from any of the files and cannot
tell which file a given row came from.



What I would like to do is:

Be able to figure out which file I am reading from.
Since I only know the file type from its first row (the column headers), I
need to either send the first row to MyParser() or send a tuple <first row
of the file being read, current row of the file being read>.
Another option I can think of is to use some keyed function based on the
first row, but I am not sure how to achieve that with readFile.
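
A minimal sketch of the tuple idea in plain Java, to make the intent
concrete. It assumes that a row arrives together with the header line of
its file, that every column other than "id" is part of the name, and that
fields contain no quoted commas. HeaderAwareParser and the Person class
are hypothetical stand-ins for the real parsers, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

/** Output type from this thread: Person { id, fullName }. */
final class Person {
    final int id;
    final String fullName;

    Person(int id, String fullName) {
        this.id = id;
        this.fullName = fullName;
    }

    @Override
    public String toString() {
        return "Person { " + id + ", " + fullName + " }";
    }
}

/** Hypothetical helper: parses one row given the header of its file. */
final class HeaderAwareParser {

    /** Splits a line on commas and trims each field (no quoting support). */
    static String[] fields(String line) {
        String[] parts = line.split(",", -1); // -1 keeps trailing empty fields
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    /** Uses the header to locate the id; all other non-empty columns form the name. */
    static Person parse(String headerLine, String rowLine) {
        String[] header = fields(headerLine);
        String[] row = fields(rowLine);
        int id = -1; // stays -1 if the header has no "id" column
        List<String> nameParts = new ArrayList<>();
        for (int i = 0; i < header.length && i < row.length; i++) {
            if (header[i].equals("id")) {
                id = Integer.parseInt(row[i]);
            } else if (!row[i].isEmpty()) {
                nameParts.add(row[i]);
            }
        }
        return new Person(id, String.join(" ", nameParts));
    }

    public static void main(String[] args) {
        System.out.println(parse("id, firstname, lastname", "33, John, Smith"));
        System.out.println(parse("id, name", "5, Mitr Kompi"));
        System.out.println(parse("id, name_1, name_2, name_3, name_4", "2, Rii, Koni, Ques,"));
    }
}
```

With a function of this shape, MyParser would no longer need to know which
file a row came from, only which header it was paired with.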


Is there a way I can achieve this?


Regards,
Nikola

Re: Streaming multiple csv files

Posted by Robert Metzger <rm...@apache.org>.
Hi Nikola,

you could implement a custom SourceFunction that handles this in one of
two ways. If the files are small (< 10 MB), send each file as a single
record and process it in a subsequent flatMap operation. If the files are
large, split the work across the parallel sources and have each
SourceFunction instance read its files serially.
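
One way the "split the work across the parallel sources" part could look
is a round-robin assignment of files to subtasks. The sketch below is
plain Java with no Flink dependency; FileAssignment and filesForSubtask
are hypothetical names. It assumes every parallel instance sees the same
file list in the same order. Inside a rich parallel source, the two
integers would presumably come from
getRuntimeContext().getIndexOfThisSubtask() and
getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: decides which files a given parallel source instance reads. */
final class FileAssignment {

    /**
     * Returns the files for one subtask using round-robin assignment.
     * Each file goes to exactly one subtask, so one instance reads a
     * file from its first line (the header) to its last.
     */
    static List<String> filesForSubtask(List<String> allFiles, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allFiles.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allFiles.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("file1", "file2", "file3");
        System.out.println(filesForSubtask(files, 0, 2)); // [file1, file3]
        System.out.println(filesForSubtask(files, 1, 2)); // [file2]
    }
}
```

Because a file is never shared between instances, the instance that reads
it always sees the header row first and can keep it while reading the
remaining rows.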

The other option (which I have not fully thought through) is to use the
readFile method with a custom FileInputFormat implementation:

DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)

You basically have to override the
"FileInputFormat.createInputSplits()" method to obtain the CSV schema
and pass it along with the splits, so that each split can parse its
data properly.
This approach takes a bit more effort to understand, but the Flink
framework will do the heavy lifting of file system handling, splitting,
and fault tolerance.
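
To illustrate what such a format would emit, here is a plain-Java sketch
of a reader that remembers the header when a file is opened and tags every
row with it. The method names loosely mirror the InputFormat lifecycle
(open, reachedEnd, nextRecord), but HeaderTaggingReader is a hypothetical
class that does not extend any Flink type. It also assumes each file is
read as one unsplit unit; a real implementation would have to carry the
schema along with every split, as described above.

```java
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical reader: emits {header, row} pairs for one file. */
final class HeaderTaggingReader {
    private Iterator<String> lines;
    private String header;  // first line of the file, i.e. the schema
    private String nextRow; // next data row to emit, or null at the end

    /** Opens one file (given as its lines) and reads the header once. */
    void open(Iterator<String> fileLines) {
        lines = fileLines;
        header = lines.hasNext() ? lines.next() : null;
        advance();
    }

    /** Moves to the next non-blank data row, if any. */
    private void advance() {
        nextRow = null;
        while (lines.hasNext()) {
            String candidate = lines.next();
            if (!candidate.trim().isEmpty()) {
                nextRow = candidate;
                break;
            }
        }
    }

    boolean reachedEnd() {
        return nextRow == null;
    }

    /** Returns the current row paired with the header of its file. */
    String[] nextRecord() {
        String[] record = {header, nextRow};
        advance();
        return record;
    }

    public static void main(String[] args) {
        HeaderTaggingReader reader = new HeaderTaggingReader();
        reader.open(Arrays.asList("id, name", "5, Mitr Kompi", "99, Squi Masw").iterator());
        while (!reader.reachedEnd()) {
            System.out.println(Arrays.toString(reader.nextRecord()));
        }
    }
}
```

Each emitted pair carries enough information for a downstream flatMap to
choose the right parser, regardless of parallelism.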

Best,

Robert

