You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/09/12 11:10:52 UTC

spark git commit: [SPARK-17447] Performance improvement in Partitioner.defaultPartitioner without sortBy

Repository: spark
Updated Branches:
  refs/heads/master cc87280fc -> 4efcdb7fe


[SPARK-17447] Performance improvement in Partitioner.defaultPartitioner without sortBy

## What changes were proposed in this pull request?

if there are many rdds in some situations,the sort will loss he performance servely,actually we needn't sort the rdds , we can just scan the rdds one time to gain the same goal.

## How was this patch tested?

manual tests

Author: codlife <10...@qq.com>

Closes #15039 from codlife/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4efcdb7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4efcdb7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4efcdb7f

Branch: refs/heads/master
Commit: 4efcdb7feae24e41d8120b59430f8b77cc2106a6
Parents: cc87280
Author: codlife <10...@qq.com>
Authored: Mon Sep 12 12:10:46 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Sep 12 12:10:46 2016 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Partitioner.scala   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4efcdb7f/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 98c3abe..93dfbc0 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,16 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
-    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
-      return r.partitioner.get
-    }
-    if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new HashPartitioner(rdd.context.defaultParallelism)
+    val rdds = (Seq(rdd) ++ others)
+    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
+    if (hasPartitioner.nonEmpty) {
+      hasPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
-      new HashPartitioner(bySize.head.partitions.length)
+      if (rdd.context.conf.contains("spark.default.parallelism")) {
+        new HashPartitioner(rdd.context.defaultParallelism)
+      } else {
+        new HashPartitioner(rdds.map(_.partitions.length).max)
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org