You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "holdenk (JIRA)" <ji...@apache.org> on 2018/08/06 17:20:00 UTC
[jira] [Comment Edited] (SPARK-21436) Take advantage of known
partioner for distinct on RDDs
[ https://issues.apache.org/jira/browse/SPARK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570517#comment-16570517 ]
holdenk edited comment on SPARK-21436 at 8/6/18 5:19 PM:
---------------------------------------------------------
@[~podongfeng]
So distinct triggers a `map` first (e.g. it is
{code:java}
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1){code}
) and the `map` will throw away our partition information.
I did a quick test in the shell and you can see it causes a second shuffle if you run the same commands:
{code:java}
scala> sc.parallelize(1.to(100))
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> sc.parallelize(1.to(100)).count()
res1: Long = 100
scala> sc.parallelize(1.to(100))
res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> res2.sort()
<console>:26: error: value sort is not a member of org.apache.spark.rdd.RDD[Int]
res2.sort()
^
scala> res2.map(x => (x, x))
res4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:26
scala> res4.sortByKey()
res5: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at sortByKey at <console>:26
scala> res5.cache()
res6: res5.type = ShuffledRDD[4] at sortByKey at <console>:26
scala> res6.count()
res7: Long = 100
scala> res6.distinct()
res8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at distinct at <console>:26
scala> res8.count()
res9: Long = 100
{code}
was (Author: holdenk):
@[~podongfeng]
So distinct triggers a `map` first (e.g. it is `map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)`) and the `map` will throw away our partition information.
I did a quick test in the shell and you can see it causes a second shuffle if you run the same commands:
{code:java}
scala> sc.parallelize(1.to(100))
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> sc.parallelize(1.to(100)).count()
res1: Long = 100
scala> sc.parallelize(1.to(100))
res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> res2.sort()
<console>:26: error: value sort is not a member of org.apache.spark.rdd.RDD[Int]
res2.sort()
^
scala> res2.map(x => (x, x))
res4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:26
scala> res4.sortByKey()
res5: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at sortByKey at <console>:26
scala> res5.cache()
res6: res5.type = ShuffledRDD[4] at sortByKey at <console>:26
scala> res6.count()
res7: Long = 100
scala> res6.distinct()
res8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at distinct at <console>:26
scala> res8.count()
res9: Long = 100
{code}
> Take advantage of known partioner for distinct on RDDs
> ------------------------------------------------------
>
> Key: SPARK-21436
> URL: https://issues.apache.org/jira/browse/SPARK-21436
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: holdenk
> Priority: Minor
>
> If we have a known partitioner we should be able to avoid the shuffle.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org