You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Esa Heikkinen <es...@student.tut.fi> on 2018/05/07 10:44:26 UTC

Reading csv-files in parallel

Hi

I would want to read many different type csv-files (time series data) parallel using by CsvTableSource. Is that possible in Flink application ? If yes, are there exist the examples about that ?

If it is not, do you have any advices how to do that ?

Should I combine all csv-files to one csv-file in pre-processing phase ? But this has little problem, because there are not same type (columns are different, except timestamp-column).

Best, Esa


Re: Reading csv-files in parallel

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

this looks roughly as below

----

val env = ExecutionEnvironment.getExecutionEnvironment()



val ds: DataSet[…] = env

  .readTextFile(path)

  .map(yourCsvLineParser)



val tableEnv = TableEnvironment.getTableEnvironment(env)



tableEnv.registerDataSet("myTable", ds)

val result = tableEnv.sqlQuery("SELECT …. FROM myTable ….")

----

Best Fabian

2018-05-09 15:09 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:

> Hi
>
>
>
> Sorry the stupid question, but how to connect readTextFile (or
> readCsvFile), MapFunction and SQL together in Scala code ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Tuesday, May 8, 2018 10:26 PM
>
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi,
>
> the Table API / SQL and the DataSet API can be used together in the same
> program.
>
> So you could read the data with a custom input format or a TextInputFormat
> and a custom MapFunction parser and hand it to SQL afterwards.
>
> The program would be a regular Scala DataSet program with an
> ExecutionEnvironment as in the examples or the documenation.
>
> To read many different files, you can put them all in a single folder and
> scan the whole folder. If you run on the master, you can try the new
> multi-path feature of FileInputFormats. Alterantively you can add many
> sources and use a union operator to union all data sets.
>
> Best, Fabian
>
>
>
> 2018-05-08 15:49 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> Would it better to use DataSet API, Table (Relational) and readCsvFile() ,
> because it is little but upper level implementation ?
>
>
>
> SQL also sounds very good in this (batch processing) case, but is it
> possible to use (because many different type of csv-files) ?
>
> And does it understand timeseries-data ?
>
>
>
> By the way, how to the control flow is running in main (Scala) program and
> what is the structure of main program ?
>
> I did mean, if I want to read many csv-files and I have certain
> consecutive reading order of them. Is that possible and how ?
>
>
>
> Actually I want to implement upper level (state-machine-based) logic for
> reading csv-files by certain order.
>
>
>
> Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Tuesday, May 8, 2018 2:00 PM
>
>
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi,
>
> the easiest approach is to read the CSV files linewise as regular text
> files (ExecutionEnvironment.readTextFile()) and apply custom parse logic
> in a MapFunction.
>
> Then you have all freedom to deal with records of different schema.
>
> Best, Fabian
>
>
>
> 2018-05-08 12:35 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> At this moment a batch query is ok.
>
>
>
> Do you know any good (Scala) examples how to query batches (different type
> of csv-files) in parallel ?
>
>
>
> Or do you have example of a custom source function, that read csv-files
> parallel ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Monday, May 7, 2018 3:48 PM
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi Esa,
>
> you can certainly read CSV files in parallel. This works very well in a
> batch query.
>
> For streaming queries, that expect data to be ingested in timestamp order
> this is much more challenging, because you need 1) read the files in the
> right order and 2) cannot split files (unless you guarantee that splits are
> read in the right order).
>
> The CsvTableSource does not guarantee to read files in timestamp order (it
> would have to know the timestamps in each file for that).
>
> Having files with different schema is another problem. The SQL / Table API
> require a fixed schema per table (source).
>
>
>
> The only recommendation when reading files in parallel for a streaming use
> case is to implement a custom source function and be careful when
> generating watermarks.
>
> Best, Fabian
>
>
>
> 2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> I would want to read many different type csv-files (time series data)
> parallel using by CsvTableSource. Is that possible in Flink application ?
> If yes, are there exist the examples about that ?
>
>
>
> If it is not, do you have any advices how to do that ?
>
>
>
> Should I combine all csv-files to one csv-file in pre-processing phase ?
> But this has little problem, because there are not same type (columns are
> different, except timestamp-column).
>
>
>
> Best, Esa
>
>
>
>
>
>
>
>
>

RE: Reading csv-files in parallel

Posted by Esa Heikkinen <es...@student.tut.fi>.
Hi

Sorry the stupid question, but how to connect readTextFile (or readCsvFile), MapFunction and SQL together in Scala code ?

Best, Esa

