Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/04 08:42:43 UTC

[GitHub] [spark] zhengruifeng opened a new pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

zhengruifeng opened a new pull request #29185:
URL: https://github.com/apache/spark/pull/29185


   ### What changes were proposed in this pull request?
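   Avoid an unnecessary shuffle in `repartitionAndSortWithinPartitions` when the input RDD is already partitioned by the requested partitioner.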
   
   ### Why are the changes needed?
   `combineByKeyWithClassTag` already checks whether the RDD is partitioned by the requested partitioner and, if so, skips the shuffle:
   
   ```scala
   if (self.partitioner == Some(partitioner)) {
     self.mapPartitions(iter => {
       val context = TaskContext.get()
       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
     }, preservesPartitioning = true)
   } else {
     new ShuffledRDD[K, V, C](self, partitioner)
       .setSerializer(serializer)
       .setAggregator(aggregator)
       .setMapSideCombine(mapSideCombine)
   }
   ```
   
   `repartitionAndSortWithinPartitions` should skip the shuffle in the same situation and only sort within each existing partition.
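   
   A minimal sketch of when that equality check holds, assuming a local `SparkContext` (the master URL and app name are arbitrary choices for illustration). A `HashPartitioner` compares equal to another `HashPartitioner` with the same number of partitions, `mapValues` keeps the partitioner, and `map` drops it because the keys may change:
   
   ```scala
   import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
   
   // Assumption: local mode; getOrCreate reuses `sc` when pasted into spark-shell.
   val sc = SparkContext.getOrCreate(
     new SparkConf().setMaster("local[2]").setAppName("partitioner-check"))
   
   val data = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8)), 2)
   val agg = data.reduceByKey(new HashPartitioner(2), _ + _)
   
   // A freshly parallelized RDD has no partitioner, so the check fails.
   val rawMatches = data.partitioner == Some(new HashPartitioner(2))
   // After reduceByKey, a separately constructed but equal partitioner matches.
   val aggMatches = agg.partitioner == Some(new HashPartitioner(2))
   // mapValues cannot change keys, so the partitioner is preserved.
   val keptByMapValues = agg.mapValues(_ + 1).partitioner == Some(new HashPartitioner(2))
   // map may change keys, so the partitioner is dropped.
   val keptByMap = agg.map(kv => kv).partitioner == Some(new HashPartitioner(2))
   println(s"raw=$rawMatches agg=$aggMatches mapValues=$keptByMapValues map=$keptByMap")
   ```
   
   Only RDDs for which the comparison is true would take the no-shuffle branch; everything else still goes through `ShuffledRDD`.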
   
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added new test suites; existing test suites also pass.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #29185:
URL: https://github.com/apache/spark/pull/29185


   




[GitHub] [spark] SparkQA removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662289378


   **[Test build #126314 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126314/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).




[GitHub] [spark] SparkQA removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773152387


   **[Test build #134866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134866/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).




[GitHub] [spark] AmplabJenkins commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773222385


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39454/
   




[GitHub] [spark] zhengruifeng commented on a change in pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #29185:
URL: https://github.com/apache/spark/pull/29185#discussion_r566639506



##########
File path: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
##########
@@ -73,7 +75,21 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * because it can push the sorting down into the shuffle machinery.
    */
   def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
-    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
+    if (self.partitioner == Some(partitioner)) {
+      self.mapPartitions(iter => {
+        val context = TaskContext.get
+        val sorter = new ExternalSorter[K, V, V](context, None, None, Some(ordering))
+        sorter.insertAll(iter)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        val outputIter = new InterruptibleIterator(context,
+          sorter.iterator.asInstanceOf[Iterator[(K, V)]])
+        CompletionIterator[(K, V), Iterator[(K, V)]](outputIter, sorter.stop)
+      }, preservesPartitioning = true)

Review comment:
       The current PR is also similar to `def distinct(numPartitions: Int)` in `RDD.scala`. I think the main difference is that `distinct(numPartitions: Int)` uses an `ExternalAppendOnlyMap`.
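   
   For comparison, the shape of the no-shuffle branch can be approximated from user code. This is only a sketch: `ExternalSorter` is internal to Spark, so the example sorts each partition in memory with `sortBy` (an assumption that every partition fits in memory, with no spilling and no task metrics), and the local master and app name are arbitrary.
   
   ```scala
   import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
   
   // Assumption: local mode; getOrCreate reuses `sc` when pasted into spark-shell.
   val sc = SparkContext.getOrCreate(
     new SparkConf().setMaster("local[2]").setAppName("local-sort-sketch"))
   
   val partitioner = new HashPartitioner(2)
   val agg = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2)
     .reduceByKey(partitioner, _ + _)
   
   // Sort each partition where it is; no records move between partitions.
   // preservesPartitioning = true keeps `partitioner` on the result.
   val sortedLocally = agg.mapPartitions(
     iter => iter.toArray.sortBy(_._1).iterator,
     preservesPartitioning = true)
   
   val partitions = sortedLocally.glom().collect()
   partitions.foreach(p => println(p.mkString(", ")))
   ```
   
   Because the partitioner is preserved, later key-based operations with the same partitioner can also avoid a shuffle.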






[GitHub] [spark] zhengruifeng commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773135060


   retest this please


[GitHub] [spark] AmplabJenkins removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662362036








[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662289378


   **[Test build #126314 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126314/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773255968


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134866/
   




[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662361283


   **[Test build #126314 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126314/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] AmplabJenkins commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662290016








[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773186615


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39454/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773222385


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39454/
   




[GitHub] [spark] mridulm commented on a change in pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #29185:
URL: https://github.com/apache/spark/pull/29185#discussion_r468215507



##########
File path: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
##########
@@ -73,7 +75,21 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * because it can push the sorting down into the shuffle machinery.
    */
   def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
-    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
+    if (self.partitioner == Some(partitioner)) {
+      self.mapPartitions(iter => {
+        val context = TaskContext.get
+        val sorter = new ExternalSorter[K, V, V](context, None, None, Some(ordering))
+        sorter.insertAll(iter)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        val outputIter = new InterruptibleIterator(context,
+          sorter.iterator.asInstanceOf[Iterator[(K, V)]])
+        CompletionIterator[(K, V), Iterator[(K, V)]](outputIter, sorter.stop)
+      }, preservesPartitioning = true)

Review comment:
       Can we refactor the code here and in `BlockStoreShuffleReader.read`?






[GitHub] [spark] AmplabJenkins commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773255968


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134866/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662290016








[GitHub] [spark] github-actions[bot] commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-730047315


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773195188


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39454/
   




[GitHub] [spark] zhengruifeng commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662289751


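   Test code:
   ```scala
   import org.apache.spark.HashPartitioner
   
   // Run in spark-shell, where `sc` is already defined.
   val data = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2)
   val partitioner = new HashPartitioner(2)
   // After this shuffle, `agg` is partitioned by `partitioner`.
   val agg = data.reduceByKey(partitioner, _ + _)
   agg.persist()
   agg.count
   // Same partitioner again: only the per-partition sort should be needed.
   val sorted = agg.repartitionAndSortWithinPartitions(partitioner)
   sorted.count
   ```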
   
   Master:
   ![repartitionAndSortWithinPartitions_old](https://user-images.githubusercontent.com/7322292/88146995-31d82700-cc2f-11ea-9eb8-44b75c8153c8.png)
   
   
   This PR:
   ![repartitionAndSortWithinPartitions_new](https://user-images.githubusercontent.com/7322292/88147016-3b618f00-cc2f-11ea-89a1-8c652c7cdbfe.png)
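   
   The same comparison can be made without the UI by inspecting the lineage. The following is a sketch, assuming a local `SparkContext` with an arbitrary app name; which result it prints depends on whether the Spark build in use contains the no-shuffle branch, so no expected output is given.
   
   ```scala
   import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
   
   // Assumption: local mode; getOrCreate reuses `sc` when pasted into spark-shell.
   val sc = SparkContext.getOrCreate(
     new SparkConf().setMaster("local[2]").setAppName("lineage-check"))
   
   val partitioner = new HashPartitioner(2)
   val agg = sc.parallelize(Seq((0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)), 2)
     .reduceByKey(partitioner, _ + _)
   val sorted = agg.repartitionAndSortWithinPartitions(partitioner)
   
   // True when the final step is a ShuffledRDD, i.e. the data is shuffled a second time.
   val extraShuffle = sorted.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])
   println(s"extra shuffle: $extraShuffle")
   println(sorted.toDebugString)
   
   // With or without the shuffle, keys must come out sorted within each partition.
   val keysPerPartition = sorted.glom().collect().map(_.map(_._1).toSeq)
   ```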
   




[GitHub] [spark] AmplabJenkins commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-662362036








[GitHub] [spark] zhengruifeng commented on a change in pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #29185:
URL: https://github.com/apache/spark/pull/29185#discussion_r566626246



##########
File path: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
##########
@@ -73,7 +75,21 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * because it can push the sorting down into the shuffle machinery.
    */
   def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
-    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
+    if (self.partitioner == Some(partitioner)) {
+      self.mapPartitions(iter => {
+        val context = TaskContext.get
+        val sorter = new ExternalSorter[K, V, V](context, None, None, Some(ordering))
+        sorter.insertAll(iter)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        val outputIter = new InterruptibleIterator(context,
+          sorter.iterator.asInstanceOf[Iterator[(K, V)]])
+        CompletionIterator[(K, V), Iterator[(K, V)]](outputIter, sorter.stop)
+      }, preservesPartitioning = true)

Review comment:
       @mridulm Sorry for the late reply.
   I think I am missing something: I don't quite understand why or how to refactor here. Do you mean adding a listener like this?
   ```
           // Use completion callback to stop sorter if task was finished/cancelled.
           context.addTaskCompletionListener[Unit](_ => {
             sorter.stop()
           })
   ```
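   
   To illustrate the listener approach outside Spark internals, here is a sketch using only the public `TaskContext` API. The temporary file is a hypothetical stand-in for a per-task resource such as a sorter, and the local master and app name are arbitrary. A task completion listener runs on success, failure, or cancellation, whereas a callback attached to the iterator fires only once the iterator has been fully consumed.
   
   ```scala
   import java.nio.file.Files
   import org.apache.spark.{SparkConf, SparkContext, TaskContext}
   
   // Assumption: local mode; getOrCreate reuses `sc` when pasted into spark-shell.
   val sc = SparkContext.getOrCreate(
     new SparkConf().setMaster("local[2]").setAppName("listener-sketch"))
   
   val doubled = sc.parallelize(1 to 6, 2).mapPartitions { iter =>
     val context = TaskContext.get()
     // Hypothetical per-task resource standing in for the sorter.
     val scratch = Files.createTempFile("scratch-", ".tmp")
     // Clean up when the task ends, even if `iter` is never drained.
     // The explicit [Unit] selects the function overload, as in the snippet above.
     context.addTaskCompletionListener[Unit](_ => Files.deleteIfExists(scratch))
     iter.map(_ * 2)
   }
   
   val result = doubled.collect().toSeq
   println(result)
   ```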
   






[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773152387


   **[Test build #134866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134866/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).




[GitHub] [spark] SparkQA commented on pull request #29185: [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29185:
URL: https://github.com/apache/spark/pull/29185#issuecomment-773242012


   **[Test build #134866 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134866/testReport)** for PR 29185 at commit [`cb1148c`](https://github.com/apache/spark/commit/cb1148c6c97ca52f35288dc739138930ac07f295).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.

