Posted to user@spark.apache.org by Aureliano Buendia <bu...@gmail.com> on 2014/03/22 00:04:36 UTC

How to save as a single file efficiently?

Hi,

Our Spark app reduces a few hundred GB of data to a few hundred KB of CSV. We
found that using 1000 partitions speeds the process up nicely. However, it
does not make sense to end up with 1000 CSV files, each smaller than 1 KB.

We used RDD.coalesce(1) to get a single CSV file, but it's extremely slow and
does not use our resources properly. So this is very slow:

rdd.map(...).coalesce(1).saveAsTextFile()

Is it possible to use coalesce(1) simply to concatenate the already
materialized output text files? Would something like this make sense?

rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()

Or would something like this achieve it?

rdd.map(...).cache().coalesce(1).saveAsTextFile()

Re: How to save as a single file efficiently?

Posted by "deenar.toraskar" <de...@db.com>.
Aureliano 

Apologies for hijacking this thread.

Matei

On the subject of processing lots (millions) of small input files on HDFS,
what are the best practices to follow in Spark? Currently my code looks
something like this. Without coalesce there is one task and one output file
per input file, but adding coalesce reduces the number of output files. I have
used mapValues so that the map steps preserve partitioning. Do I need coalesce
before the first map as well?

import scala.util.{Try, Success}

// Read each file whole as a (path, contents) pair; one input split per file.
val dataRDD = sc.newAPIHadoopRDD(conf,
  classOf[com.db.pnlStore.pnlInfra.WholeFileInputFormat],
  classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
// Parse each file, wrapping failures in Try, then coalesce to cut the task count.
val data = dataRDD
  .map(row => (row._1.toString, Try(rawSdosParser(row._2.toString(), null))))
  .coalesce(100)
// Keep only successful parses; mapValues preserves partitioning.
val datatoLoad = data.filter(_._2.isSuccess)
  .mapValues(value => value match { case Success(s) => Try(s.iterator.toList) })
val datatoSave = datatoLoad.filter(_._2.isSuccess)
  .mapValues(value => value match { case Success(s) => s })
datatoSave.saveAsObjectFile("Outputall_new/outputall_RMS_ObjFiles")

Deenar




Re: How to save as a single file efficiently?

Posted by Matei Zaharia <ma...@gmail.com>.
Ah, the reason is that coalesce is often used to deal with lots of small input files on HDFS. In that case you don’t want to reshuffle them all across the network; you just want each mapper to directly read multiple files (and you want fewer than one mapper per file).
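
A minimal sketch of that pattern, assuming an existing SparkContext sc and made-up HDFS paths:

// Coalesce without a shuffle when reading many small files: partitions are
// merged locally, so each task reads several files back to back.
val lines  = sc.textFile("hdfs:///data/many-small-files/*")   // roughly one partition per small file
val merged = lines.coalesce(100)    // shuffle defaults to false: no data moves across the network
merged.saveAsTextFile("hdfs:///data/merged-output")           // 100 output files instead of millions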

Matei

On Mar 21, 2014, at 5:01 PM, Aureliano Buendia <bu...@gmail.com> wrote:

> Good to know it's as simple as that! I wonder why shuffle=true is not the default for coalesce().
> 
> 
> On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Try passing the shuffle=true parameter to coalesce, then it will do the map in parallel but still pass all the data through one reduce node for writing it out. That’s probably the fastest it will get. No need to cache if you do that.
> 
> Matei
> 
> On Mar 21, 2014, at 4:04 PM, Aureliano Buendia <bu...@gmail.com> wrote:
> 
> > Hi,
> >
> > Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We found that a partition number of 1000 is a good number to speed the process up. However, it does not make sense to have 1000 pieces of csv files each less than 1 kb.
> >
> > We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and we are not properly using our resources this way. So this is very slow:
> >
> > rdd.map(...).coalesce(1).saveAsTextFile()
> >
> > How is it possible to use coalesce(1) simply for concatenating the materialized output text files? Would something like this make sense?:
> >
> > rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> >
> > Or, would something like this achieve it?:
> >
> > rdd.map(...).cache().coalesce(1).saveAsTextFile()
> 
> 


Re: How to save as a single file efficiently?

Posted by Aureliano Buendia <bu...@gmail.com>.
Good to know it's as simple as that! I wonder why shuffle=true is not the
default for coalesce().


On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia <ma...@gmail.com> wrote:

> Try passing the shuffle=true parameter to coalesce, then it will do the
> map in parallel but still pass all the data through one reduce node for
> writing it out. That's probably the fastest it will get. No need to cache
> if you do that.
>
> Matei
>
> On Mar 21, 2014, at 4:04 PM, Aureliano Buendia <bu...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We
> found that a partition number of 1000 is a good number to speed the process
> up. However, it does not make sense to have 1000 pieces of csv files each
> less than 1 kb.
> >
> > We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow,
> and we are not properly using our resources this way. So this is very slow:
> >
> > rdd.map(...).coalesce(1).saveAsTextFile()
> >
> > How is it possible to use coalesce(1) simply for concatenating the
> materialized output text files? Would something like this make sense?:
> >
> > rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> >
> > Or, would something like this achieve it?:
> >
> > rdd.map(...).cache().coalesce(1).saveAsTextFile()
>
>

Re: How to save as a single file efficiently?

Posted by Matei Zaharia <ma...@gmail.com>.
Try passing the shuffle=true parameter to coalesce; then it will do the map in parallel but still pass all the data through one reduce node for writing it out. That’s probably the fastest it will get. No need to cache if you do that.
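
Concretely, that suggestion looks something like the sketch below; the sample data, app name, and output path are made-up stand-ins for the real job.

import org.apache.spark.{SparkConf, SparkContext}

val sc  = new SparkContext(new SparkConf().setAppName("single-csv-output"))
val rdd = sc.parallelize(1 to 1000000, numSlices = 1000)  // stand-in for the real 1000-partition RDD

rdd.map(x => s"$x,${x * 2}")            // the map still runs in parallel across all 1000 partitions
   .coalesce(1, shuffle = true)         // shuffle the (tiny) mapped output down to one partition
   .saveAsTextFile("hdfs:///tmp/single-csv")   // produces a single part-00000 file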

Matei

On Mar 21, 2014, at 4:04 PM, Aureliano Buendia <bu...@gmail.com> wrote:

> Hi,
> 
> Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We found that a partition number of 1000 is a good number to speed the process up. However, it does not make sense to have 1000 pieces of csv files each less than 1 kb.
> 
> We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and we are not properly using our resources this way. So this is very slow:
> 
> rdd.map(...).coalesce(1).saveAsTextFile()
> 
> How is it possible to use coalesce(1) simply for concatenating the materialized output text files? Would something like this make sense?:
> 
> rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> 
> Or, would something like this achieve it?:
> 
> rdd.map(...).cache().coalesce(1).saveAsTextFile()