From: Fabian Hueske <fh...@gmail.com>
Sent: Tuesday, May 8, 2018 10:26 PM
To: Esa Heikkinen <es...@student.tut.fi>
Cc: user@flink.apache.org
Subject: Re: Reading csv-files in parallel

Hi,
the Table API / SQL and the DataSet API can be used together in the same program.
So you could read the data with a custom input format or a TextInputFormat and a custom MapFunction parser and hand it to SQL afterwards.
The program would be a regular Scala DataSet program with an ExecutionEnvironment as in the examples or the documenation.
To read many different files, you can put them all in a single folder and scan the whole folder. If you run on the master, you can try the new multi-path feature of FileInputFormats. Alterantively you can add many sources and use a union operator to union all data sets.
Best, Fabian

2018-05-08 15:49 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

Would it better to use DataSet API, Table (Relational) and readCsvFile() , because it is little but upper level implementation ?

SQL also sounds very good in this (batch processing) case, but is it possible to use (because many different type of csv-files) ?
And does it understand timeseries-data ?

By the way, how to the control flow is running in main (Scala) program and what is the structure of main program ?
I did mean, if I want to read many csv-files and I have certain consecutive reading order of them. Is that possible and how ?

Actually I want to implement upper level (state-machine-based) logic for reading csv-files by certain order.

Esa

From: Fabian Hueske <fh...@gmail.com>>
Sent: Tuesday, May 8, 2018 2:00 PM

To: Esa Heikkinen <es...@student.tut.fi>>
Cc: user@flink.apache.org<ma...@flink.apache.org>
Subject: Re: Reading csv-files in parallel

Hi,
the easiest approach is to read the CSV files linewise as regular text files (ExecutionEnvironment.readTextFile()) and apply custom parse logic in a MapFunction.
Then you have all freedom to deal with records of different schema.
Best, Fabian

2018-05-08 12:35 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

At this moment a batch query is ok.

Do you know any good (Scala) examples how to query batches (different type of csv-files) in parallel ?

Or do you have example of a custom source function, that read csv-files parallel ?

Best, Esa

From: Fabian Hueske <fh...@gmail.com>>
Sent: Monday, May 7, 2018 3:48 PM
To: Esa Heikkinen <es...@student.tut.fi>>
Cc: user@flink.apache.org<ma...@flink.apache.org>
Subject: Re: Reading csv-files in parallel

Hi Esa,
you can certainly read CSV files in parallel. This works very well in a batch query.
For streaming queries, that expect data to be ingested in timestamp order this is much more challenging, because you need 1) read the files in the right order and 2) cannot split files (unless you guarantee that splits are read in the right order).
The CsvTableSource does not guarantee to read files in timestamp order (it would have to know the timestamps in each file for that).
Having files with different schema is another problem. The SQL / Table API require a fixed schema per table (source).

The only recommendation when reading files in parallel for a streaming use case is to implement a custom source function and be careful when generating watermarks.
Best, Fabian

2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

I would want to read many different type csv-files (time series data) parallel using by CsvTableSource. Is that possible in Flink application ? If yes, are there exist the examples about that ?

If it is not, do you have any advices how to do that ?

Should I combine all csv-files to one csv-file in pre-processing phase ? But this has little problem, because there are not same type (columns are different, except timestamp-column).

Best, Esa





Re: Reading csv-files in parallel

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

the Table API / SQL and the DataSet API can be used together in the same
program.
So you could read the data with a custom input format or a TextInputFormat
and a custom MapFunction parser and hand it to SQL afterwards.

The program would be a regular Scala DataSet program with an
ExecutionEnvironment as in the examples or the documenation.
To read many different files, you can put them all in a single folder and
scan the whole folder. If you run on the master, you can try the new
multi-path feature of FileInputFormats. Alterantively you can add many
sources and use a union operator to union all data sets.

Best, Fabian

2018-05-08 15:49 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:

