You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/22 06:54:19 UTC

spark git commit: [SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

Repository: spark
Updated Branches:
  refs/heads/master 5ac96854c -> 5960686e7


[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the assumption that its children have already been sorted on the join keys, which is often not true until EnsureRequirements has been applied. As a result, we did not get the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the behavior of <code>getKeyOrdering(keys, childOutputOrdering)</code> to:
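1. If the childOutputOrdering satisfies the required child ordering (i.e. the required ordering is equivalent to childOutputOrdering or to a prefix of it) => the required child ordering, extended with the extra sameOrderExpressions from childOutputOrdering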
2. Otherwise => required child ordering

In addition, I moved the logic for deciding the relationship between two orderings into SortOrder (<code>SortOrder.orderingSatisfies</code>), so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.

Author: maryannxue <ma...@gmail.com>

Closes #19281 from maryannxue/spark-21998.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5960686e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5960686e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5960686e

Branch: refs/heads/master
Commit: 5960686e791b5d6642a30c43c1de61e96e594a5e
Parents: 5ac9685
Author: maryannxue <ma...@gmail.com>
Authored: Thu Sep 21 23:54:16 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Sep 21 23:54:16 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/SortOrder.scala    | 23 ++++++++
 .../execution/exchange/EnsureRequirements.scala | 21 ++-----
 .../sql/execution/joins/SortMergeJoinExec.scala | 17 ++++--
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 62 ++++++++++++++++++++
 4 files changed, 102 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index abcb9a2..ff7c98f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -96,6 +96,29 @@ object SortOrder {
      sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix. Here are examples of ordering A satisfying ordering B:
+   * <ul>
+   *   <li>ordering A is [x, y] and ordering B is [x]</li>
+   *   <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
+   *   <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
+   * </ul>
+   */
+  def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
+    if (ordering2.isEmpty) {
+      true
+    } else if (ordering2.length > ordering1.length) {
+      false
+    } else {
+      ordering2.zip(ordering1).forall {
+        case (o2, o1) => o1.satisfies(o2)
+      }
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b91d077..1da72f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
 
     // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
-      if (requiredOrdering.nonEmpty) {
-        // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
-          false
-        } else {
-          requiredOrdering.zip(child.outputOrdering).forall {
-            case (requiredOrder, childOutputOrder) =>
-              childOutputOrder.satisfies(requiredOrder)
-          }
-        }
-
-        if (!orderingMatched) {
-          SortExec(requiredOrdering, global = false, child = child)
-        } else {
-          child
-        }
-      } else {
+      // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
+      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
         child
+      } else {
+        SortExec(requiredOrdering, global = false, child = child)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 91d214e..14de2dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -102,13 +102,22 @@ case class SortMergeJoinExec(
   }
 
   /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
+   * The utility method to get output ordering for left or right side of the join.
+   *
+   * Returns the required ordering for left or right child if childOutputOrdering does not
+   * satisfy the required ordering; otherwise, which means the child does not need to be sorted
+   * again, returns the required ordering for this child with extra "sameOrderExpressions" from
+   * the child's outputOrdering.
    */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
     : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SortOrder.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
+      keys.zip(childOutputOrdering).map { case (key, childOrder) =>
+        SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+      }
+    } else {
+      requiredOrdering
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 453052a..9d50e8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -24,6 +24,8 @@ import scala.language.existentials
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
+import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("test SortMergeJoin output ordering") {
+    val joinQueries = Seq(
+      "SELECT * FROM testData JOIN testData2 ON key = a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN " +
+        "testData3 t3 ON t2.a = t3.a JOIN " +
+        "testData t4 ON t1.key = t4.key")
+
+    def assertJoinOrdering(sqlString: String): Unit = {
+      val df = sql(sqlString)
+      val physical = df.queryExecution.sparkPlan
+      val physicalJoins = physical.collect {
+        case j: SortMergeJoinExec => j
+      }
+      val executed = df.queryExecution.executedPlan
+      val executedJoins = executed.collect {
+        case j: SortMergeJoinExec => j
+      }
+      // This only applies to the above tested queries, in which a child SortMergeJoin always
+      // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
+      // appear as parent of SortMergeJoin.
+      executed.foreach {
+        case s: SortExec => s.foreach {
+          case j: SortMergeJoinExec => fail(
+            s"No extra sort should be added since $j already satisfies the required ordering"
+          )
+          case _ =>
+        }
+        case _ =>
+      }
+      val joinPairs = physicalJoins.zip(executedJoins)
+      val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
+      assert(joinPairs.size == numOfJoins)
+
+      joinPairs.foreach {
+        case(join1, join2) =>
+          val leftKeys = join1.leftKeys
+          val rightKeys = join1.rightKeys
+          val outputOrderingPhysical = join1.outputOrdering
+          val outputOrderingExecuted = join2.outputOrdering
+
+          // outputOrdering should always contain join keys
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
+          // outputOrdering should be consistent between physical plan and executed plan
+          assert(outputOrderingPhysical == outputOrderingExecuted,
+            s"Operator $join1 did not have the same output ordering in the physical plan as in " +
+            s"the executed plan.")
+      }
+    }
+
+    joinQueries.foreach(assertJoinOrdering)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org