You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/01 16:49:52 UTC

spark git commit: [SPARK-17271][SQL] Planner adds un-necessary Sort even if child orde…

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6281b74b6 -> 13bacd730


[SPARK-17271][SQL] Planner adds un-necessary Sort even if child orde\u2026

## What changes were proposed in this pull request?

Ports https://github.com/apache/spark/pull/14841 and https://github.com/apache/spark/pull/14910 from `master` to `branch-2.0`

Jira : https://issues.apache.org/jira/browse/SPARK-17271

Planner is adding un-needed SORT operation due to bug in the way comparison for `SortOrder` is done at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253
`SortOrder` needs to be compared semantically because `Expression` within two `SortOrder` can be "semantically equal" but not literally equal objects.

eg. In case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")`

Expression in required SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId,
        qualifier = Some("a")
      )
```

Expression in child SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId)
```

Notice that the output column has a qualifier but the child attribute does not but the inherent expression is the same and hence in this case we can say that the child satisfies the required sort order.

This PR includes following changes:
- Added a `semanticEquals` method to `SortOrder` so that it can compare underlying child expressions semantically (and not using default Object.equals)
- Fixed `EnsureRequirements` to use semantic comparison of SortOrder

## How was this patch tested?

- Added a test case to `PlannerSuite`. Ran rest tests in `PlannerSuite`

Author: Tejas Patil <te...@fb.com>

Closes #14920 from tejasapatil/SPARK-17271_2.0_port.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13bacd73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13bacd73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13bacd73

Branch: refs/heads/branch-2.0
Commit: 13bacd7308c42c92f42fbc3ffbee9a13282668a9
Parents: 6281b74
Author: Tejas Patil <te...@fb.com>
Authored: Thu Sep 1 18:49:43 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Sep 1 18:49:43 2016 +0200

----------------------------------------------------------------------
 .../execution/exchange/EnsureRequirements.scala | 11 +++++-
 .../spark/sql/execution/PlannerSuite.scala      | 40 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13bacd73/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 446571a..f170499 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -236,7 +236,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
+        val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
+          false
+        } else {
+          requiredOrdering.zip(child.outputOrdering).forall {
+            case (requiredOrder, childOutputOrder) =>
+              requiredOrder.semanticEquals(childOutputOrder)
+          }
+        }
+
+        if (!orderingMatched) {
           SortExec(requiredOrdering, global = false, child = child)
         } else {
           child

http://git-wip-us.apache.org/repos/asf/spark/blob/13bacd73/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 13490c3..375da22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -415,6 +415,44 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements skips sort when required ordering is semantically equal to " +
+    "existing ordering") {
+    val exprId: ExprId = NamedExpression.newExprId
+    val attribute1 =
+      AttributeReference(
+        name = "col1",
+        dataType = LongType,
+        nullable = false
+      ) (exprId = exprId,
+        qualifier = Some("col1_qualifier")
+      )
+
+    val attribute2 =
+      AttributeReference(
+        name = "col1",
+        dataType = LongType,
+        nullable = false
+      ) (exprId = exprId)
+
+    val orderingA1 = SortOrder(attribute1, Ascending)
+    val orderingA2 = SortOrder(attribute2, Ascending)
+
+    assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to $orderingA2")
+    assert(orderingA1.semanticEquals(orderingA2),
+      s"$orderingA1 should be semantically equal to $orderingA2")
+
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA1)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA2)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
+      fail(s"No sorts should have been added:\n$outputPlan")
+    }
+  }
+
   // This is a regression test for SPARK-11135
   test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
     val orderingA = SortOrder(Literal(1), Ascending)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org