> Hi
>
>
>
> Would it better to use DataSet API, Table (Relational) and readCsvFile() ,
> because it is little but upper level implementation ?
>
>
>
> SQL also sounds very good in this (batch processing) case, but is it
> possible to use (because many different type of csv-files) ?
>
> And does it understand timeseries-data ?
>
>
>
> By the way, how to the control flow is running in main (Scala) program and
> what is the structure of main program ?
>
> I did mean, if I want to read many csv-files and I have certain
> consecutive reading order of them. Is that possible and how ?
>
>
>
> Actually I want to implement upper level (state-machine-based) logic for
> reading csv-files by certain order.
>
>
>
> Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Tuesday, May 8, 2018 2:00 PM
>
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi,
>
> the easiest approach is to read the CSV files linewise as regular text
> files (ExecutionEnvironment.readTextFile()) and apply custom parse logic
> in a MapFunction.
>
> Then you have all freedom to deal with records of different schema.
>
> Best, Fabian
>
>
>
> 2018-05-08 12:35 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> At this moment a batch query is ok.
>
>
>
> Do you know any good (Scala) examples how to query batches (different type
> of csv-files) in parallel ?
>
>
>
> Or do you have example of a custom source function, that read csv-files
> parallel ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Monday, May 7, 2018 3:48 PM
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi Esa,
>
> you can certainly read CSV files in parallel. This works very well in a
> batch query.
>
> For streaming queries, that expect data to be ingested in timestamp order
> this is much more challenging, because you need 1) read the files in the
> right order and 2) cannot split files (unless you guarantee that splits are
> read in the right order).
>
> The CsvTableSource does not guarantee to read files in timestamp order (it
> would have to know the timestamps in each file for that).
>
> Having files with different schema is another problem. The SQL / Table API
> require a fixed schema per table (source).
>
>
>
> The only recommendation when reading files in parallel for a streaming use
> case is to implement a custom source function and be careful when
> generating watermarks.
>
> Best, Fabian
>
>
>
> 2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> I would want to read many different type csv-files (time series data)
> parallel using by CsvTableSource. Is that possible in Flink application ?
> If yes, are there exist the examples about that ?
>
>
>
> If it is not, do you have any advices how to do that ?
>
>
>
> Should I combine all csv-files to one csv-file in pre-processing phase ?
> But this has little problem, because there are not same type (columns are
> different, except timestamp-column).
>
>
>
> Best, Esa
>
>
>
>
>
>
>

RE: Reading csv-files in parallel

Posted by Esa Heikkinen <es...@student.tut.fi>.
Hi

Would it better to use DataSet API, Table (Relational) and readCsvFile() , because it is little but upper level implementation ?

SQL also sounds very good in this (batch processing) case, but is it possible to use (because many different type of csv-files) ?
And does it understand timeseries-data ?

By the way, how to the control flow is running in main (Scala) program and what is the structure of main program ?
I did mean, if I want to read many csv-files and I have certain consecutive reading order of them. Is that possible and how ?

Actually I want to implement upper level (state-machine-based) logic for reading csv-files by certain order.

Esa

From: Fabian Hueske <fh...@gmail.com>
Sent: Tuesday, May 8, 2018 2:00 PM
To: Esa Heikkinen <es...@student.tut.fi>
Cc: user@flink.apache.org
Subject: Re: Reading csv-files in parallel

Hi,
the easiest approach is to read the CSV files linewise as regular text files (ExecutionEnvironment.readTextFile()) and apply custom parse logic in a MapFunction.
Then you have all freedom to deal with records of different schema.
Best, Fabian

2018-05-08 12:35 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

At this moment a batch query is ok.

Do you know any good (Scala) examples how to query batches (different type of csv-files) in parallel ?

Or do you have example of a custom source function, that read csv-files parallel ?

Best, Esa

From: Fabian Hueske <fh...@gmail.com>>
Sent: Monday, May 7, 2018 3:48 PM
To: Esa Heikkinen <es...@student.tut.fi>>
Cc: user@flink.apache.org<ma...@flink.apache.org>
Subject: Re: Reading csv-files in parallel

Hi Esa,
you can certainly read CSV files in parallel. This works very well in a batch query.
For streaming queries, that expect data to be ingested in timestamp order this is much more challenging, because you need 1) read the files in the right order and 2) cannot split files (unless you guarantee that splits are read in the right order).
The CsvTableSource does not guarantee to read files in timestamp order (it would have to know the timestamps in each file for that).
Having files with different schema is another problem. The SQL / Table API require a fixed schema per table (source).

The only recommendation when reading files in parallel for a streaming use case is to implement a custom source function and be careful when generating watermarks.
Best, Fabian

2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

I would want to read many different type csv-files (time series data) parallel using by CsvTableSource. Is that possible in Flink application ? If yes, are there exist the examples about that ?

If it is not, do you have any advices how to do that ?

Should I combine all csv-files to one csv-file in pre-processing phase ? But this has little problem, because there are not same type (columns are different, except timestamp-column).

Best, Esa




Re: Reading csv-files in parallel

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

the easiest approach is to read the CSV files linewise as regular text
files (ExecutionEnvironment.readTextFile()) and apply custom parse logic in
a MapFunction.
Then you have all freedom to deal with records of different schema.

