Posted to dev@spark.apache.org by Guillaume Pitel <gu...@exensa.com> on 2016/04/27 13:46:53 UTC

Decrease shuffle in TreeAggregate with coalesce ?

Hi,

I've been looking at the code of RDD.treeAggregate, because we've seen a 
huge performance drop between 1.5.2 and 1.6.1 on a treeReduce. I think the 
treeAggregate code hasn't changed, so my message is not about the 
performance drop, but rather a more general remark about treeAggregate.

In treeAggregate, after the aggregate is applied inside the original 
partitions, we enter the tree:


    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }


The lines where the partitions are renumbered modulo curNumPartitions and 
then reduced by key seem suboptimal to me. They incur a large shuffle, 
whereas a simple coalesce followed by a partition-level aggregation would 
probably do the job just as well.
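To make the suggestion concrete, here is roughly what I have in mind for 
one level of the tree (an untested sketch reusing the names from the 
snippet above; whether coalesce keeps the partial aggregates balanced 
across workers is an open question):

    // Hypothetical alternative for one level: shrink to curNumPartitions
    // without a shuffle, then merge each coalesced partition's partial
    // aggregates locally with the combine operator.
    partiallyAggregated = partiallyAggregated
      .coalesce(curNumPartitions, shuffle = false)
      .mapPartitions { iter =>
        if (iter.hasNext) Iterator(iter.reduce(cleanCombOp)) else Iterator.empty
      }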

Have I missed something that makes this reshuffle necessary?

Best regards
Guillaume Pitel

Re: Decrease shuffle in TreeAggregate with coalesce ?

Posted by Guillaume Pitel <gu...@exensa.com>.
Long story short, regarding the performance issue: it appeared with a 
recompiled version of the source TGZ downloaded from the Spark website.

The problem disappears with 1.6.2-SNAPSHOT (branch-1.6).

Guillaume
>> Do you have code which can reproduce this performance drop in 
>> treeReduce?  It would be helpful to debug.  In the 1.6 release, we 
>> profiled it via the various MLlib algorithms and did not see 
>> performance drops.
> That would be difficult, but if we cannot figure it out, we'll design a 
> small example to test it. I first have to check with the latest git 
> version; I have to recompile Spark with the LGPL version of netlib.


Re: Decrease shuffle in TreeAggregate with coalesce ?

Posted by Guillaume Pitel <gu...@exensa.com>.
On 27/04/2016 at 19:41, Joseph Bradley wrote:
> Do you have code which can reproduce this performance drop in 
> treeReduce?  It would be helpful to debug.  In the 1.6 release, we 
> profiled it via the various MLlib algorithms and did not see 
> performance drops.
That would be difficult, but if we cannot figure it out, we'll design a 
small example to test it. I first have to check with the latest git 
version; I have to recompile Spark with the LGPL version of netlib.

> It's not just renumbering the partitions; it is reducing the number of 
> partitions by a factor of 1.0/scale (where scale > 1).  This creates a 
> "tree"-structured aggregation so that more of the work of merging 
> during aggregation is done on the workers, not the driver.
>
Sure, I get that, and it wasn't my point. I just think coalesce also 
reduces the number of partitions, but without a shuffle, right?

_With coalesce:_
Let's say we have 2 workers with 2 partitions each.

W0: p0,p1
W1: p2,p3

Since coalesce tries to avoid shuffling, coalesce(2) should group the 
contents of p0 and p1 into p0' (on W0) and those of p2 and p3 into p1' (on W1).

OTOH, _with the current mapPartitionsWithIndex + modulo + reduceByKey_, 
let's say the partitions are keyed like this:

(0,p0),(1,p1),(2,p2),(3,p3)

Then, after the modulo (with curNumPartitions = 2): (0,p0),(1,p1),(0,p2),(1,p3)

As a consequence, W1 will shuffle p2 to W0 and W0 will shuffle p1 to W1.
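A toy rendering of that key assignment in plain Scala (not the actual 
Spark source, just to make the data movement explicit):

    // Partition indices keyed modulo curNumPartitions: partitions 0 and 2
    // go to reducer 0, partitions 1 and 3 to reducer 1, so roughly half of
    // the partial aggregates cross the network regardless of locality.
    val curNumPartitions = 2
    val keyed = (0 until 4).map(i => (i % curNumPartitions, s"p$i"))
    // keyed == Vector((0,p0), (1,p1), (0,p2), (1,p3))
    val grouped = keyed.groupBy(_._1).mapValues(_.map(_._2))
    // grouped: Map(0 -> Vector(p0, p2), 1 -> Vector(p1, p3))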

Guillaume


-- 
eXenSa

Guillaume PITEL, President
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705


Re: Decrease shuffle in TreeAggregate with coalesce ?

Posted by Joseph Bradley <jo...@databricks.com>.
Do you have code which can reproduce this performance drop in treeReduce?
It would be helpful to debug.  In the 1.6 release, we profiled it via the
various MLlib algorithms and did not see performance drops.

It's not just renumbering the partitions; it is reducing the number of
partitions by a factor of 1.0/scale (where scale > 1).  This creates a
"tree"-structured aggregation so that more of the work of merging during
aggregation is done on the workers, not the driver.
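For reference, the tree depth is exposed through treeAggregate's depth 
parameter; a minimal usage sketch in local mode (illustrative numbers 
only, not taken from any benchmark):

    import org.apache.spark.SparkContext

    object TreeAggregateDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[4]", "tree-aggregate-demo")
        val data = sc.parallelize(1 to 1000000, numSlices = 16)

        // depth = 2 (the default) adds one intermediate level of merging on
        // the executors before the final combine on the driver; a larger
        // depth means more levels and fewer partial results hitting the
        // driver at once.
        val sum = data.treeAggregate(0L)(
          seqOp = (acc, x) => acc + x,
          combOp = (a, b) => a + b,
          depth = 2)

        println(sum)
        sc.stop()
      }
    }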
