Posted to user@spark.apache.org by Luis Guerra <lu...@gmail.com> on 2014/09/17 14:21:01 UTC

Number of partitions when saving (pyspark)

Hi everyone,

Is it possible to fix the number of tasks used by saveAsTextFile in
PySpark?

I am loading several files from HDFS, fixing the number of partitions to X
(let's say 40, for instance). Then some transformations, such as joins and
filters, are carried out. The strange thing is that the number of tasks
involved in these transformations is 80, i.e. double the fixed number of
partitions. However, when the saveAsTextFile action is carried out, there
are only 4 tasks (and I have not been able to increase that number). My
problem is that those 4 tasks rapidly increase the memory used and take
too long to finish.

I am launching my process from Windows to an Ubuntu cluster with 13
machines (4 cores each, 32 GB of memory), using PySpark 1.0.2.

Any clue with this?

Thanks in advance
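
For reference, here is a minimal sketch of the kind of pipeline described
above; the paths, field indices, and variable names are hypothetical and
not taken from this thread:

from pyspark import SparkContext

sc = SparkContext(appName="partitioning-example")

# Load with an explicit minimum number of partitions (X = 40 here).
left = sc.textFile("hdfs:///data/left", minPartitions=40) \
         .map(lambda line: line.split("\t")) \
         .map(lambda fields: (fields[0], fields))
right = sc.textFile("hdfs:///data/right", minPartitions=40) \
          .map(lambda line: line.split("\t")) \
          .map(lambda fields: (fields[0], fields))

# Shuffle transformations such as join() accept an explicit partition count.
joined = left.join(right, numPartitions=80)
filtered = joined.filter(lambda kv: kv[1][1][0] != "")

# saveAsTextFile() writes one part file per partition of the RDD it is
# called on; it does not take a partition count itself.
filtered.saveAsTextFile("hdfs:///output/result")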

Re: Number of partitions when saving (pyspark)

Posted by Luis Guerra <lu...@gmail.com>.
Thanks for the answer,

I have tried this solution but it did not work...

This is the relevant part of the code:

# Group by the first two fields, requesting 80 partitions.
data_aux = cruce.groupBy(lambda x: (x[0], x[1]), 80)
# data_aux = cruce.keyBy(lambda x: (x[0], x[1])).groupByKey(80)

# Keep multi-element groups in exit_1 and single-element groups in exit_2.
exit_1 = data_aux.filter(lambda (a, b): len(b) > 1).values()
exit_2 = data_aux.filter(lambda (a, b): len(b) == 1).values()

exit_1.map(lambda x: x.data).saveAsTextFile(exit1_path)
exit_2.map(lambda x: x.data).saveAsTextFile(exit2_path)

As you can see, I am requesting 80 partitions in the groupBy (and I have
also tried keyBy followed by groupByKey). However, the save is still
carried out with only 4 tasks.

What am I doing wrong?
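
One common workaround (not something confirmed in this thread) is to force
the partition count immediately before the save, assuming repartition() is
available in your PySpark version:

# Reshuffle into 80 partitions right before writing, so the save runs as 80 tasks.
exit_1.map(lambda x: x.data).repartition(80).saveAsTextFile(exit1_path)
exit_2.map(lambda x: x.data).repartition(80).saveAsTextFile(exit2_path)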

On Wed, Sep 17, 2014 at 6:20 PM, Davies Liu <da...@databricks.com> wrote:

> On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra <lu...@gmail.com>
> wrote:
> > Hi everyone,
> >
> > Is it possible to fix the number of tasks used by saveAsTextFile in
> > PySpark?
> >
> > I am loading several files from HDFS, fixing the number of partitions to X
> > (let's say 40, for instance). Then some transformations, such as joins and
> > filters, are carried out. The strange thing is that the number of tasks
> > involved in these transformations is 80, i.e. double the fixed number of
> > partitions. However, when the saveAsTextFile action is carried out, there
> > are only 4 tasks (and I have not been able to increase that number). My
> > problem is that those 4 tasks rapidly increase the memory used and take
> > too long to finish.
>
> > I am launching my process from Windows to an Ubuntu cluster with 13
> > machines (4 cores each, 32 GB of memory), using PySpark 1.0.2.
>
> saveAsTextFile() is just a map over the RDD, so its number of partitions
> is determined by the previous RDD.
>
> In Spark 1.0.2, groupByKey() and reduceByKey() take the number of CPUs on
> the driver (locally) as the default number of partitions, so it's 4 here.
> You need to change it to 40 or 80 in this case.
>
> BTW, in Spark 1.1, groupByKey() and reduceByKey() use the number of
> partitions of the previous RDD as the default.
>
> Davies
>
> > Any clue with this?
> >
> > Thanks in advance
>

Re: Number of partitions when saving (pyspark)

Posted by Davies Liu <da...@databricks.com>.
On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra <lu...@gmail.com> wrote:
> Hi everyone,
>
> Is it possible to fix the number of tasks used by saveAsTextFile in
> PySpark?
>
> I am loading several files from HDFS, fixing the number of partitions to X
> (let's say 40, for instance). Then some transformations, such as joins and
> filters, are carried out. The strange thing is that the number of tasks
> involved in these transformations is 80, i.e. double the fixed number of
> partitions. However, when the saveAsTextFile action is carried out, there
> are only 4 tasks (and I have not been able to increase that number). My
> problem is that those 4 tasks rapidly increase the memory used and take
> too long to finish.

> I am launching my process from Windows to an Ubuntu cluster with 13
> machines (4 cores each, 32 GB of memory), using PySpark 1.0.2.

saveAsTextFile() is just a map over the RDD, so its number of partitions
is determined by the previous RDD.

In Spark 1.0.2, groupByKey() and reduceByKey() take the number of CPUs on
the driver (locally) as the default number of partitions, so it's 4 here.
You need to change it to 40 or 80 in this case.
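
For example, both take the partition count as an explicit argument (the
names below are hypothetical):

pairs = rdd.map(lambda x: (x[0], 1))                 # 'rdd' and the key are hypothetical
counts = pairs.reduceByKey(lambda a, b: a + b, 80)   # 80 partitions, so 80 save tasks
groups = pairs.groupByKey(80)                        # same for groupByKey()
counts.saveAsTextFile("hdfs:///output/counts")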

BTW, in Spark 1.1, groupByKey() and reduceByKey() use the number of
partitions of the previous RDD as the default.

Davies

> Any clue with this?
>
> Thanks in advance

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org