Best, Fabian

2018-05-08 12:35 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:

> Hi
>
>
>
> At this moment a batch query is ok.
>
>
>
> Do you know any good (Scala) examples how to query batches (different type
> of csv-files) in parallel ?
>
>
>
> Or do you have example of a custom source function, that read csv-files
> parallel ?
>
>
>
> Best, Esa
>
>
>
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Monday, May 7, 2018 3:48 PM
> *To:* Esa Heikkinen <es...@student.tut.fi>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reading csv-files in parallel
>
>
>
> Hi Esa,
>
> you can certainly read CSV files in parallel. This works very well in a
> batch query.
>
> For streaming queries, that expect data to be ingested in timestamp order
> this is much more challenging, because you need 1) read the files in the
> right order and 2) cannot split files (unless you guarantee that splits are
> read in the right order).
>
> The CsvTableSource does not guarantee to read files in timestamp order (it
> would have to know the timestamps in each file for that).
>
> Having files with different schema is another problem. The SQL / Table API
> require a fixed schema per table (source).
>
>
>
> The only recommendation when reading files in parallel for a streaming use
> case is to implement a custom source function and be careful when
> generating watermarks.
>
> Best, Fabian
>
>
>
> 2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:
>
> Hi
>
>
>
> I would want to read many different type csv-files (time series data)
> parallel using by CsvTableSource. Is that possible in Flink application ?
> If yes, are there exist the examples about that ?
>
>
>
> If it is not, do you have any advices how to do that ?
>
>
>
> Should I combine all csv-files to one csv-file in pre-processing phase ?
> But this has little problem, because there are not same type (columns are
> different, except timestamp-column).
>
>
>
> Best, Esa
>
>
>
>
>

RE: Reading csv-files in parallel

Posted by Esa Heikkinen <es...@student.tut.fi>.
Hi

At this moment a batch query is ok.

Do you know any good (Scala) examples how to query batches (different type of csv-files) in parallel ?

Or do you have example of a custom source function, that read csv-files parallel ?

Best, Esa

From: Fabian Hueske <fh...@gmail.com>
Sent: Monday, May 7, 2018 3:48 PM
To: Esa Heikkinen <es...@student.tut.fi>
Cc: user@flink.apache.org
Subject: Re: Reading csv-files in parallel

Hi Esa,
you can certainly read CSV files in parallel. This works very well in a batch query.
For streaming queries, that expect data to be ingested in timestamp order this is much more challenging, because you need 1) read the files in the right order and 2) cannot split files (unless you guarantee that splits are read in the right order).
The CsvTableSource does not guarantee to read files in timestamp order (it would have to know the timestamps in each file for that).
Having files with different schema is another problem. The SQL / Table API require a fixed schema per table (source).

The only recommendation when reading files in parallel for a streaming use case is to implement a custom source function and be careful when generating watermarks.
Best, Fabian

2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>>:
Hi

I would want to read many different type csv-files (time series data) parallel using by CsvTableSource. Is that possible in Flink application ? If yes, are there exist the examples about that ?

If it is not, do you have any advices how to do that ?

Should I combine all csv-files to one csv-file in pre-processing phase ? But this has little problem, because there are not same type (columns are different, except timestamp-column).

Best, Esa



Re: Reading csv-files in parallel

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Esa,

you can certainly read CSV files in parallel. This works very well in a
batch query.
For streaming queries, that expect data to be ingested in timestamp order
this is much more challenging, because you need 1) read the files in the
right order and 2) cannot split files (unless you guarantee that splits are
read in the right order).
The CsvTableSource does not guarantee to read files in timestamp order (it
would have to know the timestamps in each file for that).

Having files with different schema is another problem. The SQL / Table API
require a fixed schema per table (source).

The only recommendation when reading files in parallel for a streaming use
case is to implement a custom source function and be careful when
generating watermarks.

Best, Fabian

2018-05-07 12:44 GMT+02:00 Esa Heikkinen <es...@student.tut.fi>:

> Hi
>
>
>
> I would want to read many different type csv-files (time series data)
> parallel using by CsvTableSource. Is that possible in Flink application ?
> If yes, are there exist the examples about that ?
>
>
>
> If it is not, do you have any advices how to do that ?
>
>
>
> Should I combine all csv-files to one csv-file in pre-processing phase ?
> But this has little problem, because there are not same type (columns are
> different, except timestamp-column).
>
>
>
> Best, Esa
>
>
>