You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/06 09:33:06 UTC

git commit: [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.

Repository: spark
Updated Branches:
  refs/heads/master 9422c4ee0 -> 1b9001f78


[SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.

This is a teeny tiny optimization that moves the `if` check of sortBasedShuffleOn outside the closures, so the closures don't need to pull in the entire Exchange operator object.

Author: Reynold Xin <rx...@apache.org>

Closes #2282 from rxin/SPARK-3409 and squashes the following commits:

1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b9001f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b9001f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b9001f7

Branch: refs/heads/master
Commit: 1b9001f78d96faefff02b846b169c249d9e4d612
Parents: 9422c4e
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Sep 6 00:33:00 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Sep 6 00:33:00 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   | 43 ++++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b9001f7/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4802e40..927f400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   override def outputPartitioning = newPartitioning
 
-  def output = child.output
+  override def output = child.output
 
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
-  def execute() = attachTree(this , "execute") {
+  override def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            @transient val hashExpressions =
-              newProjection(expressions, child.output)
-
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter =>
+            val hashExpressions = newProjection(expressions, child.output)
             iter.map(r => (hashExpressions(r), r.copy()))
-          } else {
-            @transient val hashExpressions =
-              newMutableProjection(expressions, child.output)()
-
+          }
+        } else {
+          child.execute().mapPartitions { iter =>
+            val hashExpressions = newMutableProjection(expressions, child.output)()
             val mutablePair = new MutablePair[Row, Row]()
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
@@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            iter.map(row => (row.copy(), null))
-          } else {
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
+        } else {
+          child.execute().mapPartitions { iter =>
             val mutablePair = new MutablePair[Row, Null](null, null)
             iter.map(row => mutablePair.update(row, null))
           }
         }
+
+        // TODO: RangePartitioner should take an Ordering.
+        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+
         val part = new RangePartitioner(numPartitions, rdd, ascending = true)
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
@@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
-        val rdd = child.execute().mapPartitions { iter =>
-          if (sortBasedShuffleOn) {
-            iter.map(r => (null, r.copy()))
-          } else {
+        val rdd = if (sortBasedShuffleOn) {
+          child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+        } else {
+          child.execute().mapPartitions { iter =>
             val mutablePair = new MutablePair[Null, Row]()
             iter.map(r => mutablePair.update(null, r))
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org