You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by benoitdr <be...@nokia.com> on 2016/11/15 13:44:40 UTC

CSV to parquet preserving partitioning

Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), and such
as there is a single output file per partition.
The output files strucutre should look like :

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop among the input
directories, loading the csv files in a dataframe and to write the dataframe
in the target partition.
But this not efficient since I want a single output file per partition, the
writing to hdfs is a single tasks that blocks the loop.
I wonder how to achieve this with a maximum of parallelism (and without
shuffling the data in the cluster).

Thanks !



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: CSV to parquet preserving partitioning

Posted by benoitdr <be...@nokia.com>.
Best solution I've found so far (no shuffling and as many threads as input
dirs) :

Create an rdd of input dirs, with as many partitions as input dirs
Transform it to an rdd of input files (preserving the partitions by dirs)
Flat-map it with a custom csv parser
Convert rdd to dataframe
Write dataframe to parquet table partitioned by dirs

It requires to write his own parser. I could not find a solution to preserve
the partitioning using sc.textfile or the databricks csv parser.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: CSV to parquet preserving partitioning

Posted by benoitdr <be...@nokia.com>.
This is more or less how I'm doing it now.
Problem is that it creates shuffling in the cluster because the input data
are not collocated according to the partition scheme.

If a reload the output parquet files as a new dataframe, then everything is
fine, but I'd like to avoid shuffling also during the ETL phase.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: CSV to parquet preserving partitioning

Posted by neil90 <ne...@icloud.com>.
All you need to do is load all the files into one dataframe at once. Then
save the dataframe using partitionBy -

df.write.format("parquet").partitionBy("directoryCol").save("hdfs://path")

Then if you look at the new folder it should look like how you want it I.E -
hdfs://path/dir=dir1/part-r-xxx.gz.parquet 
hdfs://path/dir=dir2/part-r-yyy.gz.parquet 
hdfs://path/dir=dir3/part-r-zzz.gz.parquet 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: CSV to parquet preserving partitioning

Posted by benoitdr <be...@nokia.com>.
Yes, by parsing the file content, it's possible to recover in which directory they are.

From: neil90 [via Apache Spark User List] [mailto:ml-node+s1001560n28083h21@n3.nabble.com]
Sent: mercredi 16 novembre 2016 17:41
To: Drooghaag, Benoit (Nokia - BE) <be...@nokia.com>
Subject: Re: CSV to parquet preserving partitioning

Is there anything in the files to let you know which directory they should be in?
________________________________
If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28083.html
To unsubscribe from CSV to parquet preserving partitioning, click here<http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=28078&code=YmVub2l0LmRyb29naGFhZ0Bub2tpYS5jb218MjgwNzh8LTE1NDA4OTg4OTg=>.
NAML<http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28084.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: CSV to parquet preserving partitioning

Posted by neil90 <ne...@icloud.com>.
Is there anything in the files to let you know which directory they should be
in?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: CSV to parquet preserving partitioning

Posted by "Drooghaag, Benoit (Nokia - BE)" <be...@nokia.com>.
Good point, thanks !

That does the job from the moment the datasets corresponding to each input directory contain a single partition.

Question now is how to achieve this without shuffling the data ?
I’m using the databricks csv reader on spark 1.6 and I don’t think there is a way to control the partitioning.
As I can see, it creates one partition per csv file, so the data from one input directory can be puzzled accross the nodes ...

From: Daniel Siegmann [mailto:dsiegmann@securityscorecard.io]
Sent: mardi 15 novembre 2016 18:57
To: Drooghaag, Benoit (Nokia - BE) <be...@nokia.com>
Cc: user <us...@spark.apache.org>
Subject: Re: CSV to parquet preserving partitioning

Did you try unioning the datasets for each CSV into a single dataset? You may need to put the directory name into a column so you can partition by it.
On Tue, Nov 15, 2016 at 8:44 AM, benoitdr <be...@nokia.com>> wrote:
Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), and such
as there is a single output file per partition.
The output files strucutre should look like :

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop among the input
directories, loading the csv files in a dataframe and to write the dataframe
in the target partition.
But this not efficient since I want a single output file per partition, the
writing to hdfs is a single tasks that blocks the loop.
I wonder how to achieve this with a maximum of parallelism (and without
shuffling the data in the cluster).

Thanks !



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org<ma...@spark.apache.org>


Re: CSV to parquet preserving partitioning

Posted by Daniel Siegmann <ds...@securityscorecard.io>.
Did you try unioning the datasets for each CSV into a single dataset? You
may need to put the directory name into a column so you can partition by it.

On Tue, Nov 15, 2016 at 8:44 AM, benoitdr <be...@nokia.com>
wrote:

> Hello,
>
> I'm trying to convert a bunch of csv files to parquet, with the interesting
> case that the input csv files are already "partitioned" by directory.
> All the input files have the same set of columns.
> The input files structure looks like :
>
> /path/dir1/file1.csv
> /path/dir1/file2.csv
> /path/dir2/file3.csv
> /path/dir3/file4.csv
> /path/dir3/file5.csv
> /path/dir3/file6.csv
>
> I'd like to read those files and write their data to a parquet table in
> hdfs, preserving the partitioning (partitioned by input directory), and
> such
> as there is a single output file per partition.
> The output files strucutre should look like :
>
> hdfs://path/dir=dir1/part-r-xxx.gz.parquet
> hdfs://path/dir=dir2/part-r-yyy.gz.parquet
> hdfs://path/dir=dir3/part-r-zzz.gz.parquet
>
>
> The best solution I have found so far is to loop among the input
> directories, loading the csv files in a dataframe and to write the
> dataframe
> in the target partition.
> But this not efficient since I want a single output file per partition, the
> writing to hdfs is a single tasks that blocks the loop.
> I wonder how to achieve this with a maximum of parallelism (and without
> shuffling the data in the cluster).
>
> Thanks !
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>