You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "holdenk (JIRA)" <ji...@apache.org> on 2018/08/06 17:20:00 UTC

[jira] [Comment Edited] (SPARK-21436) Take advantage of known partioner for distinct on RDDs

    [ https://issues.apache.org/jira/browse/SPARK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570517#comment-16570517 ] 

holdenk edited comment on SPARK-21436 at 8/6/18 5:19 PM:
---------------------------------------------------------

@[~podongfeng] 

So distinct triggers a `map` first (e.g. it is
{code:java}
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1){code}
) and the `map` will throw away our partition information.

 

I did a quick test in the shell and you can see it causes a second shuffle if you run the same commands:

 
{code:java}
scala> sc.parallelize(1.to(100))
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> sc.parallelize(1.to(100)).count()
res1: Long = 100
scala> sc.parallelize(1.to(100))
res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> res2.sort()
<console>:26: error: value sort is not a member of org.apache.spark.rdd.RDD[Int]
 res2.sort()
 ^
scala> res2.map(x => (x, x))
res4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:26
scala> res4.sortByKey()
res5: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at sortByKey at <console>:26
scala> res5.cache()
res6: res5.type = ShuffledRDD[4] at sortByKey at <console>:26
scala> res6.count()
res7: Long = 100
scala> res6.distinct()
res8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at distinct at <console>:26
scala> res8.count()
res9: Long = 100
{code}
 


was (Author: holdenk):
@[~podongfeng] 

So distinct triggers a `map` first (e.g. it is `map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)`) and the `map` will throw away our partition information.

 

I did a quick test in the shell and you can see it causes a second shuffle if you run the same commands:

 
{code:java}

scala> sc.parallelize(1.to(100))
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> sc.parallelize(1.to(100)).count()
res1: Long = 100
scala> sc.parallelize(1.to(100))
res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> res2.sort()
<console>:26: error: value sort is not a member of org.apache.spark.rdd.RDD[Int]
 res2.sort()
 ^
scala> res2.map(x => (x, x))
res4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:26
scala> res4.sortByKey()
res5: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at sortByKey at <console>:26
scala> res5.cache()
res6: res5.type = ShuffledRDD[4] at sortByKey at <console>:26
scala> res6.count()
res7: Long = 100
scala> res6.distinct()
res8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at distinct at <console>:26
scala> res8.count()
res9: Long = 100
{code}
 

> Take advantage of known partioner for distinct on RDDs
> ------------------------------------------------------
>
>                 Key: SPARK-21436
>                 URL: https://issues.apache.org/jira/browse/SPARK-21436
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: holdenk
>            Priority: Minor
>
> If we have a known partitioner we should be able to avoid the shuffle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org