You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/07/15 03:55:38 UTC

spark git commit: [SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)

Repository: spark
Updated Branches:
  refs/heads/master e965a798d -> cc57d705e


[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)

SPARK-8317 changed the SQL Exchange operator so that it no longer pushed sorting into Spark's shuffle layer, a change which allowed more efficient SQL-specific sorters to be used.

This patch performs some leftover cleanup based on those changes:

- Exchange's constructor should no longer accept a `newOrdering` since it's no longer used and no longer works as expected.
- `addOperatorsIfNecessary` looked at shuffle input's output ordering to decide whether to sort, but this is the wrong node to be examining: it needs to look at whether the post-shuffle node has the right ordering, since shuffling will not preserve row orderings.  Thanks to davies for spotting this.

Author: Josh Rosen <jo...@databricks.com>

Closes #7407 from JoshRosen/SPARK-9050 and squashes the following commits:

e70be50 [Josh Rosen] No need to wrap line
e866494 [Josh Rosen] Refactor addOperatorsIfNecessary to make code clearer
2e467da [Josh Rosen] Remove `newOrdering` from Exchange.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc57d705
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc57d705
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc57d705

Branch: refs/heads/master
Commit: cc57d705e732aefc2f3d3f438e84d71705b2eb65
Parents: e965a79
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Jul 14 18:55:34 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jul 14 18:55:34 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   | 37 ++++++++------------
 .../spark/sql/execution/SparkStrategies.scala   |  3 +-
 2 files changed, 16 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc57d705/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4b783e3..feea4f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -35,21 +35,13 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn
 
 /**
  * :: DeveloperApi ::
- * Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each
- * resulting partition based on expressions from the partition key.  It is invalid to construct an
- * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.
+ * Performs a shuffle that will result in the desired `newPartitioning`.
  */
 @DeveloperApi
-case class Exchange(
-    newPartitioning: Partitioning,
-    newOrdering: Seq[SortOrder],
-    child: SparkPlan)
-  extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
 
   override def outputPartitioning: Partitioning = newPartitioning
 
-  override def outputOrdering: Seq[SortOrder] = newOrdering
-
   override def output: Seq[Attribute] = child.output
 
   /**
@@ -279,23 +271,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
           partitioning: Partitioning,
           rowOrdering: Seq[SortOrder],
           child: SparkPlan): SparkPlan = {
-        val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
-        val needsShuffle = child.outputPartitioning != partitioning
 
-        val withShuffle = if (needsShuffle) {
-          Exchange(partitioning, Nil, child)
-        } else {
-          child
+        def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
+          if (child.outputPartitioning != partitioning) {
+            Exchange(partitioning, child)
+          } else {
+            child
+          }
         }
 
-        val withSort = if (needSort) {
-          sqlContext.planner.BasicOperators.getSortOperator(
-            rowOrdering, global = false, withShuffle)
-        } else {
-          withShuffle
+        def addSortIfNecessary(child: SparkPlan): SparkPlan = {
+          if (rowOrdering.nonEmpty && child.outputOrdering != rowOrdering) {
+            sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
+          } else {
+            child
+          }
         }
 
-        withSort
+        addSortIfNecessary(addShuffleIfNecessary(child))
       }
 
       if (meetsRequirements && compatible && !needsAnySort) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cc57d705/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce25af5..73b4634 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
       case logical.RepartitionByExpression(expressions, child) =>
-        execution.Exchange(
-          HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
+        execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
       case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org